当前位置: 首页 > 知识库问答 >
问题:

Kafka连接消息排序

喻珂
2023-03-14

Kafka 接收器连接器如何在从分区提取消息时确保消息排序。我有多个分区,并且在发布每个分区带有哈希键的消息时确保了消息排序。现在,当多个接收器任务(及其工作线程)从多个 JVM 扩展,负责从同一分区获取消息并通过 HTTP 通知目标系统时,我如何保证目标系统将按顺序接收消息。

共有1个答案

阎裕
2023-03-14

每个接收器任务都将从其分配的主题中接收可用的有序事件,但一旦它离开 Kafka 协议处理并发送到远程目标(无论是文件还是 HTTP 终结点),只能根据该系统的排序语义来保证顺序。

例如,如果您正在写入Elasticsearch,则可以通过指定索引所依据的时间戳字段来“排序”事件(在Kibana中)。类似于任何(无)SQL数据库

另一方面,文件系统会按修改时间对文件进行排序,但不能保证任何给定文件中的事件被排序(除非它们来自一个分区)。

我发现HTTP RESTendpoint不太可能理解需要收集哪些顺序事件,并且需要在该服务器endpoint内部确定该逻辑。一种选择是将事件发布到接受分区号和记录来源偏移量的endpoint

 类似资料:
  • 我遇到了两个关于订购的短语, 生产者发送到特定主题分区的消息将按发送顺序追加。也就是说,如果记录M1与记录M2由同一生产者发送,并且M1首先发送,则M1的偏移量将低于M2并出现在日志中的较早位置。 另一个 问题是,如果存在如#2所述的失败发送,那么该顺序是否仍会保留到特定分区?如果一条消息存在潜在问题,将删除每个分区的所有以下消息“以保留顺序”,或者将发送“正确”的消息,并将失败的消息通知应用程序

  • 我正在使用Kafka连接JDBC源连接器从数据库中的视图中读取并将其发布在kafka上,它工作正常。 我的用例是用户可以创建多个对象,并且对象的顺序在我的应用程序中很重要。我想使用用户 ID 作为我发布到主题中的所有消息的消息密钥,以保持它们的顺序。 我的问题是,如何在Kafka connect source连接器中定义消息键?

  • 我们的基础设施中有融合平台。核心是,我们使用kafka代理来分发事件。许多设备会生成Kafka主题的事件(每种类型的事件都有一个Kafka主题),事件在谷歌的protobuf中序列化。我们有confluent的模式注册表来跟踪protobuf模式。 我们需要的是,对于一些事件,我们需要应用一些转换,然后将转换输出发布到其他一些Kafka主题。当然,Kafka流是实现这一点的一种方法,如本例中所示。

  • 我们有一个Kafka主题,从事务性生产者那里读取消息。我们希望将kafka connect消费者设置为只读提交的消息。 由于我是Kafka的新手,我需要这里的专家帮助我设置它。根据我最初的研究,我明白我需要在我们的kafka属性中设置isolation.level=read_committed。 现在我有以下问题 这是我指的正确的财产吗 我们在/confluent/bin文件夹中放置了Kafka外

  • 我是Kafka的新手。我在网上读了很多关于Kafka制作人和Kafka消费者的说明。我成功地实现了前者,它可以向Kafka集群发送消息。然而,我没有完成后一个。请帮我解决这个问题。我看到我的问题像StackOverflow上的一些帖子,但我想更清楚地描述一下。我在虚拟盒子的Ubuntu服务器上运行Kafka和Zookeeper。使用1个Kafka集群和1个Zookeeper集群的最简单配置(几乎是

  • 我们计划使用JMS源连接器将数据传输到我们的Kafka集群中。来自ActiveMQ的数据是XML格式的。JMS源连接器使用内部messageID(message . getjmsmessageid())作为键。 充当连接器流式传输到的 Kafka 主题上的键的字段需要从 (XML) 有效负载中检索。 为此,需要在连接器中执行几个步骤。 要将XML转换为内部Kafka Connect Struct,