我有几个Samza工作运行所有阅读Kafka主题的消息,并为新主题编写新消息。为了发送新消息,我使用Samza内置的OutgoingMessageEnvelope。并使用MessageCollector发送新消息。它看起来是这样的:
collector.send(new OutgoingMessageEnvelope(SystemStream, newMessage))
您应该能够使用分区键发送消息,
public OutgoingMessageEnvelope(SystemStream systemStream,
java.lang.Object partitionKey,
java.lang.Object key,
java.lang.Object message)
Constructs a new OutgoingMessageEnvelope from specified components.
Parameters:
systemStream - Object representing the appropriate stream of which this envelope will be sent on.
partitionKey - A key representing which partition of the systemStream to send this envelope on.
key - A deserialized key to be used for the message.
message - A deserialized message to be sent in this envelope.
使用此方法将对数据进行分区。但是,我认为如果您正在考虑以编程方式控制分区的数量,那么您应该使用kafka API来创建/更改主题,如本文所述
null camel-kafka中是否有任何配置,我们可以使用它来增加kafka主题分区计数?
uuusing Spring Kafka org . Spring framework . Kafka . listener . concurrentmessagelistenercontainer根据ContainerProperties和主题中的分区数量创建多个侦听器。javadoc说“来自同一个分区的消息将被顺序处理”。因此,如果只有1个分区,并发性设置为10,会发生什么——不会有任何并发性
我正在使用jhipster kafka实现,它使用confluentinc/cp-kafka: 5.4.0映像,当我尝试使用环境变量“KAFKA_CREATE_TOPICS:”创建Kafka主题时,我没有收到任何错误,但主题没有被创建,我在想三种不同的方法来解决这个问题, 通过 docker 撰写文件上的命令创建主题。 将Kafka和动物园管理员的形象更改为另一个像沃尔斯特迈斯特形象的形象。 在
我想为几个主题创建一个kafka消费者。consumer的方法构造函数允许我在订阅中传输主题列表的参数,如下所示: 之后,我想轮询记录从Kafka流每3秒并处理它们,但我想知道什么是这个消费者-如何将不同主题的记录轮询-首先一个主题,然后另一个,或并行。会不会一个消息量大的主题会一直处理,另一个消息量小的主题会等待?