Regardless of how Kafka is used, we always interact with it through a producer that writes data to Kafka, a consumer that reads data from Kafka, or an application that serves both roles.
We start producing messages to Kafka by creating a ProducerRecord, which must include the topic we want to send the record to and a value. We can optionally specify a key, a partition, a timestamp, and/or a collection of headers. Once the ProducerRecord is sent, the first thing the producer does is serialize the key and value objects to byte arrays so they can be sent over the network.
If a partition is not explicitly specified, the data is sent to a partitioner, which chooses a partition based on the ProducerRecord key. Once a partition is selected, the producer knows which topic and partition the record will be sent to. The record is then added to a batch of records that will also be sent to the same topic and partition. A separate thread is responsible for sending those batches of records to the appropriate brokers.
When the broker receives the messages, it sends back a response. If the messages were successfully written to Kafka, it returns a RecordMetadata object with the topic, partition, and offset of the record within the partition. Otherwise, it returns an error.
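To make the flow above concrete, here is a deliberately simplified, stdlib-only Java model of it. Everything in it is hypothetical (ProducerPipelineSketch, Metadata, the single implicit topic): it serializes with UTF-8, picks a partition with Arrays.hashCode of the key bytes as an assumed stand-in for Kafka's own hash function, and uses a record's position in its in-memory batch in place of the offset, which in real Kafka is assigned by the broker. There is no separate sender thread.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProducerPipelineSketch {
    // Hypothetical stand-in for RecordMetadata: partition and offset of a record.
    public static final class Metadata {
        public final int partition;
        public final long offset;

        Metadata(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    private final int numPartitions;
    // One list of serialized (key, value) pairs per partition of a single topic.
    private final Map<Integer, List<byte[][]>> batches = new HashMap<>();

    public ProducerPipelineSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Step 1: serialize to bytes, as a string serializer would (UTF-8).
    public static byte[] serialize(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Step 2: choose a partition from the key bytes. Arrays.hashCode is an
    // assumed stand-in for Kafka's hash; the mask keeps the result non-negative.
    public int partitionFor(byte[] keyBytes) {
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // Step 3: append to the batch for that partition. In this sketch the
    // record's position in the batch plays the role of its offset.
    public Metadata send(String key, String value) {
        byte[] k = serialize(key);
        byte[] v = serialize(value);
        int partition = partitionFor(k);
        List<byte[][]> batch = batches.computeIfAbsent(partition, p -> new ArrayList<>());
        batch.add(new byte[][] {k, v});
        return new Metadata(partition, batch.size() - 1);
    }

    public static void main(String[] args) {
        ProducerPipelineSketch producer = new ProducerPipelineSketch(3);
        Metadata first = producer.send("Precision Products", "France");
        Metadata second = producer.send("Precision Products", "Germany");
        // Same key -> same partition; the second record gets the next offset.
        System.out.println(first.partition == second.partition); // true
        System.out.println(second.offset - first.offset);         // 1
    }
}
```

Because the partition depends only on the key bytes and the partition count, records that share a key always land in the same partition, as long as the number of partitions does not change.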
A Kafka producer has three mandatory properties:
bootstrap.servers
List of host:port pairs of brokers that the producer will use to establish the initial connection to the Kafka cluster.
key.serializer
Name of a class that will be used to serialize the keys of the records we will produce to Kafka. Brokers expect byte arrays as keys and values of messages.
value.serializer
Like key.serializer, this is the name of a class that will be used to serialize the values of the records we produce to Kafka.
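The producer needs all three of these properties before it can start. As an illustration only, the hypothetical helper below (ProducerConfigCheck.missingMandatory, plain Java with no Kafka dependency) reports which of them are absent or blank; a misspelled key such as boostrap.servers simply counts as missing. KafkaProducer performs its own configuration validation, which this sketch does not reproduce. The kafka-clients library also exposes these names as constants on ProducerConfig (for example ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), which helps avoid such typos.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class ProducerConfigCheck {
    // The three properties every producer must set.
    static final String[] MANDATORY = {
        "bootstrap.servers", "key.serializer", "value.serializer"
    };

    // Hypothetical helper: returns the mandatory keys that are missing or blank.
    public static List<String> missingMandatory(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : MANDATORY) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("boostrap.servers", "broker1:9092"); // misspelled key
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        System.out.println(missingMandatory(props)); // [bootstrap.servers]
    }
}
```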
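An example of initializing the producer:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties kafkaProps = new Properties();
// Brokers used for the initial connection to the cluster
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
// Keys and values are both Strings, so both use the built-in StringSerializer
kafkaProps.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);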
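There are three primary methods of sending messages:
Fire-and-forget
We send a message to the server and don't check whether it arrived successfully.
Synchronous send
The send() method returns a Future object. We can use get() to wait on it and see whether the send was successful.
Asynchronous send
We call send() with a callback function, which gets triggered when the producer receives a response from the broker.
The simplest way to send a message is: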
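ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", "Precision Products",
        "France"); // topic, key, value
try {
    // Fire-and-forget: the returned Future is ignored
    producer.send(record);
} catch (Exception e) {
    // Errors raised before the record is handed off (e.g. serialization failures) are still thrown here
    e.printStackTrace();
}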
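A synchronous send waits on the Future that send() returns; with the real client this is producer.send(record).get(), which returns the RecordMetadata or throws an ExecutionException if the send failed. The stdlib-only sketch below shows that pattern without kafka-clients, assuming a hypothetical SyncSendSketch class that plays both producer and broker on a background thread; its 16-character size limit is an invented failure condition so that the error path can be shown.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

public class SyncSendSketch {
    static final int MAX_VALUE_CHARS = 16; // hypothetical limit so a send can fail

    // Background thread standing in for the producer's sender and the broker.
    private final ExecutorService sender = Executors.newSingleThreadExecutor();
    private final AtomicLong nextOffset = new AtomicLong();

    // Returns immediately with a Future, mirroring the shape of KafkaProducer.send(record).
    public Future<Long> send(String value) {
        return CompletableFuture.supplyAsync(() -> {
            if (value.length() > MAX_VALUE_CHARS) {
                throw new IllegalStateException("record too large");
            }
            return nextOffset.getAndIncrement();
        }, sender);
    }

    public void close() {
        sender.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        SyncSendSketch producer = new SyncSendSketch();
        try {
            // get() blocks until the "broker" answers, which makes the send synchronous.
            long offset = producer.send("France").get();
            System.out.println("written at offset " + offset); // written at offset 0
            producer.send("a value that is far too long").get();
        } catch (ExecutionException e) {
            // The failure surfaces as the cause of an ExecutionException.
            System.out.println("send failed: " + e.getCause().getMessage()); // send failed: record too large
        } finally {
            producer.close();
        }
    }
}
```

The cost of this style is that each send waits for a full round trip before the next record can go out.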
For an asynchronous send, we pass an implementation of the Callback interface as an additional argument to send().
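With the real client, the callback is an implementation of org.apache.kafka.clients.producer.Callback, whose single method is onCompletion(RecordMetadata metadata, Exception exception), passed as producer.send(record, callback); the exception argument is non-null when the send failed. The sketch below mimics that shape with stdlib Java only, assuming a hypothetical AsyncSendSketch producer and SendCallback interface (the "metadata" is reduced to an offset) and the same invented 16-character limit as a failure condition.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class AsyncSendSketch {
    // Hypothetical callback shaped like Kafka's Callback.onCompletion(metadata, exception);
    // here the "metadata" is just an offset.
    public interface SendCallback {
        void onCompletion(Long offset, Exception exception);
    }

    static final int MAX_VALUE_CHARS = 16; // hypothetical limit so a send can fail

    // One background thread, so callbacks run in the order the sends were made.
    private final ExecutorService sender = Executors.newSingleThreadExecutor();
    private final AtomicLong nextOffset = new AtomicLong();

    // Returns immediately; the callback fires once the fake broker has responded.
    public void send(String value, SendCallback callback) {
        sender.execute(() -> {
            if (value.length() > MAX_VALUE_CHARS) {
                callback.onCompletion(null, new IllegalStateException("record too large"));
            } else {
                callback.onCompletion(nextOffset.getAndIncrement(), null);
            }
        });
    }

    public void close() {
        sender.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncSendSketch producer = new AsyncSendSketch();
        CountDownLatch done = new CountDownLatch(2);
        SendCallback logResult = (offset, exception) -> {
            if (exception != null) {
                System.out.println("send failed: " + exception.getMessage());
            } else {
                System.out.println("written at offset " + offset);
            }
            done.countDown();
        };
        producer.send("France", logResult);
        producer.send("a value that is far too long", logResult);
        done.await(); // keep main alive until both callbacks have run
        producer.close();
    }
}
```

Unlike the synchronous style, the calling thread never blocks on send(); errors are handled inside the callback instead of at the call site.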