我有多个微服务和前面的API,喜欢使用相同的主题为事件每个域事件在单独的分区,我能够配置spring kafka绑定器发送到不同的分区使用
spring.cloud.stream.bindings.<channel>.producer.partition-key- extractor-name=
实现
我的问题是,我能否将Kstream绑定器配置为只能为@input和@output使用分区。
spring.cloud.stream.kafka.streams.bindings.<channel>.producer.configuration.partitioner.class=
您是否确定地将记录发送到某个分区?换句话说,您知道每个键的实际分区吗?如果只提供PartitionKeyExtractorStrategy
,那么绑定器将任意选择一个分区来发送该记录。如果希望使其具有确定性,那么可以在生产者端提供PartitionSelectorClass
作为属性(实现接口PartitionSelectorStrategy
)。该接口允许您根据键选择分区。假设您希望将所有带有键UUID-1的记录发送到分区1
,并且通过PartitionSelectorStrategy
实现对其进行编码。这意味着kafka streams处理器知道键为UUID-1的记录来自分区1
。有了这些假设,您可以在kafka streams处理器中执行以下操作。这基本上是为你的其他问题提供的这个答案的变体。
@StreamListener("requesti")
@SendTo("responseo")
public KStream<UUID,Account> process(KStream<UUID, Account> events) {
return events.transform(() -> new Transformer<UUID, Account, KeyValue<UUID, Account>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<UUID,Account> transform(UUID key, Account value) {
if (this.context.partition() == 1) {
//your processing logic
return KeyValue.pair(key, value);
}
return null;
}
@Override
public void close() {
}
});
}
通过上面的代码,基本上可以过滤掉transform
方法中所有不相关的分区。仍然存在将出站数据发送到特定分区的问题。如果按照上面的代码执行,那么绑定器将把数据发送到任意分区(这可能是为绑定器添加的一个很好的特性)。但是,在本例中,如果希望出站记录登陆到确定性分区上,则可以直接使用Kafka流。见下文。
@StreamListener("requesti")
public void process(KStream<UUID, Account> events) {
final KStream<UUID, Account> transformed = events.transform(() -> new Transformer<UUID, Account, KeyValue<UUID, Account>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<UUID, Account> transform(UUID key, Account value) {
if (this.context.partition() == 1) {
//your processing logic
return KeyValue.pair(key, value);
}
return null;
}
@Override
public void close() {
}
});
transformed.to("outputTopic", Produced.with(new JsonSerde<>(), new JsonSerde<>(), new CustomStreamPartitioner()));
}
class CustomStreamPartitioner implements StreamPartitioner<UUID, Account> {
@Override
public Integer partition(String topic, UUID key, Account value, int numPartitions) {
return 1; //change to the right partition based on the key.
}
}
我的spring boot项目有一个演示Kafka Streams API的应用程序。我可以使用以下命令使用主题中的所有消息 Kafka Streams API中使用KStream或ktable使用消息的类似命令是什么?我试过了 两者都不起作用。我确实创建了一个测试用例,用而不是流来使用,但它不起作用。代码上传到Github以供参考。任何帮助都会很好。
我们希望在Kafka streams应用程序中使用GlobalKTable。输入主题(ktable/kstream)有N个分区,并且GlobalKTable将用作流应用程序中的字典。 GlobalKTable的输入主题必须与其他输入主题(它们是KTable/KStream的源)具有相同数量的分区吗? 据我所知,答案是否定的(它不受限制,主题也可能有M个分区,其中N>M),因为GlobalKTabl
Kafka主题分区偏移位置始终从0或随机值开始,如何确保使用者记录是分区中的第一条记录?有没有办法找出答案?如果有的话,请让我知道。谢谢。
简单问题: 假设我有一个具有3个分区的主题:Topic:StateEvents P1、P2和P3。 让我们假设生产者生成20条消息: 1, 2, 3, ..........20 我的问题是: 当制作人生成这些消息时: 1)每个消息将只在且仅在1个分区?也就是说,1在P1,2在P2,3在P3,然后4在P1,5在P2,6在P3,以此类推? 2)如果#1为真,当消费者订阅时,它将订阅所有分区,以便获得所
Kafka只提供一个分区内消息的总顺序,而不提供主题中不同分区之间的消息的总顺序。每分区排序与按键对数据进行分区的能力相结合,对于大多数应用程序来说已经足够了。但是,如果您需要消息的总顺序,这可以通过只有一个分区的主题来实现,尽管这意味着每个使用者组只有一个使用者进程。 下面是我的问题: > 这是否意味着如果我希望有多个消费者(来自同一组)阅读一个主题,我需要有多个分区? 分区是如何编号的?从0开
kafka版本1.1 我们使用Kafka KStream根据事件本身中选定的键聚合事件。大致低于它的作用 在“聚合器”函数中,我在一定条件下返回空值,以产生墓碑事件。 kafka创建两个主题,重新分区和变更。在重新分区主题中保留设置为-1。不管墓碑事件如何,这些主题都在继续增长。我找不到清理它们的方法。 我们的要求是直截了当的: 只要满足密钥的条件,该密钥的聚合更改日志就没有用处。我们希望彻底永久