当前位置: 首页 > 知识库问答 >
问题:

Kafka Connect 的行为是如何工作的?

巫马玉堂
2023-03-14

我正在为 Elasticsearch 编写一个 Kafka Sink 连接器。

我实现了启动,把,刷新,关闭方法在Sink任务类。

但是,我不知道Kafka Sink Connector的行为到底起什么作用。

如果Connect Worker重复执行所有这些任务,即通过< code>put()方法从Kafka代理获取SinkRecord,在内部对其进行处理,然后将数据发送到Elasticsearch,我想知道offset commit是何时在内部操作的,它是否与< code>flush()方法相关联。

还有,我想知道这种重复工作的顺序是不是固定的。例如,可以在put完成之前进行刷新或提交吗?

我正在开发一个连接,它从远程代理接收数据,并将数据放入另一个远程服务器的弹性搜索中。在这种情况下,我们正在测试如果运行连接的服务器的网络暂时断开会发生什么。我不明白Kafka Connect是如何工作的。

如果您知道在这种情况下信息可能会丢失或复制,请寻求解释。

非常感谢。

共有1个答案

穆正青
2023-03-14

当connect正在运行时,会暂时断开连接

从连接辅助角色使用的偏移量应更新__consumer_offsets内部 Kafka 主题。

只要您监视连接器的 /status 终结点以了解非故障状态,并且可以在正在使用的 Kafka 主题的保留期内重新启动它,就应该很少或没有数据丢失。

此外,正如注释kafka connect elasticsearch中所指出的,已经存在,您可以检查该代码的提交和刷新语义。如果你做叉,请写一份公关,以帮助社区;)

此外,Logstash 具有 Kafka 输入和 Elasticsearch 输出,因此如果您运行的是 ELK 堆栈,那么这可能是比编写自己的连接器更好的选择。

 类似资料:
  • 应用程序具有上下文路径-->/spring-form-simple-project 因此,为了访问,我使用: 这个控制器又返回student.jsp,当提交这个student.jsp时,它用-->@RequestMapping(value=“/AddStudent”,method=RequestMethod.post)调用controller 任何关于这通常如何工作的指示都将是有帮助的。 谢谢!

  • 本文向大家介绍hibernate 是如何工作的?相关面试题,主要包含被问及hibernate 是如何工作的?时的应答技巧和注意事项,需要的朋友参考一下 读取并解析配置文件。 读取并解析映射文件,创建 SessionFactory。 打开 Session。 创建事务。 进行持久化操作。 提交事务。 关闭 Session。 关闭 SessionFactory。

  • 我很想知道谷歌应用商店服务中的Activity认可是如何工作的? 我认为活动是通过加速计数据识别的。是这样吗?。请告诉我详细情况如何

  • 我对GridBagLayout这一主题不熟悉,我无法理解约束、重量和填充之间的确切区别。 我可以而不分配。 除非您指定了至少一个非零值,否则所有组件都会聚集在其容器的中心。这是因为当权重为0.0(默认值)时,GridBagLayout会在其单元格网格和容器边缘之间放置任何额外的空间。 我的问题是,如果这是真的,那么为什么组件之间没有空间,它们看起来是连接的?

  • 从@mock和@injectmocks之间的差异,我理解@injectmocks被用作创建实例的注释,并将用@mock创建的mock注入其中。我想我不明白它是怎么工作的。 以下是我的问题: 在中,当我调用时,它返回一个空集...我的问题是:为什么不抛出(只声明mockedappoinceptions)?也许因为这是一个嘲弄?如果原因是这样,为什么模拟不抛出“NullPointerException