当前位置: 首页 > 知识库问答 >
问题:

Kafka 连接转换排序保证

薛博艺
2023-03-14

我们计划使用JMS源连接器将数据传输到我们的Kafka集群中。来自ActiveMQ的数据是XML格式的。JMS源连接器使用内部messageID(message . getjmsmessageid())作为键。

充当连接器流式传输到的 Kafka 主题上的键的字段需要从 (XML) 有效负载中检索。

为此,需要在连接器中执行几个步骤。

  • 要将XML转换为内部Kafka Connect Struct,我们使用自定义转换插件(https://github.com/jcustenborder/kafka-connect-transform-xml)
  • 然后,ValueToKey 和 ExtractField 转换器设置作为有效负载一部分的密钥。
  • 现在,这个键值对已准备好发送到我们的 Kafka 主题。

我们正在处理金融交易,需要保证消息的顺序。我们有很高的吞吐量,据我所知,配置tasks.max通过在Kafka Connect Workers之间分配任务来实现并行。

第一个问题:并行性如何与单一消息转换器结合使用?“(源)连接器-变压器-转换器”是否通过设置tasks.max形成一个分布在一起的管道,或者tasks.max设置是否只适用于连接器?

后者似乎有点奇怪,所以假设前者是正确的,我还有另一个疑问。

我们的 Kafka 主题键 - 将保证 Kafka 主题的顺序 - 在连接器的任务中确定。选择任务.max

在多个任务之间分布,两条(或更多)消息(在有效负载中包含相同的密钥)以一定的顺序从ActiveMQ到达,并且可以发送到不同的Kafka Connect任务。

理论上,当最终流到Kafka主题时,顺序可以颠倒(在同一分区上,因为它们现在具有相同的密钥)。

我这样推理是对的吗?有没有办法绕过这个?或者排序保证只能在这个用例中使用一个任务。

共有2个答案

翟宾实
2023-03-14

并行性如何与单一消息转换器结合使用?

您的第一个答案是正确的-转换器按照连接器配置中定义的顺序执行每个正在运行的任务。任务生成的每个SourceRecord都由同一任务中的所有转换器处理,然后发送给Kafka。

我这样推理是对的吗?有没有办法规避这个问题?或者,只有在使用一个任务的情况下,才能保证订购。

保证消息排序的最简单的方法是拥有一个单一的任务,但是这显然是不可伸缩的。有几种方法可以解决这个问题。

  1. 对任务读取的消息进行分区,以便每个任务始终使用相同的密钥读取所有消息。一些消息队列服务器内置了对此的支持。例如,ActiveMQ支持选择器。在这种情况下,您可以让每个任务读取以下消息:

<代码>

这实现起来并不简单(例如,您需要在运行时处理任务数量的变化),但却是可行的。

根据ID将原始消息划分为不同的ActiveMQ队列。因此,现在您可以为一个ActiveMQ队列获得一个Connect任务。

使用Kafka流对消息进行排序。基本上,您将从ActiveMQ读取消息,并使用Kafka Connect将它们写入主题“未排序的消息”。一个单独的Kafka Streams应用程序将从“未排序的消息”主题中读取数据,并将排序后的数据写入“已排序的消息”主题。这在这里讨论:Apache Kafka根据消息的值对消息进行排序

魏朗
2023-03-14

tasks.max设置是否只适用于连接器?

这个嘛

我们的Kafka主题密钥-这将保证Kafka主题的顺序

不,不是的。它只保证分区,仅此而已

仅在使用一个任务的用例中,排序保证才是可能的吗?

这取决于来源。我不知道 AMQ,但如果读取消息将其从队列中删除,则多个任务不可能收到该消息

 类似资料:
  • Kafka 接收器连接器如何在从分区提取消息时确保消息排序。我有多个分区,并且在发布每个分区带有哈希键的消息时确保了消息排序。现在,当多个接收器任务(及其工作线程)从多个 JVM 扩展,负责从同一分区获取消息并通过 HTTP 通知目标系统时,我如何保证目标系统将按顺序接收消息。

  • 我遇到了两个关于订购的短语, 生产者发送到特定主题分区的消息将按发送顺序追加。也就是说,如果记录M1与记录M2由同一生产者发送,并且M1首先发送,则M1的偏移量将低于M2并出现在日志中的较早位置。 另一个 问题是,如果存在如#2所述的失败发送,那么该顺序是否仍会保留到特定分区?如果一条消息存在潜在问题,将删除每个分区的所有以下消息“以保留顺序”,或者将发送“正确”的消息,并将失败的消息通知应用程序

  • 我们的基础设施中有融合平台。核心是,我们使用kafka代理来分发事件。许多设备会生成Kafka主题的事件(每种类型的事件都有一个Kafka主题),事件在谷歌的protobuf中序列化。我们有confluent的模式注册表来跟踪protobuf模式。 我们需要的是,对于一些事件,我们需要应用一些转换,然后将转换输出发布到其他一些Kafka主题。当然,Kafka流是实现这一点的一种方法,如本例中所示。

  • 我试图构建一个系统,从Kafka读取json数据(无模式),将其转换为avro并将其推送到s3。 我已经能够使用kstream和KSQL实现json到avro的转换。我想知道使用Kafka Connect的自定义转换是否可以实现同样的效果。 这是我迄今为止所尝试的: 其中avro_schema是avsc文件中指定的架构名称。 我不确定这是否是正确的方法,但我面临的问题是,当调用newRecord(

  • 用例是将整个消息(JSON)和键作为记录存储在表中,表中有两列“id”和“data”。 数据库是Postgres,它支持JSON的列类型。 根据本文,JSONCon的支持类型是字符串、int64等https://cwiki.apache.org/confluence/display/KAFKA/KIP-301:JsonConver的模式推理 是否可以将数据字段的类型设置为JSON,然后将其存储在P

  • 我有一个kafka connect插件,部署在kafka集群中(在独立模式下,仅用于测试,目的是分布式完成)。这个Kafka连接插件使用curator连接到集群的zookeper,并从中提取一些信息,以决定如何处理这些消息。 代码如下: 在treeCache启动时超时,配置根路径存在于本地zookeeper中(已确认在zookeeper外壳中执行ls,对于我尝试使用的zkConnection字符串