我在一个线程中创建了一个Kafka consumer实例,作为构造函数的一部分,在thread inside run方法中,我确实调用了不同的web服务,为了保持调用的非阻塞性,我正在使用completable future。我的问题是,我无法通过调用thenApply方法并传递Kafka consumer实例来发出commit,因为这会给我一个错误,即Kafka consumer不是线程安全的。虽然在我的commit方法中,我已经编写了代码
synchronized(consumer) {
commitResponse();
}
我仍然得到了ConcurrentModificationException
。
class KafkaConsumerThread implements Runnable {
KafkaConsumer<String, String> consumer;
public KafkaConsumerThread(Properties properties) {
consumer = new KafkaConsumer<String, String>(properties);
...
}
@Override
public void run() {
try {
// synchronized (consumer) {
consumer.subscribe(topics);
while (true) {
if (closed.get()) {
consumer.close();
}
ConsumerRecords<String, String> records = consumer.poll(120000);
for (ConsumerRecord<String, String> record : records) {
getAsyncClient().prepareGet(webServiceUrl)
.execute()
.toCompletableFuture()
.thenApply(resp -> callAnotherService1(resp))
.thenApply(resp -> callAnotherService2(resp))
.thenApply(resp -> commitResponse(resp, consumer));
}
}
}
} catch (Exception ex) {
...
}
在上面的代码中,我在commitResponse方法中得到了一个异常,即“KafkaConsumer对于多线程访问不安全”。虽然在提交响应中,如果将提交包含在已同步(使用者)中,我仍然会得到错误。
最可能的原因是,poll
方法没有同步,并且是在异步执行提交时执行的(仍然保持内部Kafka锁)。
请参阅对私有方法的引用:org.apache.kafka.clients.consumer.KafkaConsumer.acquire()
和org.apache.kafka.clients.consumer.KafkaConsumer.release()
inorg.apache.kafka.clients.consumer.Kafka消费者
我正在使用Kafka 0.8 最近,我们开始喂食和消耗一个行为怪异的新主题,消耗的偏移量突然被重置,它尊重我们设置的auto.offset.reset策略(实际上是最小的)但我无法理解为什么该主题会突然重置其偏移量。 我正在使用高级消费者。 这是我发现的一些错误日志: 我们有一堆这样的错误日志: 每次出现此问题时,我都会看到警告日志: 然后真正的问题发生了: 现在的问题是:有人已经经历过这种行为吗
我设置了MirrorMaker2,用于在两个DC之间复制数据。 我的 mm2 属性, 看到下面的MM2创业。 我的数据正在按预期进行复制。源主题作为源在目标集群中创建..但是,消费者群体补偿并没有被复制。 已在源群集中启动使用者组。 消耗了少量消息并将其停止。在此主题中发布了新消息,镜像制造商也将数据镜像到目标集群。 我尝试使用来自目标集群的消息,如下所示。 由于我使用相同的使用者组,因此我希望我
当我运行这个命令时,我得到了两个主题。我知道我创建了测试主题,但我看到了另一个名为“消费者偏移”的主题。从名称来看,这意味着它与消费者补偿有关,但它是如何使用的?
我有一个微服务应用程序,它是使用camel kafka开发的,用于消费来自kafka的消息。我用4个并发实例运行这个服务,其中每个实例都有1个消费者,他们消费的主题有20个分区。 在维护期间(每天凌晨1-2点),应用程序将通过停止消费者来暂停消费。 当应用程序暂停消费时,kafka重新平衡发生,一些实例将在停止消费之前分配更多分区。 e、 由instanceA(partition1-5)使用的g分
我指的是Kafka源代码连接器的Flink 1.14版本,代码如下。 我期待以下要求。 在最新的应用程序开始时,必须阅读Kafka主题的最新偏移量 在检查点上,它必须将消耗的偏移量提交给Kafka 重新启动后(当应用程序手动终止/系统错误时),它必须从最后提交的偏移量中选取,并且必须消耗使用者延迟,然后再使用新的事件提要 有了Flink新的KafkaConsumer API(KafkaSource
我有一个Kafka消费者。好像工作了一段时间,然后就死了。它重复这样做。我得到了这个异常,但没有其他信息。 305000毫秒是5分钟。有什么线索可能导致这种情况吗?或者尝试找出答案的步骤? 如果相关: 我在不同的机器上有3个进程,使用最新的JavaKafka客户端版本0.10.2.0。每台机器运行20个线程,每个线程都有一个单独的消费者。根据设计,当一个线程死亡时,所有线程都被杀死,进程死亡,然后