Yazılarımı Evernote'ta tutmak yerine Medium'a taşıdım, ek bir yerde daha tutayım diye burda da paylaşıyorum. İlgilenen arkadaşlar da faydalanabilir.
NOT: Java +8 sürümleri yüklü olmalıdır.
Kafka'yı indirmemiz gerek. İndirme linki.
İndirme bittiğinde klasörü dışarı çıkartalım.
İlk olarak zookeeper-server'ı başlatmamız gerek. Topic partitionlarının offset değerlerini generate eden kısım ile zookeeper ilgileniyor. Onu önce başlatmamız gerek.
Zookeeper'ı başlatalım.
Başka bir terminal açıp Kafka-server'ını başlatalım.
İlk mesajı yollamadan önce "topic" oluşturmamız gerekiyor. Kafka'ya yolladığımız mesajları belirli topic'e yazarız.
Hemen yeni bir terminal açıp topic oluşturalım.
Yukarıdaki komut ile "first-topic" adıyla bir topic oluşturduk. Şimdi topic'e bir göz atalım.
Bu komut ile birlikte topic detayları terminale çıktı olarak yazılacak.
Tabii topic adının "first-topic" olduğunu varsayalım burada.
Yeni bir terminal açıp topic'e mesaj yazalım. Producer'ı çalıştıralım. İlerleyen kısımlarda bunu Java ile gerçekleştireceğiz.
Terminale komutları girdikten sonra bizden bir input bekleyecek. İstediğimiz mesajı topice yazabiliriz.
Şimdi de yazdığımız mesajları consumer ile dinleyelim. İlerleyen kısımlarda bunu Java ile gerçekleştireceğiz producer'da olduğu gibi.
"first-topic" isimli topic'e producer ile yazdığımız mesajları consumer ile dinleyebildik. Şimdi bu işlemleri Java ile yapalım. İlk olarak Producer'ı oluşturalım.
Kafka nesnelerini kullanabilmek için pom.xml'e dependency'yi eklemeyi unutmayalım.
Yukarıdaki kulanımda uygulamayı çalıştırdığımızda mesajı topic'e bir kere yazacak. Bunu scheduled olarak yazıp mesajların belirli aralıklarla topic'e yazılmasını sağlayabiliriz. Böylelikle yazdığımız mesajları consumer ile daha rahat gözlemleyebiliriz. Dosyayı bu sefer 1 saniye aralıklarla JSON formatında yazalım.
JSON nesnelerini kullanabilmemiz için pom.xml'e dependency'sini eklemeyi unutmayalım.
Yukarıdaki kod parçalarıyla topic'e mesaj yazmış olduk. Bunu consumer terminalinden kontrol edebiliriz.
Şimdi ise Consumer'ı oluşturalım.
Burada yazılan group id'ye config/consumer.properties dosyası içinden ulaşabiliriz.
Consumer sürekli olarak topic'i dinleyecek ve mesaj yazıldığında ise çalıştığı IDE'nin ekranına çıktı olarak yazdıracaktır.
NOT: Java +8 sürümleri yüklü olmalıdır.
Kafka'yı indirmemiz gerek. İndirme linki.
İndirme bittiğinde klasörü dışarı çıkartalım.
Kod:
$ tar -xzf kafka_2.13-2.7.0.tgz
$ cd kafka_2.13-2.7.0
İlk olarak zookeeper-server'ı başlatmamız gerek. Topic partitionlarının offset değerlerini generate eden kısım ile zookeeper ilgileniyor. Onu önce başlatmamız gerek.
Zookeeper'ı başlatalım.
Kod:
bin/zookeeper-server-start.sh config/zookeeper.properties
Başka bir terminal açıp Kafka-server'ını başlatalım.
Kod:
$ bin/kafka-server-start.sh config/server.properties
İlk mesajı yollamadan önce "topic" oluşturmamız gerekiyor. Kafka'ya yolladığımız mesajları belirli topic'e yazarız.
Hemen yeni bir terminal açıp topic oluşturalım.
Kod:
$ bin/kafka-topics.sh --create --topic first-topic --bootstrap-server localhost:9092
Yukarıdaki komut ile "first-topic" adıyla bir topic oluşturduk. Şimdi topic'e bir göz atalım.
Kod:
$ bin/kafka-topics.sh --describe --topic first-topic --bootstrap-server localhost:9092
Bu komut ile birlikte topic detayları terminale çıktı olarak yazılacak.
Tabii topic adının "first-topic" olduğunu varsayalım burada.
Yeni bir terminal açıp topic'e mesaj yazalım. Producer'ı çalıştıralım. İlerleyen kısımlarda bunu Java ile gerçekleştireceğiz.
Kod:
$ bin/kafka-console-producer.sh --topic first-topic --bootstrap-server localhost:9092
First message!
Second Message!
Terminale komutları girdikten sonra bizden bir input bekleyecek. İstediğimiz mesajı topice yazabiliriz.
Şimdi de yazdığımız mesajları consumer ile dinleyelim. İlerleyen kısımlarda bunu Java ile gerçekleştireceğiz producer'da olduğu gibi.
Kod:
$ bin/kafka-console-consumer.sh --topic first-topic --from-beginning --bootstrap-server localhost:9092
First message!
Second Message!
"first-topic" isimli topic'e producer ile yazdığımız mesajları consumer ile dinleyebildik. Şimdi bu işlemleri Java ile yapalım. İlk olarak Producer'ı oluşturalım.
Kafka nesnelerini kullanabilmek için pom.xml'e dependency'yi eklemeyi unutmayalım.
Java:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.0</version>
</dependency>
Java:
public class Producer{
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
String recordValue = "Current time is " + Instant.now().toString();
ProducerRecord<String, String> record = new ProducerRecord<>("first-topic", null, recordValue);
producer.send(record);
log.info("Current time pushed in a topic!");
producer.flush();
}
}
Yukarıdaki kulanımda uygulamayı çalıştırdığımızda mesajı topic'e bir kere yazacak. Bunu scheduled olarak yazıp mesajların belirli aralıklarla topic'e yazılmasını sağlayabiliriz. Böylelikle yazdığımız mesajları consumer ile daha rahat gözlemleyebiliriz. Dosyayı bu sefer 1 saniye aralıklarla JSON formatında yazalım.
JSON nesnelerini kullanabilmemiz için pom.xml'e dependency'sini eklemeyi unutmayalım.
Java:
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20201115</version>
</dependency>
Java:
public static void main(String[] args) {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
String message;
JSONObject json = new JSONObject();
json.put("id", UUID.randomUUID());
json.put("firstName", "FirstName");
json.put("lastName", "LastName");
JSONArray array = new JSONArray();
JSONObject item = new JSONObject();
item.put("name", "CompanyName");
item.put("role", "Title");
array.put(item);
json.put("Job", array);
message = json.toString();
ProducerRecord<String, String> record = new ProducerRecord<>("first-topic", null, message);
producer.send(record);
producer.flush();
}
}, 1, 1, TimeUnit.SECONDS);
}
Yukarıdaki kod parçalarıyla topic'e mesaj yazmış olduk. Bunu consumer terminalinden kontrol edebiliriz.
Şimdi ise Consumer'ı oluşturalım.
Java:
public class Consumer {
public static void main(String[] args) {
String bootstrapServer = "localhost:9092";
String topic = "first-topic";
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// Subscribing
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
}
}
}
Kod:
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "<group-id>");
Burada yazılan group id'ye config/consumer.properties dosyası içinden ulaşabiliriz.
Consumer sürekli olarak topic'i dinleyecek ve mesaj yazıldığında ise çalıştığı IDE'nin ekranına çıktı olarak yazdıracaktır.
Son düzenleme: