我们计划使用JMS源连接器将数据传输到我们的Kafka集群中。来自ActiveMQ的数据是XML格式的。JMS源连接器使用内部messageID(message . getjmsmessageid())作为键。
充当连接器流式传输到的 Kafka 主题上的键的字段需要从 (XML) 有效负载中检索。
为此,需要在连接器中执行几个步骤。
我们正在处理金融交易,需要保证消息的顺序。我们有很高的吞吐量,据我所知,配置tasks.max通过在Kafka Connect Workers之间分配任务来实现并行。
第一个问题:并行性如何与单一消息转换器结合使用?“(源)连接器-变压器-转换器”是否通过设置tasks.max形成一个分布在一起的管道,或者tasks.max设置是否只适用于连接器?
后者似乎有点奇怪,所以假设前者是正确的,我还有另一个疑问。
我们的 Kafka 主题键 - 将保证 Kafka 主题的顺序 - 在连接器的任务中确定。选择任务.max
在多个任务之间分布,两条(或更多)消息(在有效负载中包含相同的密钥)以一定的顺序从ActiveMQ到达,并且可以发送到不同的Kafka Connect任务。
理论上,当最终流到Kafka主题时,顺序可以颠倒(在同一分区上,因为它们现在具有相同的密钥)。
我这样推理是对的吗?有没有办法绕过这个?或者排序保证只能在这个用例中使用一个任务。
并行性如何与单一消息转换器结合使用?
您的第一个答案是正确的-转换器按照连接器配置中定义的顺序执行每个正在运行的任务。任务生成的每个SourceRecord都由同一任务中的所有转换器处理,然后发送给Kafka。
我这样推理是对的吗?有没有办法规避这个问题?或者,只有在使用一个任务的情况下,才能保证订购。
保证消息排序的最简单的方法是拥有一个单一的任务,但是这显然是不可伸缩的。有几种方法可以解决这个问题。
<代码>
这实现起来并不简单(例如,您需要在运行时处理任务数量的变化),但却是可行的。
根据ID将原始消息划分为不同的ActiveMQ队列。因此,现在您可以为一个ActiveMQ队列获得一个Connect任务。
使用Kafka流对消息进行排序。基本上,您将从ActiveMQ读取消息,并使用Kafka Connect将它们写入主题“未排序的消息”。一个单独的Kafka Streams应用程序将从“未排序的消息”主题中读取数据,并将排序后的数据写入“已排序的消息”主题。这在这里讨论:Apache Kafka根据消息的值对消息进行排序
tasks.max设置是否只适用于连接器?
这个嘛
我们的Kafka主题密钥-这将保证Kafka主题的顺序
不,不是的。它只保证分区,仅此而已
仅在使用一个任务的用例中,排序保证才是可能的吗?
这取决于来源。我不知道 AMQ,但如果读取消息将其从队列中删除,则多个任务不可能收到该消息
Kafka 接收器连接器如何在从分区提取消息时确保消息排序。我有多个分区,并且在发布每个分区带有哈希键的消息时确保了消息排序。现在,当多个接收器任务(及其工作线程)从多个 JVM 扩展,负责从同一分区获取消息并通过 HTTP 通知目标系统时,我如何保证目标系统将按顺序接收消息。
我遇到了两个关于订购的短语, 生产者发送到特定主题分区的消息将按发送顺序追加。也就是说,如果记录M1与记录M2由同一生产者发送,并且M1首先发送,则M1的偏移量将低于M2并出现在日志中的较早位置。 另一个 问题是,如果存在如#2所述的失败发送,那么该顺序是否仍会保留到特定分区?如果一条消息存在潜在问题,将删除每个分区的所有以下消息“以保留顺序”,或者将发送“正确”的消息,并将失败的消息通知应用程序
我们的基础设施中有融合平台。核心是,我们使用kafka代理来分发事件。许多设备会生成Kafka主题的事件(每种类型的事件都有一个Kafka主题),事件在谷歌的protobuf中序列化。我们有confluent的模式注册表来跟踪protobuf模式。 我们需要的是,对于一些事件,我们需要应用一些转换,然后将转换输出发布到其他一些Kafka主题。当然,Kafka流是实现这一点的一种方法,如本例中所示。
我试图构建一个系统,从Kafka读取json数据(无模式),将其转换为avro并将其推送到s3。 我已经能够使用kstream和KSQL实现json到avro的转换。我想知道使用Kafka Connect的自定义转换是否可以实现同样的效果。 这是我迄今为止所尝试的: 其中avro_schema是avsc文件中指定的架构名称。 我不确定这是否是正确的方法,但我面临的问题是,当调用newRecord(
用例是将整个消息(JSON)和键作为记录存储在表中,表中有两列“id”和“data”。 数据库是Postgres,它支持JSON的列类型。 根据本文,JSONCon的支持类型是字符串、int64等https://cwiki.apache.org/confluence/display/KAFKA/KIP-301:JsonConver的模式推理 是否可以将数据字段的类型设置为JSON,然后将其存储在P
我有一个kafka connect插件,部署在kafka集群中(在独立模式下,仅用于测试,目的是分布式完成)。这个Kafka连接插件使用curator连接到集群的zookeper,并从中提取一些信息,以决定如何处理这些消息。 代码如下: 在treeCache启动时超时,配置根路径存在于本地zookeeper中(已确认在zookeeper外壳中执行ls,对于我尝试使用的zkConnection字符串