生产者客户端API
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.6.187:9092,192.168.6.188:9092,192.168.6.229:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("topic_name", Integer.toString(i), Integer.toString(i))
);
producer.close();
生产者的缓冲空间池保留尚未发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群。如果使用后不关闭生产者,则会泄露这些资源。
send()方法是异步的,添加消息到缓冲区等待发送,并立即返回。生产者将单个的消息批量在一起发送来提高效率。
ack判别请求是否为完整,指定“all”将会阻塞消息,这种设置性能最低,但是是最可靠的。
retries,如果请求失败,生产者会自动重试,指定0次,如果启用重试,则会有重复消息的可能性。
producer缓存每个分区未发送的消息。缓存的大小通过 batch.size 指定。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。
默认缓冲可立即发送,即便缓冲空间还没有满,但是,如果想减少请求的数量,可以设置linger.ms大于0。这将指示生产者发送请求之前等待一段时间,在高负载下,相近的时间一般也会组成批,即使是 linger.ms=0。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。
buffer.memory 控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException。
key.serializer和value.serializer示例,将用户提供的key和value对象ProducerRecord转换成字节,可以使用附带的ByteArraySerializaer或StringSerializer处理简单的string或byte类型。
send()
public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)
异步发送一条消息到topic,并调用callback(当发送已确认)。
send是异步的,并且一旦消息被保存在等待发送的消息缓存中,此方法就立即返回。这样并行发送多条消息而不阻塞去等待每一条消息的响应。
发送的结果是一个RecordMetadata,它指定了消息发送的分区,分配的offset和消息的时间戳。如果topic使用的是CreateTime,则使用用户提供的时间戳或发送的时间,如果topic使用的是LogAppendTime,则追加消息时,时间戳是broker的本地时间。
由于send调用是异步的,它将为此消息的RecordMetadata返回一个Future。如果future调用get(),则将阻塞,直到相关请求完成并返回该消息的metadata,或抛出发送异常。
要模拟一个简单的阻塞调用,可以调用get()方法。
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("topic_name", key, value)
producer.send(record).get();
完全无阻塞的话,可以利用回调参数提供的请求完成时的回调通知
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("topic_name", key, value);
producer.send(record,new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e){
if(e != null)
e.printStackTrace();
System.out.println("The offset of the record we just sent is: " + metadata.offset());}
});
发送到同一个分区的消息回调保证按一定的顺序执行,也就是说,在下面的例子中 callback1 保证执行在 callback2 之前:
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
注意:callback一般在生产者的I/O线程中执行,所以是相当的快的,否则将延迟其他线程的消息发送。如果你需要执行阻塞或计算昂贵(消耗)的回调,建议在callback主体中使用自己的Executor来并行处理。
消费者客户端API
Kafka消费者不是线程安全的。所有网络I/O都发生在进行调用应用程序的线程中。用户的责任是确保多线程访问正确同步。
public class KafkaConsumer<K,V> extends Object implements Consumer<K,V>
消费者TCP长连接到broker来拉取消息。故障导致的消费者关闭失败,将会泄露这些连接,消费者不是线程安全的
① 偏移量和消费者的位置
kafka为分区中的每条消息保存一个偏移量(offset),这个偏移量是该分区中一条消息的唯一标示符。也表示消费者在分区的位置:
消费者的位置给出了下一条记录的偏移量。它在每次消费者在调用poll(long)中接收消息时自动增长。
“已提交”的位置是已安全保存的最后偏移量,如果进程失败或重新启动时,消费者将恢复到这个偏移量。消费者可以选择定期自动提交偏移量,也可以选择通过调用commit API来手动的控制。
② 消费者组和主题订阅
Kafka的消费者组概念,通过进程池瓜分消费和处理消息。这些进程可以在同一台机器运行,也可分布到多台机器上,增加可扩展性和容错性,相同group.id的消费者将视为同一个消费者组。
分组中的每个消费者通过subscribe API动态的订阅一个topic列表。kafka将topic中的消息发送到每个消费者组中。并通过平衡分区在消费者分组中所有成员之间来达到平均。因此每个分区恰好地分配1个消费者(一个消费者组中)。所以如果一个topic有4个分区,并且一个消费者分组有2个消费者。那么每个消费者消费2个分区。
消费者组的成员是动态维护的:如果一个消费者故障。分配给它的分区将重新分配给同一个分组中其他的消费者。同样的,如果一个新的消费者加入到分组,将从现有消费者中移一个给它,这被称为重新平衡分组。 当新分区添加到订阅的topic时,或者当创建与订阅的正则表达式匹配的新topic时,也将重新平衡。将通过定时刷新自动发现新的分区,并将其分配给分组的成员。
当分组重新分配自动发生时,可以通过ConsumerRebalanceListener通知消费者,这允许他们完成必要的应用程序级逻辑,例如状态清除,手动偏移提交等。
它也允许消费者通过使用assign(Collection)手动分配指定分区,如果使用手动指定分配分区,那么动态分区分配和协调消费者组将失效。
③ 发现消费者故障
订阅一组topic后,当调用poll(long)时,消费者将自动加入到组中。只要持续的调用poll,消费者将一直保持可用,并继续从分配的分区中接收消息。此外,消费者向服务器定时发送心跳。 如果消费者崩溃或无法在session.timeout.ms配置的时间内发送心跳,则消费者将被视为死亡,其分区将被重新分配。
还有一种可能,消费可能遇到“活锁”的情况,它持续的发送心跳,但是没有处理。为了预防消费者在这种情况下一直持有分区,我们使用max.poll.interval.ms活跃检测机制。 在此基础上,如果你调用的poll的频率大于最大间隔,则客户端将主动地离开组,以便其他消费者接管该分区。 发生这种情况时,你会看到offset提交失败(调用commitSync()引发的CommitFailedException)。这是一种安全机制,保障只有活动成员能够提交offset。所以要留在组中,必须持续调用poll。
消费者提供两个配置设置来控制poll循环:
max.poll.interval.ms:增大poll的间隔,可以为消费者提供更多的时间去处理返回的消息(调用poll(long)返回的消息,通常返回的消息都是一批)。缺点是此值越大将会延迟组重新平衡。
max.poll.records:此设置限制每次调用poll返回的消息数,这样可以更容易的预测每次poll间隔要处理的最大值。通过调整此值,可以减少poll间隔
对于消息处理时间不可预测的情况,这些选项是不够的。 处理这种情况的推荐方法是将消息处理移到另一个线程中,让消费者继续调用poll。 但是必须注意确保已提交的offset不超过实际位置。另外,必须禁用自动提交,并只有在线程完成处理后才为记录手动提交偏移量(取决于你)
【示例】
这是个【自动提交偏移量】的简单的kafka消费者API。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}
}
设置enable.auto.commit,偏移量由auto.commit.interval.ms控制自动提交的频率。
集群是通过配置bootstrap.servers指定一个或多个broker。不用指定全部的broker,它将自动发现集群中的其余的borker(最好指定多个,万一有服务器故障)。
broker通过心跳机制自动检测test组中失败的进程,消费者会自动ping集群,告诉进群它还活着。只要消费者能够做到这一点,它就被认为是活着的,并保留分配给它分区的权利,如果它停止心跳的时间超过session.timeout.ms,那么就会认为是故障的,它的分区将被分配到别的进程。
这个deserializer设置如何把byte转成object类型,例子中,通过指定string解析器,告诉获取到的消息的key和value只是简单个string类型。
【手动控制偏移量】
当消息认为已消费过了,这个时候再去提交它们的偏移量。这个很有用的,当消费的消息结合了一些处理逻辑,这个消息就不应该认为是已经消费的,直到它完成了整个处理。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}
在这个例子中,我们将消费一批消息并将它们存储在内存中。当我们积累足够多的消息后,我们再将它们批量插入到数据库中。如果我们设置offset自动提交,消费将被认为是已消费的。这样会出现问题,我们的进程可能在批处理记录之后,但在它们被插入到数据库之前失败了。
为了避免这种情况,我们将在相应的记录插入数据库之后再手动提交偏移量。这样我们可以准确控制消息是成功消费的。提出一个相反的可能性:在插入数据库之后,但是在提交之前,这个过程可能会失败(即使这可能只是几毫秒,这是一种可能性)。在这种情况下,进程将获取到已提交的偏移量,并会重复插入的最后一批数据。这种方式就是所谓的“至少一次”保证,在故障情况下,可以重复。
使用手动偏移控制的优点是,可以直接控制记录何时被视为“已消耗”。
上面的例子使用commitSync表示所有收到的消息为”已提交”,在某些情况下,可能希望更精细的控制,通过指定一个明确消息的偏移量为“已提交”。在下面,我们处理完每个分区中的消息后,提交偏移量。
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));}
}
} finally {
consumer.close();
}
注意:已提交的offset应始终是你的程序将读取的下一条消息的offset。因此,调用commitSync(offsets)时,你应该加1个到最后处理的消息的offset。
④ 订阅指定的分区
如果消费者进程本身具有高可用性,并且如果它失败,会自动重新启动(可能使用集群管理框架如YARN,Mesos,或者AWS设施,或作为一个流处理框架的一部分)。 在这种情况下,不需要Kafka检测故障,重新分配分区,因为消费者进程将在另一台机器上重新启动。
要使用此模式,只需调用assign(Collection)消费指定的分区即可:
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
一旦手动分配分区,可以在循环中调用poll。消费者分组仍需要提交offset,只是现在分区的设置只能通过调用assign修改,因为手动分配不会进行分组协调,因此消费者故障不会引发分区重新平衡。每一个消费者是独立工作的(即使和其他的消费者共享GroupId)。为了避免offset提交冲突,通常需要确认每一个consumer实例的gorupId都是唯一的。
注意,手动分配分区(即,assgin)和动态分区分配的订阅topic模式(即,subcribe)不能混合使用。
⑤ offset存储在其他地方
消费者可以不使用kafka内置的offset仓库。可以选择自己来存储offset。要注意的是,将消费的offset和结果存储在同一系统中,用原子的方式存储结果和offset,但这不能保证原子,要想消费是完全原子的,并提供的“正好一次”的消费保证比kafka默认的“至少一次”的语义要更高。需要使用kafka的offset提交功能。
如果消费的结果存储在关系数据库中,存储在数据库的offset,让提交结果和offset在单个事务中。这样,事务成功,则offset存储和更新。如果offset没有存储,那么偏移量也不会被更新。
如果offset和消费结果存储在本地仓库。例如,可以通过订阅一个指定的分区并将offset和索引数据一起存储来构建一个搜索索引。
每个消息都有自己的offset,所以要管理自己的偏移,只需要做到以下几点:
配置 enable.auto.commit=false
使用提供的 ConsumerRecord 来保存你的位置。
在重启时用 seek(TopicPartition, long) 恢复消费者的位置。
当分区分配也是手动完成的(像上文搜索索引的情况),这种类型的使用是最简单的。 如果分区分配是自动完成的,需要特别小心处理分区分配变更的情况。可以通过调用subscribe(Collection,ConsumerRebalanceListener)和subscribe(Pattern,ConsumerRebalanceListener)中提供的ConsumerRebalanceListener实例来完成的。例如,当分区向消费者获取时,消费者将通过实现ConsumerRebalanceListener.onPartitionsRevoked(Collection)给这些分区提交它们的offset。当分区分配给消费者时,消费者通过ConsumerRebalanceListener.onPartitionsAssigned(Collection)为新的分区正确地将消费者初始化到该位置。
ConsumerRebalanceListener的另一个常见用法是清除应用已移动到其他位置的分区的缓存。
⑥ 控制消费的位置
大多数情况下,消费者只是简单的从头到尾的消费消息,周期性的提交位置(自动或手动)。kafka也支持消费者去手动的控制消费的位置,可以消费之前的消息也可以跳过最近的消息。
有几种情况,手动控制消费者的位置可能是有用的。
一种场景是对于时间敏感的消费者处理程序,对足够落后的消费者,直接跳过,从最近的消息开始消费。
另一个使用场景是本地状态存储系统。在这样的系统中,消费者将要在启动时初始化它的位置(无论本地存储是否包含)。同样,如果本地状态已被破坏(假设因为磁盘丢失),则可以通过重新消费所有数据并重新创建状态。
kafka使用seek(TopicPartition, long)指定新的消费位置。用于查找服务器保留的最早和最新的offset的特殊的方法也可用(seekToBeginning(Collection) 和 seekToEnd(Collection))。
⑦ 消费者流量控制
如果消费者分配了多个分区,并同时消费所有的分区,这些分区具有相同的优先级。在一些情况下,消费者需要首先消费一些指定的分区,当指定的分区有少量或者已经没有可消费的数据时,则开始消费其他分区。
例如流处理,当处理器从2个topic获取消息并把这两个topic的消息合并,当其中一个topic长时间落后另一个,则暂停消费,以便落后的赶上来。
kafka支持动态控制消费流量,分别在future的poll(long)中使用pause(Collection) 和 resume(Collection) 来暂停消费指定分配的分区,重新开始消费指定暂停的分区。
⑧ 多线程处理
Kafka消费者不是线程安全的。所有网络I/O都发生在进行调用应用程序的线程中。用户的责任是确保多线程访问正确同步。非同步访问将导致ConcurrentModificationException。
此规则唯一的例外是wakeup(),它可以安全地从外部线程来中断活动操作。在这种情况下,将从操作的线程阻塞并抛出一个WakeupException。这可用于从其他线程来关闭消费者。 以下代码段显示了典型模式:
public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public void run() {
try {
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
ConsumerRecords records = consumer.poll(10000);
// Handle new records
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}
// Shutdown hook which can be called from a separate thread
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
在单独的线程中,可以通过设置关闭标志和唤醒消费者来关闭消费者。
closed.set(true);
consumer.wakeup();
我们没有多线程模型的例子。但留下几个操作可用来实现多线程处理消息。
每个线程一个消费者,这种方法的优点和缺点:
这是最容易实现的;因为它不需要在线程之间协调,所以通常它是最快的;它按顺序处理每个分区(每个线程只处理它接受的消息)。更多的消费者意味着更多的TCP连接到集群(每个线程一个)。一般kafka处理连接非常的快,所以这是一个小成本;更多的消费者意味着更多的请求被发送到服务器,但稍微较少的数据批次可能导致I/O吞吐量的一些下降;所有进程中的线程总数受到分区总数的限制。
解耦消费和处理
另一个替代方式是一个或多个消费者线程,它来消费所有数据,其消费所有数据并将ConsumerRecords实例切换到由实际处理记录的处理器线程池来消费的阻塞队列。这个选项同样有利弊:
可扩展消费者和处理进程的数量。这样单个消费者的数据可分给多个处理器线程来执行,避免对分区的任何限制;跨多个处理器的顺序保证需要特别注意,因为线程是独立的执行,后来的消息可能比早到的消息先处理,如果对排序没有问题,这就不是个问题;手动提交变得更困难,因为它需要协调所有的线程以确保对该分区的处理完成。
这种方法有多种玩法,例如,每个处理线程可以有自己的队列,消费者线程可以使用TopicPartition hash到这些队列中,以确保按顺序消费,并且提交也将简化。
Kafka Streams API
Kafka Streams从一个或多个输入topic进行连续的计算并输出到0或多个外部topic中。
可以通过TopologyBuilder类定义一个计算逻辑处理器DAG拓扑。或者也可以通过提供的高级别KStream DSL来定义转换的KStreamBuilder。
KafkaStreams类管理Kafka Streams实例的生命周期。一个stream实例可以在配置文件中为处理器指定一个或多个Thread。
KafkaStreams实例可以作为单个streams处理客户端(也可能是分布式的),与其他相同应用ID的实例进行协调(无论是否在同一个进程中,在同一台机器的其他进程中,或远程机器上)。这些实例将在输入topic分区的基础上来划分工作,以便所有的分区都被消费掉。如果实例添加或失败,所有实例将重新平衡它们之间的分区分配,以保证负载平衡。
在内部,KafkaStreams实例包含一个正常的KafkaProducer和KafkaConsumer实例,用于读取和写入,
一个简单的例子:
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
StreamsConfig config = new StreamsConfig(props);
KStreamBuilder builder = new KStreamBuilder();
builder.stream("my-input-topic").mapValues(value -> value.length().toString()).to("my-output-topic");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();