当前位置: 首页 > 知识库问答 >
问题:

如何制作可重启的生产者?

壤驷升
2023-03-14

最新版本的kafka支持精确一次语义(EoS)。为了支持这一概念,在每个消息中都添加了额外的详细信息。这意味着在你的消费者;如果打印消息的偏移量,它们不一定是连续的。这使得轮询一个主题以阅读最后提交的消息变得更加困难。

在我的例子中,consumer打印了如下所示的内容

Offset-0 0
Offset-2 1
Offset-4 2

问题:为了编写可重启的proudcer;我对话题进行了投票,并阅读了上一条消息的内容。在这种情况下;最后一条消息将是偏移量#5,这不是有效的消费者记录。因此,我在代码中看到了错误。

我可以使用下面提供的解决方案:获取发送到Kafka主题的最后一条消息。唯一的问题是不使用consumer.seek(分区,last_offset=1);我将使用consumer.seek(分区,last_offset-2)。这可以立即解决我的问题,但这不是一个理想的解决方案。

对于用Java编写的用户来说,获取最后提交的消息的最可靠和最好的解决方案是什么?或

是否可以对分区使用本地状态存储?或

共有1个答案

叶允晨
2023-03-14

在我的例子中,多个生产者将数据推送到一个大主题。因此,阅读整个主题将是一场噩梦。

<罢工> 我找到的解决办法是 维护另一个主题,即生产者可以存储元数据的“p1_track”。在 事务一个生产者将把数据发送到一个大主题和p1_track。

<罢工> 当我重新启动一个生产者,它将读取P1_Track并找出从哪里开始。

 类似资料:
  • 我使用的是汇流3.3.0。我的意图是使用kafka-connect将Kafka主题中的值插入Oracle表中。我的连接器与我使用avro console producer生成的avro记录工作良好,如下所示: 最后是序列化程序: 但我所理解的是,需要定义一些类似模式的东西,并使用一些avro序列化器来获得确切的数据,就像我使用avro console Consumer所做的那样。我读过一些示例代码

  • 问题内容: 在python中,我可以使用装饰器向类添加方法。是否有类似的装饰器将属性添加到类?我可以更好地表明我在说什么。 我上面使用的语法是否可能还是需要更多的语法? 我想要类属性的原因是可以延迟加载类属性,这似乎很合理。 问题答案: 这是我的处理方式: 在我们打电话时,设置员没有工作 ,因为我们正在打电话 ,而不是。 添加元类定义可以解决此问题: 现在一切都会好起来的。

  • 问题内容: 嗨,我是Java GUI的新手,正试图使启动屏幕或图像显示3秒钟。然后,它将进入我的主程序。是否有人有想法做到这一点,或者可以将我链接到任何教程? 到目前为止,我已经做到了这一点,但不确定从何而来。 问题答案: 最简单的方法是创建并添加您的内容,然后使用 试试这个代码: 或者, 您可以使用 SplashScreen 类创建一个启动画面 ****

  • 主要内容:1 创建DefaultMQProducer实例,2 start启动生产者,2.1 getOrCreateMQClientInstance获取或者创建MQClientInstance,2.2 registerProducer注册生产者,3 start启动MQClientInstance,3.1 mQClientAPIImpl#start启动netty客户端,3.2 startScheduledTask启动各种定时任务,基于RocketMQ 4.9.3,详细介绍了RocketMQ的客户端P

  • 我们在Spark 2.1中使用Kafka0.10,我发现我们的制作人发布消息总是很慢。在给Spark executors提供8个内核后,我只能达到1k/s左右,而另一篇帖子则说它们很容易达到百万/秒。我试着调一下玲珑的曲调。ms和batch。大小来找出答案。然而我发现了玲儿。ms=0对我和这批人来说似乎是最佳选择。大小没有多大影响。我每次迭代发送160k个事件。看来我得让Kafka制作人知道到底发

  • 问题内容: 我正在尝试设置python 库,以便将包含其他字典作为元素的字典保存到文件中。浮点数很多,我想将位数限制为例如。 根据其他帖子,应使用SO 。但是,它不起作用。 例如,下面的代码在Python3.7.1中运行,将打印所有数字: 我该如何解决? 可能无关紧要,但我在macOS上。 编辑 该问题被标记为重复。但是,在原始帖子的已接受答案(到目前为止是唯一的答案)中明确指出: 注意:此解决方