当前位置: 首页 > 知识库问答 >
问题:

在本地执行模式下停止/启动Kafka消费者/生产者流

祁正阳
2023-03-14

设置:

  • Java 8

创建了一个简单的示例程序,以使用来自一个Kafka主题的1M消息并生成到另一个主题-以本地执行模式运行。这两个主题都有32个分区。

当我让它从头到尾运行时,它会消耗并生成所有消息。如果在完成之前启动然后停止(SIGINT),然后再次重新启动,则生产者仅接收原始1M消息的子集。

我已经确认了我对消费者的补偿,它阅读了所有1M消息。

final StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(32);
env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE);

--

producer.setFlushOnCheckpoint(true);
producer.setLogFailuresOnly(false);

在本地执行模式下,这是预期的吗?是否需要启用保存点来停止和重新启动流作业?出现这种情况时,生产者似乎没有提交所有消息。

提前感谢!

共有1个答案

鄢博简
2023-03-14

首先,在随后的运行中,它只接收消息的子集,因为Flink Kafka消费者使用Kafka中提交的偏移量作为起始位置。目前,在版本中避免这种情况的唯一方法(截至目前为止为1.2.0)是始终分配一个新的group.id。在下一个版本中,将为此提供新选项:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-start-position-configuration.

另请注意,Kafka中提交的偏移量根本不用于Flink中的精确一次处理保证。Flink仅依赖于检查点偏移量。有关此的更多详细信息,请参阅上面链接中的Flink Kafka连接器文档。

 类似资料:
  • 一、线程间通信的两种方式 1.wait()/notify() Object类中相关的方法有notify方法和wait方法。因为wait和notify方法定义在Object类中,因此会被所有的类所继承。这些方法都是final的,即它们都是不能被重写的,不能通过子类覆写去改变它们的行为。 ①wait()方法: 让当前线程进入等待,并释放锁。 ②wait(long)方法: 让当前线程进入等待,并释放锁,

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:

  • 生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f

  • 我有一个生产者/消费者模式,如下所示 固定数量的生成器线程,每个线程写入它们自己的BlockingQueue,通过执行器调用 单个使用者线程,读取生产者线程 每个生产者都在运行一个数据库查询,并将结果写入其队列。消费者轮询所有生产者队列。目前,如果出现数据库错误,生产者线程就会死掉,然后消费者就会永远停留在产品队列中等待更多的结果。 我应该如何构造它来正确处理catch错误?

  • 我目前正在开发Kafka模块,我正在使用Kafka通信的抽象。我能够集成生产者 Spring Boot测试类 监听器类 我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断