生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。
使用者配置:自动提交,自动提交时间为5s(默认)。
应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较时。新的偏移量要小得多,这就像重置为所有四个分区的非常低的数字。
我们不知道在消费者端发生了什么,因为它没有调用poll方法,所以没有打印日志消息。我们查了kafka的服务器日志,但什么也没发现。
<spring.kafka.version>1.1.2.RELEASE</spring.kafka.version>
...
<bean id="defaultKafkaConsumer"
class="org.apache.kafka.clients.consumer.KafkaConsumer">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${kafka.bootstrap.servers}"></entry>
<entry key="max.block.ms" value="5000"></entry>
<entry key="group.id" value="kafkaconnect.tca"></entry>
<entry key="auto.offset.reset" value="earliest"></entry>
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"></entry>
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"></entry>
</map>
</constructor-arg>
</bean>
getKafkaConsumer().subscribe(Arrays.asList(getKafkaTopic()));
// set up the polling task
handler = timer.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
processPoll();
} catch (Throwable t) {
LOG.error(String.format("error processing poll for inet: %s, details: %s - %s", getId(), t.getMessage(), t.getCause()), t);
}
}
}, 3, 3, TimeUnit.MILLISECONDS);
processPoll() Method: destination will not be ready during the weekend.
try {
if (!isDestinationReady()) {
if (destinationIgnoreCnt++ ==0) {
LOG.warn(String.format("outbound destination session is not ready - trying: %s/%s",destinationIgnoreCnt,destinationwaitingloop));
} else if ((destinationIgnoreCnt++ % destinationwaitingloop) == 0) {
LOG.warn(String.format("outbound destination session is not ready - trying %s/%s", destinationIgnoreCnt,destinationwaitingloop));
destinationIgnoreCnt = 1;
}
messageIgnoreCnt = 0;
return;
}
if(!isDestinationOpen()) {
if (destinationIgnoreCnt++ ==0) {
LOG.error(String.format("outbound destination is not opended - trying:%s/%s.", destinationIgnoreCnt,destinationwaitingloop) );
} else if ((destinationIgnoreCnt++ % destinationwaitingloop) == 0) {
LOG.error(String.format("outbound destination is not opended - trying %s/%s.", destinationIgnoreCnt,destinationwaitingloop));
destinationIgnoreCnt = 1;
}
messageIgnoreCnt = 0;
return;
}
if (messageIgnoreCnt++ == 0) {
LOG.info(String.format("kafka poller started. poll interval %s wait: %s", pollingInterval, 60000));
} else if ((messageIgnoreCnt++ % 30) == 0) {// approximately 30mins
LOG.info(String.format("kafka poller started. poll interval %s wait %s", pollingInterval, 60000));
messageIgnoreCnt = 1;
}
if (getKafkaConsumer() == null) {
LOG.critical("KafkaListener consumer is null");
return;
}
ConsumerRecords<String, String> records = getKafkaConsumer().poll(60000);
if (records == null || records.isEmpty()) {
LOG.debug("zero records received from Kafka");
return;
}
for (ConsumerRecord<String, String> record : records) {
LOG.info(String.format("consuming from topic = %s ", record.toString()));
try {
String jsonMsg = record.value();
DirectBatchRequest payload = JacksonUtilities.getObjectMapper().readValue(jsonMsg, DirectBatchRequest.class);
if (payload != null) {
LOG.info(String.format("Got it reportId:%s", payload.getDestinationId()));
if(payload.getDestinationId() == 0) {
LOG.info(String.format("Applying default destination desk:%s", defaultDeskId));
payload.setDestinationId(defaultDeskId);
}
List<RequestEntryType> requestEntryTypeList = ((StreamDirectRequest) payload).getRequestList();
LOG.info(String.format("Processing size: %s" , requestEntryTypeList.size()) );
processRequest((StreamDirectRequest) payload); //async call
LOG.info(String.format("Processing size: %s sent to Steam" , requestEntryTypeList.size()) );
}
} catch (Throwable t) {
LOG.error(String.format("KafkaListener JSON%s conversion error %s", record, t.getMessage()));
}
}
} catch (Throwable t) {
LOG.error(String.format("KafkaListener exception %s", t.getMessage()));
}
默认情况下,如果消费者组中没有活动,Kafka会删除offsets.retention.minutes
之后的偏移量。默认保留时间为1440分钟(1天)。
在您的情况下,由于消费者组周末Rest,因此会重置offset。
参见https://kafka.apache.org/documentation/#brokerconfigs
D: \软件\Kafka\Kafka2.10-0.10.0.1\bin\windows 我使用上面的命令来消费消息,有什么我错过的吗?帮助我: 这个 那些是生产者和消费者......
然而,当在我的环境中测试此示例时,我得到了一个异常。
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?
我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。
我们有一个服务器,负责处理消息的生成和消费。我们有4台笔记本电脑,所有带有confluent的Mac都运行相同的命令行。。。 /kafka avro控制台使用者--从一开始--引导服务器0.0.0.0:9092,0.0.0.0:9092--主题主题名称--属性schema.registry.url=http://0.0.0.0:8081 4台笔记本电脑中有3台没有问题使用这些消息,但是第四台不会。
我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认