我有这样的拓扑:
Topology topology = new Topology();
//WS connection processor
topology.addSource(WS_CONNECTION_SOURCE, new StringDeserializer(), new WebSocketConnectionEventDeserializer(), KafkaTopics.WS_CONNECTION_EVENTS_TOPIC)
.addProcessor(SESSION_PROCESSOR, WSUserSessionProcessor::new, WS_CONNECTION_SOURCE)
.addStateStore(sessionStoreBuilder, SESSION_PROCESSOR)
.addSink(WS_STATUS_SINK, KafkaTopics.WS_USER_ONLINE_STATUS_TOPIC, stringSerializer, stringSerializer, SESSION_PROCESSOR)
//WS session routing
.addSource(WS_ROUTING_BY_SESSION_SOURCE, new StringDeserializer(), new StringDeserializer(),
KafkaTopics.WS_DELIVERY_TOPIC)
.addProcessor(WS_ROUTING_BY_SESSION_PROCESSOR, WSSessionRoutingProcessor::new,
WS_ROUTING_BY_SESSION_SOURCE)
.addStateStore(userConnectedNodesStoreBuilder, WS_ROUTING_BY_SESSION_PROCESSOR, SESSION_PROCESSOR)
//WS delivery
.addSource(WS_DELIVERY_SOURCE, new StringDeserializer(), new StringDeserializer(),
INSTANCE_SPECIFIC_TOPIC)
.addProcessor(WS_DELIVERY_PROCESSOR, WSDeliveryProcessor::new, WS_DELIVERY_SOURCE);
拓扑中最后提到的源是每个应用程序实例的特定主题。我希望该主题仅由该实例处理。此主题的数据由前一个处理器推送,基于哪个实例必须处理该消息。
但是一旦流启动,它会尝试将实例特定的主题分区也分配给其他实例。我们可以在Kafka流中实现这个要求吗?
我希望一个主题仅由特定实例处理。
你想要的是不可能的。对于Kafka Streams程序,同一应用程序的所有实例都需要完全相同,因此需要具有相同的输入主题。
你需要将应用程序分为4个应用程序:第一个应用程序执行程序的共享分区,并写入3个不同的主题。此外,您还有3个应用程序(具有自己的应用程序id),每个应用程序都阅读其中一个主题。
注意,如果需要,可以在同一个JVM中运行多个KafkaStreams客户机。
我有一个bean,它包含同一个组件的两个自动连接的实例: SomeOtherBean有一个原型范围: 每个自动加载的某个其他豆的可配置值需要不同,并将通过属性占位符提供: 理想情况下,我希望使用注释来指定可配置属性的值。 通过XML执行此操作很容易,但我想知道这是否是 a) 无法使用注释或 b) 如何做到这一点。
我们正在构建一个系统,以可控且可预测的速率将数据从生态系统中的一个点移动到另一个点。该系统基本上有一个主要的Kafka主题,它获取N种类型的消息。然后,使用此主题的代理根据消息负载主题将消息放入N个工作队列中的一个。然后,这些工人队列让工人实际执行工作。 现在,我想实现控制旋钮,比如-暂停/恢复/限制来自代理的1种类型的消息。假设代理和工作人员之间没有通信,我怎么做呢。我是否应该为代理写入这些消息
我有一个spring boot应用程序(比方说它叫app-1),它连接到一个kafka集群,并从一个特定的主题进行消费,比方说这个主题叫做“foo”。当另一个应用程序(比如称为app-2)将新的foo项导入数据库时,主题foo总是会收到一条消息。该主题主要用于第三个应用程序(比如app-3),它向可能对这个新foo项目感兴趣的人发送一些电子邮件通知。App-3是集群的,这意味着它有多个实例同时运行
我希望能够通过属性阅读主题,而无需在 Kafka 侦听器注释上指定任何内容。不使用Spring靴。 我尝试通过“topics”键直接从properties对象中读取主题。这将产生一个错误:< code>IllegalStateException:必须提供topics、topicPattern或topicPartitions。
我有一些关于Kafka主题分区->spark流媒体资源利用的用例,我想更清楚地说明这些用例。 我使用spark独立模式,所以我只有“执行者总数”和“执行者内存”的设置。据我所知并根据文档,将并行性引入Spark streaming的方法是使用分区的Kafka主题->RDD将具有与Kafka相同数量的分区,当我使用spark-kafka直接流集成时。 因此,如果我在主题中有一个分区和一个执行器核心,
我试图为我的Kafka Connect Sink指定一个主题分区。特别是,我正在使用DataStax Apache Kafka连接器。 有大量与为 Kafka 使用者指定主题分区相关的文档和资源,例如: https://kafka-tutorials.confluent.io/kafka-console-consumer-read-specific-offsets-partitions/kafka