Rehber Apache Kafka Producer & Consumer'ını Java ile kullanmak

Yazılarımı Evernote'ta tutmak yerine Medium'a taşıdım, ek bir yerde daha tutayım diye burda da paylaşıyorum. İlgilenen arkadaşlar da faydalanabilir.

NOT: Java +8 sürümleri yüklü olmalıdır.

Kafka'yı indirmemiz gerek. İndirme linki.
İndirme bittiğinde klasörü dışarı çıkartalım.

Kod:
$ tar -xzf kafka_2.13-2.7.0.tgz
$ cd kafka_2.13-2.7.0

İlk olarak zookeeper-server'ı başlatmamız gerek. Topic partitionlarının offset değerlerini generate eden kısım ile zookeeper ilgileniyor. Onu önce başlatmamız gerek.
Zookeeper'ı başlatalım.

Kod:
bin/zookeeper-server-start.sh config/zookeeper.properties

Başka bir terminal açıp Kafka-server'ını başlatalım.

Kod:
$ bin/kafka-server-start.sh config/server.properties

İlk mesajı yollamadan önce "topic" oluşturmamız gerekiyor. Kafka'ya yolladığımız mesajları belirli topic'e yazarız.
Hemen yeni bir terminal açıp topic oluşturalım.

Kod:
$ bin/kafka-topics.sh --create --topic first-topic --bootstrap-server localhost:9092

Yukarıdaki komut ile "first-topic" adıyla bir topic oluşturduk. Şimdi topic'e bir göz atalım.

Kod:
$ bin/kafka-topics.sh --describe --topic first-topic --bootstrap-server localhost:9092

Bu komut ile birlikte topic detayları terminale çıktı olarak yazılacak.

topic-desc.png
Tabii topic adının "first-topic" olduğunu varsayalım burada.

Yeni bir terminal açıp topic'e mesaj yazalım. Producer'ı çalıştıralım. İlerleyen kısımlarda bunu Java ile gerçekleştireceğiz.

Kod:
$ bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092
First message!
Second Message!

Terminale komutları girdikten sonra bizden bir input bekleyecek. İstediğimiz mesajı topice yazabiliriz.


Şimdi de yazdığımız mesajları consumer ile dinleyelim. İlerleyen kısımlarda bunu Java ile gerçekleştireceğiz producer'da olduğu gibi.

Kod:
$ bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server localhost:9092
First message!
Second Message!


"first-topic" isimli topic'e producer ile yazdığımız mesajları consumer ile dinleyebildik. Şimdi bu işlemleri Java ile yapalım. İlk olarak Producer'ı oluşturalım.

Kafka nesnelerini kullanabilmek için pom.xml'e dependency'yi eklemeyi unutmayalım.

Java:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.0</version>
</dependency>

Java:
public class Producer{
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        String recordValue = "Current time is " + Instant.now().toString();
        ProducerRecord<String, String> record = new ProducerRecord<>("first-topic", null, recordValue);
        producer.send(record);
        log.info("Current time pushed in a topic!");
        producer.flush();
    }
}

Yukarıdaki kulanımda uygulamayı çalıştırdığımızda mesajı topic'e bir kere yazacak. Bunu scheduled olarak yazıp mesajların belirli aralıklarla topic'e yazılmasını sağlayabiliriz. Böylelikle yazdığımız mesajları consumer ile daha rahat gözlemleyebiliriz. Dosyayı bu sefer 1 saniye aralıklarla JSON formatında yazalım.

JSON nesnelerini kullanabilmemiz için pom.xml'e dependency'sini eklemeyi unutmayalım.

Java:
<dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20201115</version>
</dependency>

Java:
public static void main(String[] args) {
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    scheduler.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
            String message;
            JSONObject json = new JSONObject();
            json.put("id", UUID.randomUUID());
            json.put("firstName", "FirstName");
            json.put("lastName", "LastName");
            JSONArray array = new JSONArray();
            JSONObject item = new JSONObject();
            item.put("name", "CompanyName");
            item.put("role", "Title");
            array.put(item);
            json.put("Job", array);
            message = json.toString();
            ProducerRecord<String, String> record = new ProducerRecord<>("first-topic", null, message);
            producer.send(record);
            producer.flush();
        }
    }, 1, 1, TimeUnit.SECONDS);
}


Yukarıdaki kod parçalarıyla topic'e mesaj yazmış olduk. Bunu consumer terminalinden kontrol edebiliriz.

4248f819-8d60-3338-12cf-cbdc3edb8aaf


Şimdi ise Consumer'ı oluşturalım.
Java:
public class Consumer {
    public static void main(String[] args) {
        String bootstrapServer = "localhost:9092";
        String topic = "first-topic";
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // Subscribing
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
        }
    }
}


Kod:
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "<group-id>");

Burada yazılan group id'ye config/consumer.properties dosyası içinden ulaşabiliriz.


Consumer sürekli olarak topic'i dinleyecek ve mesaj yazıldığında ise çalıştığı IDE'nin ekranına çıktı olarak yazdıracaktır.
 
Son düzenleme:

Geri
Yukarı