当前位置: 首页 > 知识库问答 >
问题:

在一个Kafka Streams拓扑(Spring Cloud Stream)中可以使用多个代理吗?

端木阳荣
2023-03-14
spring:
  cloud:
    stream:
      kafka:
        streams:
          bindings:
            doob-output-topic-out:
              applicationId: doob-output-topic-out
              producer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: tr.com.havelsan.doob.cloud.framework.service.track.base.core.serde.BaseDataSerde
                topic:
                  properties:
                    retention.bytes: 300000000
                    segment.bytes: 300000000
            doob-input-topic-in:
              consumer:
                applicationId: doob-input-topic-in
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: tr.com.havelsan.doob.cloud.framework.service.track.base.core.serde.BaseDataSerde
                topic:
                  properties:
                    retention.bytes: 300000000
                    segment.bytes: 300000000
      binders:
        outputKafka:
          type: kstream
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    streams:
                      binder:
                        brokers: ${1kafka.brokers1}
                        autoCreateTopics: true
                        autoAddPartitions: true
                        minPartitionCount: 8
                        configuration:
                          commit.interval.ms: 1000
        inputKafka:
          type: kstream
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    streams:
                      binder:
                        brokers: ${2kafka.brokers2}
                        autoCreateTopics: true
                        autoAddPartitions: true
                        minPartitionCount: 8
                        configuration:
                          commit.interval.ms: 1000
                          max:
                            request:
                              size: 20000000
      bindings:
        doob-output-topic-out:
          destination: outputTopic
          binder: outputKafka
          producer:
            partition-count: 8
        doob-input-topic-in:
          destination: inputTopic
          binder: inputKafka

manage:
  storeName: trackList15

源代码:

    @StreamListener(BASE_TOPIC_INPUT)
    @SendTo(BASE_TOPIC_OUTPUT)
    public KStream<String, BaseData> consumeTrackFromSynchronization(KStream<String, BaseData> baseDataStream) {
        return baseDataStream.filter((s, baseData) -> BaseUtil.getTrackType(baseData).equals(BaseTypeEnum.FK)).groupByKey()
                .reduce((baseData, s1) -> s1, Materialized.<String, BaseData, KeyValueStore<Bytes, byte[]>>as(storeName)
                        .withKeySerde(Serdes.String()).
                                withValueSerde(baseDataSerde)).toStream()
                .peek((s, baseData) -> baseServiceHelper.processBase(baseData, BaseTypeEnum.FK));
    }

共有1个答案

卜季萌
2023-03-14

在单个Kafka Streams处理器中,不可能从一个集群读写另一个集群。但是,在单个应用程序(JVM)中,可以有多个处理器,每个处理器与单个Kafka集群交互。

有关更多细节,请参见此线程。

下面是使用Spring Cloud Stream的一个解决方案。

    null
 类似资料:
  • 我们有一个Spring Boot Kafka Streams处理器。由于各种原因,我们可能会遇到需要进程启动和运行的情况,但是没有我们希望订阅的主题。在这种情况下,我们只希望进程“Hibernate”,因为其他活动/环境检查器依赖于它的运行。此外,它是RedHat OCP集群的一部分,我们不希望pod不断地执行崩溃退避循环。我完全理解,在使用有效主题重新启动之前,它永远不会真正做任何事情,但没关系

  • 我正在研究一个storm拓扑,需要为不同的客户端位置构建多个拓扑。 谢谢你的回复。

  • 我正在学习storm。我对Apache storm上一次可以运行的拓扑数有疑问。我在storm集群上提交了两个拓扑,但一次只能运行一个拓扑。我需要杀死或停用已经存在的拓扑才能运行任何新的拓扑。 我用的是Storm0.9.4 动物园管理员3.4.6 我在附上我的暴风UI的截图。

  • 我试着把这个理论与缩放工人做比较。 但是,使用版本1.2.1时,storm Kafka spout在多个不同的拓扑中的行为并不像我预期的那样。 为单个主题的所有拓扑中的kafka spout使用者设置一个公共client.id和group.id,每个拓扑仍然订阅所有可用的分区和重复的元组,并在重新提交已提交的元组时抛出错误。 如果有人能解释一下 Kafka喷口的这种行为的实现逻辑是什么? 有解决此

  • 我正在运行一个Kafka Streams应用程序,它有三个子拓扑。活动的阶段大致如下: 主题A 主题A、B和C都是物化的,这意味着如果每个主题有40个分区,我的最大并行度是120。 起初,我运行5个流应用程序,每个线程8个。在这种设置下,我遇到了不一致的性能。似乎某些共享同一线程的子拓扑比其他子拓扑更渴望CPU,过了一会儿,我会得到这个错误:组[consumer_group]中的中删除。一切都会重

  • 在上图中,左边的源元素生成类型为“requirement”的对象(它是从代理类继承的自定义类)。这个类表示一个矩阵,其中包含每个产品“PI”和每个客户“CI”的要求(给出了矩阵的一个示例): 这个矩阵可以被看作是一个代理的集合,因为每行都是与我的电路块的第一个其他代理有关的代理(逻辑上,它包含关于要从产品供应商订购的产品数量Pi的信息),每列都是与我的电路块的第二个其他代理有关的代理(逻辑上,它包