当前位置: 首页 > 知识库问答 >
问题:

使用Kafka Connect从IBMMQ读取时保留消息顺序

陶柏
2023-03-14

当使用Kafka Connect IBM MQ Source Connector使用5个任务的并行级别从IMB MQ读取时,是否可以保留消息顺序(将具有相同键的消息分配给相同的分区)?

共有1个答案

郭瀚海
2023-03-14

默认情况下,使用源连接器复制的邮件将具有< code>null键。您可以使用Kafka SMTs为您的信息创建密钥。在连接器的配置中,下面几行应该可以完成这个任务(确保将< code>messageId替换为您希望用作键的字段):

transforms=createKey
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=messageId
 类似资料:
  • 假设我有一个包含3个应用程序的流——一个源、处理器和接收器。 我需要保留从源收到的消息的顺序。当我收到消息A,B,C,D,我必须将它们作为A,B,C,D.发送到接收器(我不能将它们作为B,A,C,D)发送。 如果每个应用程序只有一个实例,那么一切都将按顺序运行,并且顺序将被保留。 如果我每个应用程序有 10 个实例,则消息 A、B、C、D 可能会在不同的实例中同时处理。我不知道这些消息的顺序是什么

  • 我想使用rabbitMq队列中Storm喷口中的消息。 现在,我们使用Spring AMQP异步发送和接收来自RabbitMq的消息。 Spring AMQP提供了从队列读取消息的机制(创建监听器或使用注释@RabbitListner)。 问题是我可以让一个侦听器从队列中读取消息。但是,我如何将此消息发送到Storm群上运行的Storm喷口? 拓扑将启动一个集群,但在我的spout的nextTup

  • 我有一个Spring Boot应用程序,它接收JSON请求并将其推送到IBM MQ JMS队列中。可能有n个JSON请求将被推送到队列。我的目标是处理队列中的每个请求。如何侦听队列并使用spring boot逐个处理消息?

  • 我有一个java客户机,它在队列中发送jms消息(“队列请求”)。该消息包含一个int属性(“id”),其中包含唯一的客户端id号。消息正在处理中,然后进入另一个队列(“队列响应”)。如何让客户机等到具有其id的消息在队列中,然后读取它。我曾尝试使用侦听器并实现onMessage,但当收到消息时,我如何停止侦听?

  • 我试图从__consumer_offsets主题中使用,因为这似乎是检索关于消费者的kafka度量(如消息滞后等)的最简单的方法。理想的方法是从jmx访问它,但希望先尝试一下,返回的消息似乎是加密的或不可读的。尝试添加stringDeserializer属性。有没有人对如何纠正这一点有什么建议?这里的提法也是重复的 重复的consumer_offset 没有帮助,因为它没有引用我的问题,即在Jav

  • 我刚开始学Kafka,Kafka-蟒蛇。在下面的代码中,我试图在消息到达时读取它们。但出于某种原因,消费者似乎要等到一定数量的消息积累后才能获取它们。 我最初以为是因为正在批量出版的制片人。当我运行“kafka-console-consumer--bootstrap-servers--topic”时,我可以看到发布后收到的每一条消息(就像在consumer控制台上看到的那样) 有人能指出用KafK