KafkaProducer是Kafka中Producer的一种实现,其主要功能就是发送消息给Kafka中broker。其send()方法如下:
/**
* Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
* See {@link #send(ProducerRecord, Callback)} for details.
*/
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}
再看两个参数的send()方法,代码如下:
/**
* Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
* <p>
* The send is asynchronous and this method will return immediately once the record has been stored in the buffer of
* records waiting to be sent. This allows sending many records in parallel without blocking to wait for the
* response after each one.
* <p>
* The result of the send is a {@link RecordMetadata} specifying the partition the record was sent to and the offset
* it was assigned.
* <p>
* Since the send call is asynchronous it returns a {@link java.util.concurrent.Future Future} for the
* {@link RecordMetadata} that will be assigned to this record. Invoking {@link java.util.concurrent.Future#get()
* get()} on this future will block until the associated request completes and then return the metadata for the record
* or throw any exception that occurred while sending the record.
* <p>
* If you want to simulate a simple blocking call you can call the <code>get()</code> method immediately:
*
* <pre>
* {@code
* byte[] key = "key".getBytes();
* byte[] value = "value".getBytes();
* ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value)
* producer.send(record).get();
* }</pre>
* <p>
* Fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
* will be invoked when the request is complete.
*
* <pre>
* {@code
* ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
* producer.send(myRecord,
* new Callback() {
* public void onCompletion(RecordMetadata metadata, Exception e) {
* if(e != null)
* e.printStackTrace();
* System.out.println("The offset of the record we just sent is: " + metadata.offset());
* }
* });
* }
* </pre>
*
* Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the
* following example <code>callback1</code> is guaranteed to execute before <code>callback2</code>:
*
* <pre>
* {@code
* producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
* producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
* }
* </pre>
* <p>
* Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or
* they will delay the sending of messages from other threads. If you want to execute blocking or computationally
* expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body
* to parallelize processing.
*
* @param record The record to send
* @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
* indicates no callback)
*
* @throws InterruptException If the thread is interrupted while blocked
* @throws SerializationException If the key or value are not valid objects given the configured serializers
* @throws BufferExhaustedException If <code>block.on.buffer.full=false</code> and the buffer is full.
*
*/
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
try {
// first make sure the metadata for the topic is available
// 首先确保该主题topic对应的元数据metadata是可用的
long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
// 计算剩余等待时间remainingWaitMs
long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);
// 得到序列化key:serializedKey
byte[] serializedKey;
try {
// 根据record中topic、key,利用valueSerializer得到序列化key:serializedKey
serializedKey = keySerializer.serialize(record.topic(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer");
}
// 得到序列化value:serializedValue
byte[] serializedValue;
try {
// 根据record中topic、value,利用valueSerializer得到序列化value:serializedValue
serializedValue = valueSerializer.serialize(record.topic(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer");
}
// 调用partition()方法获得分区号partition
int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
// 计算序列化后的key、value及其offset、size所占大小serializedSize
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
// 确保记录大小serializedSize是有效的
ensureValidRecordSize(serializedSize);
// 根据record中的topic和partition构造TopicPartition实例tp
TopicPartition tp = new TopicPartition(record.topic(), partition);
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// 调用accumulator的append()方法添加记录,获得记录添加结果RecordAppendResult类型的result
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs);
// 根据结果result的batchIsFull或newBatchCreated确定是否执行sender的wakeup()
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
// 返回result中的future
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
throw new InterruptException(e);
} catch (BufferExhaustedException e) {
this.errors.record();
this.metrics.sensor("buffer-exhausted-records").record();
throw e;
} catch (KafkaException e) {
this.errors.record();
throw e;
}
}
其大体逻辑如下:
1、首先调用waitOnMetadata()方法确保该主题topic对应的元数据metadata是可用的;
2、计算剩余等待时间remainingWaitMs;
3、根据record中topic、key,利用valueSerializer得到序列化key:serializedKey;
4、根据record中topic、value,利用valueSerializer得到序列化value:serializedValue;
5、调用partition()方法获得分区号partition;
6、计算序列化后的key、value及其offset、size所占大小serializedSize;
7、调用ensureValidRecordSize()方法确保记录大小serializedSize是有效的;
8、根据record中的topic和partition构造TopicPartition实例tp;
9、调用accumulator的append()方法添加记录,获得记录添加结果RecordAppendResult类型的result;
10、根据结果result的batchIsFull或newBatchCreated确定是否执行sender的wakeup();
11、返回result中的future。