我想告诉Kafka我的消费者何时成功处理了一条记录,因此我通过设置enable关闭了自动提交。汽车将
提交到false。我有两条关于我订阅的主题的消息,偏移量分别为0和1,并创建了一个消费者,因此每次调用poll
最多返回一条记录(通过将max.poll.records
设置为1)。
我现在调用consumer.poll(5000)
并收到第一条消息,但我不承认它;我不调用委员会同步或
委员会同步。如果我现在再次调用
consumer.poll(5000)
,使用相同的消费者,我希望得到我刚刚阅读的完全相同的消息,但是,相反,我收到了第二条消息。
如何获得
消费者。轮询
,在我明确确认之前继续发送相同的消息?
你所描述的是预期的行为。每次调用poll()
,它都会返回下一条消息。您提交的偏移量仅在连接新使用者时使用,以便它知道从何处(重新)开始。
在MessageHub中,我们设置了会话。超时
至30秒。因此,您需要稍微加快调用poll()
的速度,以避免断开连接。如果您的处理时间比这要长,那么我可以考虑两个选项:
>
使用Kafka 0.10.2并设置max.poll.interval.ms
来告诉您的Kafka客户端在处理前一条记录时保持会话活动(而不必调用投票()
)。(此功能是在0.10.1中添加的,但我们不支持该版本。0.10.2工作,因为它能够与0.10.0经纪人工作)
使用seek()返回到poll
之后的上一个偏移量,以便它继续返回相同的记录。
希望这能有所帮助!
我有一个骆驼路线如下。虽然我设置了处理(true),但我不明白为什么defaul在所有重试都耗尽后,defaul的处理程序会调用的。 日志: 20.04.03 11:46:53.907 INFO ad #6 - timer://testRoute route1 面包屑 Id=ID-xxxxxx-1585894556662-0-4 |世界您好 20.04.03 11:46:53.913错误广告#6-
我刚接触Kafka,正在为我的新应用程序尝试一些小用例。用例基本上是Kafka制作人- 当消费时(步骤2),下面是步骤的顺序...1.消费者。轮询(1.0)1. a.产生多个主题(多个水槽代理正在监听)1.b。产生。轮询()2。每25个msgs刷新()3。提交()每个msgs(asynchCommit=false) 问题1:这个动作顺序对吗!?! 问题2:这会导致数据丢失吗?因为刷新是每25毫秒一
我正在尝试使用PollRich获得JPA实体 但是在那之后,尽管表包含数百行,但我只得到一行。如何获取所有行?我想要像往常一样的polEnrich行为,它给我所有的表行。
我开发了一个Django文件上传API,它从客户端接收发布的数据并将数据保存为文件。 根据Django CSRF手册,HTTP请求头应使用CSRFToken cookie值设置X-CSRFToken。我已经用下面的代码设置了X-CSRFToken,但是Django服务器仍然禁止POST请求(403),如下图所示。 您是如何通过向Django服务器发送POST请求来克服Django CSRF的? 谢
在我的应用程序中,我想扫描GS1-128条形码,需要从ZXing条形码扫描仪传递FNC1字符。现在我只收到没有FNC1字符的纯文本。 是否有方法传递DecodeHintType。假设\u GS1通过Intent连接到扫描仪应用程序? 我不想在我的应用程序中包含完整的扫描仪源,而是使用意图。 在扫描仪的源代码中,我可以看到需要设置DecodeHintType才能实现:https://code.goo
我希望能够滚动通过整个页面,但没有滚动条显示。 在Google Chrome中是: 但Mozilla Firefox和Internet Explorer似乎不是这样工作的。 我也在CSS中尝试过这个: 这确实隐藏了滚动条,但我不能再滚动了。 有没有一种方法,我可以删除滚动条,同时仍然能够滚动整个页面? 请使用CSS或HTML。