Ejemplo de Apache Kafka con Java

De ChuWiki


En este tutorial vamos a crear un ciente Java de Apache Kafka, consultar los topic que hay creados para saber si hay un topic "quickstart-events" que usaremos como ejemplo y crearlo si no está ya creado. Crearemos también un consumidor y un productor para ese topic.

Partimos de que tenemos Apache Kafka instalado y accesible desde nuestro programa Java. Si quieres seguir el ejemplo y no lo tienes instalado, puedes ver cómo instalar Apache Kafka en tu ordenador o cómo instalar Apache Kafka en un docker.

El código completo de ejemplo lo tienes en github en el proyecto kafka-example

Dependencias Maven para Java y Apache Kafka[editar]

Primero necesitamos tener en nuestro proyecto la API Java de conexión a Kafka. Podemos conseguirla, si nuestro proyecto es maven, con las siguientes dependencias

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.36</version>
      <scope>runtime</scope>
    </dependency>

Con kafka-clients hemos añadido la API. Si en nuestras pruebas queremos ver el log de salida de Kafka para depurar posibles errores, necesitamos también un sistema de log compatible con slf4j, que es el que usa Kafka. Ponemos en este ejemplo slf4j-simple.

En mvnrepository puedes ver que la última versión disponible de kafka-clients es la 3.6.1. Y mirando las dependencias runtime, vemos que usa slf4j-api versión 1.7.36, así que esta será la versión que pongamos de slf4j-simple para garantizar compatibilidad.

slf4j-simple solo se necesita en runtime, así que así lo ponemos en la dependencia.

Cliente de administración Java para Apache Kafka[editar]

Para la consulta de topic y creación si hace falta, necesitamos usar un cliente de administración de Kafka. Se puede conseguir con el siguiente código java

String KAFKA_URL = "localhost:29092";

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);

try (Admin admin = Admin.create(props)) {
   ...
}

Hay que crear una instancia de la clase Properties y ponerle una propiedad con la URL de conexión a nuestro nodo Apache Kakfa. La propiedad está definida en Kafka y es AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG y la URL es la de nuestro nodo localhost:29092 que hemos metido en la variable KAFKA_URL.

En la clase Kakfa AdminClientConfig hay más posibles propiedades que se pueden configurar para la conexión, pero esta es la única imprescindible. Si tuvieramos un cluster con varios nodos, pondriamos en esta propiedad, en un sólo String, todas las URL separadas por coma.

La obtención del cliente de administración se hace con Admin admin = Admin.create(props). Usamos el método create() pasando las propiedades que acabamos de rellenar. Hemos metido la creación en una estructura try-with-resources try(...) { para asegurar su cierre cuando terminemos de usarla.

Consulta de los topic Kafka desde Java[editar]

Una vez tenemos el cliente de administración creado, podemos consultar los topic existentes con el siguiente código

ListTopicsResult listTopicsResult = admin.listTopics();
KafkaFuture<Set<String>> names = listTopicsResult.names();
Set<String> strings = names.get();
strings.forEach(System.out::println);

Con admin.listTopics() obtenemos una instancia de ListTopicsResult con los datos de los topics. listTopicsResult.names() nos da un conjunto con los nombres de los topics. Puesto que ListTopicsResult no trae los resultados inmediatamente, el método names() nos devuelve un KafkaFuture, indicando así que aunque la llamada nos devuelve el control inmediatamente, los resultados pueden todavía no estar diponibles y tardar un rato.

Haciendo la llamada names.get() nos quedamos bloqueados hasta que estén los resultados disponibles. Y ahora sí, obtenemos un Set de nombres de topics. Nos basta hacer un bucle para sacarlos por pantalla.

O en nuestro ejemplo, buscar si está el topic quickstart-events para crearlo en caso contrario

String TOPIC = "quickstart-events";

if (!strings.contains(TOPIC)){
   // crear topic "quickstart-events"
}

Crear topic Kafka con Java[editar]

Para la creación nos sirve el siguiente código

String topicName = TOPIC;
int partitions = 12;
short replicationFactor = 1;

CreateTopicsResult result = admin.createTopics(Collections.singleton(
   new NewTopic(topicName, partitions, replicationFactor)));

   KafkaFuture<Void> future = result.values().get(topicName);

   future.get();

Aparte del nombre del Topic, necesitamos un par de parámetros.

  • partitions. Kakfa puede partir los mensajes que van en un topic en "particiones" de forma que luego puede distribuir estas entre distitntos nodos y hacer procesamiento en paralelo. Para ejemplos sencillos, 12 es un buen número. Podemos poner lo que queramos, simplemente afectara más o menos al rendimiento. En nuestro ejemplo sencillo, no es importante.
  • replicationFactor. Es cuántas copias queremos de los mensajes en los distintos nodos. Por defecto suelen ser 3, pero en nuestro ejemplo sólo hemos arrancado un nodo. Así que diremos que solo queremos una copia, puesto que no hay más nodos para hacer más copias.

Para la creación del Topic, usamos admin.createTopìcs() que admite como parámetro una colección de NewTopic. Usamos la clase util de java Collection.singleton() para obener una colección a partir de un único elemento, puesto que sólo vamos a crear un Topic nuevo.

Al hacer new NewTopic(), debemos pasar tres parámetros

  • Nombre del Topic, un String, el que queramos. En nuestro caso la variable topicName que vale "quickstart-events".
  • partitions, que ya hemos comentado, de un valor más o menos adecuado, 12.
  • replicationFactor, que debe ser 1 en nuestro ejemplo porque sólo tenemos un nodo.

El Topic puede tardar un tiempo en crearse, por eso la llamada devuelve un KafkaFuture. Haciendo una llamada get() al resultado devuelto, quedamos bloqueados hasta que el topic finalmente se ha creado.

Producer Java para Apache Kafka[editar]

Un Producer o productor es un cliente de Apache Kafka que envía mensajes al nodo. El siguiente código muestra cómo crear un Producer y enviar mensajes al nodo

Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

int i=0;
try(Producer<String, String> producer = new KafkaProducer<>(props)) {
    while(i<100){
        producer.send(new ProducerRecord<String, String>(TOPIC, Integer.toString(i)));
        i++;
        Thread.sleep(100);
        System.out.println("Producer envia: "+i);
    }
    System.out.println("Termina el Producer");
} catch (InterruptedException e) {
    e.printStackTrace();
}

Primero necesitamos crear una instancia de Properties para rellenarla con los parámetros de configuración de nuestro Producer.

  • ProducerConfig.BOOTSTRAP_SERVERS_CONFIG es la key para indicar la URL de conexión a nuestro nodo Kafka, que tenemos guardada en la variable KAFKA_URL.
  • ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG para indicar cual es la clase que serializa las KEY de los mensajes que enviemos. Usaremos la clase que nos proporciona Kafka "org.apache.kafka.common.serialization.StringSerializer" que serializa String. No vamos a usar KEY en este ejemplo, pero es obligatorio poner este parámetro. Solo por si tienes curiosidad, ¿recuerdas que pusimos 12 particiones?. Cuando enviamos un mensaje podemos ponerle una KEY o identificador. Kafa usará este valor para decidir en qué partición poner el mensaje. Así podemos "jugar" con la KEY si de alguna forma queremos forzar que determinados mensajes vayan en las mismas particiones. Esto es un concepto algo avanzado y no lo vamos a usar.
  • ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG para indicar cual es la clase que serializa nuestros mensajes. Como vamos a enviar String de texto sencillos, usaremos la que nos proporciona Kafka para ello "org.apache.kafka.common.serialization.StringSerializer". Si nuestro dato no fuera un String, podemos ver las clases que proporciona Kafka que implementen "org.apache.kafka.common.serialization.Serializer" para ver si nos vale alguna o bien construir la nuestra propia implementando esa interface.

Una vez rellenas las propiedades, creamos el productor con Producer<String, String> producer = new KafkaProducer<>(props). Lo hemos metido en un try-with-resources para asegurarnos que se cierra cuando terminemos con él.

A modo de ejemplo, hacemos un bucle con un contador i para para enviar los números enteros de 0 a 99, convertidos a String, al nodo Kafka. Ponemos cien milisegundos de espera entre mensaje y mensaje.

El mensaje es una instancia de ProducerRecord. Hay varios constructores, pero usaremos el que lleva dos parámetros. El TOPIC por el que queremos enviar el mensaje "quickstart-events" y el mensaje en sí mismo, el String Integer.toString(i).

Para el envío, se usa producer.send()

Consumer Java para Apache Kafka[editar]

El código para el consumidor de mensasjes de Kafka puede ser el siguiente

Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

try(KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singleton(TOPIC));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
        if (records.isEmpty()){
            break;
        }
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("value = %s\n", record.value());
        }
    }
    System.out.println("Termina el Cosumer");
}

Al igual que con el productor, necesitamos crear una instancia de Properties y poner ahí la configuración de nuestro consumidor. Tres de los valores ya los hemos explicado, son igual que en el productor: BOOTSTRAP_SERVER_CONFIG y los dos serializadores. Vamos con el que queda sin explicar.

ConsumerConfig.GROUP_ID_CONFIG sirve para que Kafka sepa indentificar a este consumidor. El valor que pongamos detrás puede ser un String cualquiera. Para este ejemplo, lo generamos aleatoriamente con UUID.randomUUID().toString(). Este GROUP_ID_CONFIG tiene dos utilidades. Veámoslas,

Por un lado, podemos poner varios consumidores a escuchar el mismo Topic. Kafka entregará el mensaje a todos los consumidores siempre y cuando el GROUP_ID_CONFIG de cada consumidor sea distinto de los demás. Si dos consumidores tienen el mismo GROUP_ID_CONFIG, Kafka solo entregará el mensaje a uno de ellos y no al otro. Es una forma de poder repartir el trabajo. El cómo lo reparte,

Por otro lado, si solo tenemos un consumidor, lo cerramos y lo arrancamos más tarde con el mismo GROUP_ID_CONFIG, Kafka recuerda qué mensajes le ha entregado y le entegará a partir del último que haya recibido.

En nuestro ejemplo y para evitar problemas si arrancas varias veces el ejemplo, ponemos un GROUP_ID_CONFIG distinto en cada arranque, de forma aleatorio con el UUID. Así recibirás todos los mensajes cada vez.

Una vez tenemos rellenas las Properties, creamos el consumidor con new KafkaConsumer<>(props). Lo metemos en un try-with-resources para asegurarnos de su cierre cuando terminemos con él.

Luego hay que suscribirse al TOPIC con consumer.subscribe(). Se pasa como parámetro una colección con el nombre de todos los Topic a los que queremos suscribirnos. En nuestro ejemplo, sólo a TOPIC ("quickstart-events").

Y finalmente hay que meterse en un bucle para ir leyendo los mensajes. La lectura de los mensajes se hace con consumer.poll(Duration.ofMillis(5000)). El parámetro que se pasa es el timeout que queremos esperar por los mensajes, 5 segundos en nuestro ejemplo. La llamada se queda bloqueada hasta que haya mensajes o pasen los 5 segundos. El resultado lo obtenemos en ConsumerRecords<String, String>.

Si no hemos recibido mensajes ( records.isEmpty() ) consideramos que el productor se ha cansado de enviar, así que terminamos el bucle con break.

Si hemos recibido mensajes, un bucle y sacamos por pantalla record.value() que contiene el mensaje recibido.