当前位置: 首页 > 知识库问答 >
问题:

如何让kafka连接从上次失败中获取消息

罗华翰
2023-03-14

我正在使用弹性搜索Kafka连接在独立模式下。我不困惑使用哪种配置来启动Kafka连接并从最后一个故障点开始。

例如,生产者将继续推动记录进入Kafka和消费者,因为弹性搜索接收器连接器正在消费,现在我的由于某种原因我的消费者下降了,但我的骄傲将继续推动信息进入Kafka。现在,当我修复了ES sink连接器端的问题后,如果我重新启动ES sink连接器,它应该从上次故障中选择,而不是从开始或最近的故障中选择,而是从上次故障中选择。如果失败后收到了10条消息,则当es接收器连接器启动时,它应该首先接收10条消息,然后是最新消息。

请帮助配置 。

共有1个答案

瞿和硕
2023-03-14

Kafka Connect充当任何其他Kafka消费者。当您重新启动时,使用者将回滚到上次提交的偏移量位置。这是默认行为,在Connect中没有更改它的设置。

 类似资料:
  • 我得到以下exeption连接到Mssql服务器。 我在属性中使用相同的配置连接到JDBC,但在尝试连接到R2DBC时出现问题。在Rest时发生,而不是在启动应用程序时发生。

  • 我正在运行一个简单的Kafka streams应用程序,它将使用Node JS记录的信息带到一个Kafka主题。 还需要注意的是,时间戳只是一个数字,表示自1970年6月以来的秒数。 我使用scala中的Kafka流来使用这些数据。 例如。 然而,我不确定如何将时间戳(我从nodeJS发送的)提取到这个流中。 例如,如果我尝试做这样的事情 这会导致错误“无法解析符号流”。我在想我该怎么解决这个问题

  • 我一直在使用ftp在android应用程序的服务器上上传图像,我正在使用以下代码连接ftp。它在Wi-fi中工作正常,但如果我切换到3G或2G连接,就会出现连接超时错误。那么,你能告诉我如何处理这种情况吗。我的客户在Veriozon也面临这个问题,Sprint,ATT网络提供商。它的iPhone版本在所有网络中都运行良好。 代码: 错误: JAVA网ConnectException:无法连接到主机

  • 我正在使用事务性KafkaProducer向主题发送消息。这个很管用。我使用的是具有read_committed隔离级别的KafkaConsumer,而我的seek和seekToEnd方法存在问题。根据文档,seek和seekToEnd方法给出了LSO(上次稳定偏移量)。但这有点让人摸不着头脑。因为它给我的价值总是一样的,主题结束了。无论最后一个条目是(由生产者提交的)还是中止的事务的一部分。例如

  • 我尝试向kafka发布/使用我的java对象。我使用Avro模式。 我的基本程序运行良好。在我的程序中,我在生产者(用于编码)和消费者(用于解码)中使用我的模式。 如果我在接收者处将不同的对象发布到不同的主题(例如:100个主题),我不知道我收到了什么类型的消息?...我想从接收到的字节中获取avro模式,并想将其用于解码...我的理解正确吗?如果是这样,我如何从接收到的对象中检索?