Regardless how Kafka is used, we will always use Kafka by creating a producer that writes data to Kafka, a consumer reads data from Kafka, or an application that serves both roles.

Producer Overview

Untitled

We start producing messages to Kafka by creating a ProducerRecord, which must include the topic we want to send the record to and a value. We can optionally specify a key, a partition, a timestamp and/or a collection of headers. Once ProducerRecord is sent, the first thing the producer will do is serialize the key and value objects to byte array so they can be sent over the network.

If a partition is not explicitly specified, the data is sent to a partitioner which chooses a partition based on the ProducerRecord key. Once a partition is selected, the producer knows which topic and partition the record will be sent to. The record is then added to a batch of records that will also be sent to the same topic and partition. A separate thread is responsible for sending those batches of records to the appropriate brokers.

When the broker receives the messages, it sends back a response. If the messages were successfully written to Kafka, it will return a RecordMetadata object with the topic, partition, and the offset of the record within the partition. Otherwise, an error is returned.

Constructing a Kafka Producer

The Kafka producer object has 3 mandatory properties:

An example on initializing the producer is:

Properties kafkaProps = new Properties(); 1
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");

kafkaProps.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer"); 2
kafkaProps.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");

producer = new KafkaProducer<String, String>(kafkaProps);

There are three primary methods of sending messages:

To send a message, the simplest way is to do

ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", "Precision Products",
        "France"); 1
try {
    producer.send(record); 
} catch (Exception e) {
    e.printStackTrace(); 
}

For asynchronous send, a Callback function need to be passed in the parameter