当前位置: 首页 > 知识库问答 >
问题:

使用Spring cloud stream kafka stream的binder存在的问题

农存
2023-03-14

我试图使用 Spring Cloud Stream 的 Kafka Streams binder 从 Kafka 读取数据。然后我在一分钟的时间窗口内聚合事件,并将聚合结果写入另一个主题。接着,我需要从该主题读取聚合后的事件,并将其写入另一个主题,而这个目标主题位于另一个 Kafka 集群中。但我得到了下面的异常。

org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: The binder 'kafkaha' cannot bind a com.sun.proxy.$Proxy155
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:893)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:163)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:142)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
    at com.expediagroup.platform.StreamingApplication.main(StreamingApplication.java:11)
Caused by: java.lang.IllegalStateException: The binder 'kafkaha' cannot bind a com.sun.proxy.$Proxy155
    at org.springframework.util.Assert.state(Assert.java:73)
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:194)
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:130)
    at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:337)
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:229)
    at org.springframework.cloud.stream.binding.BindableProxyFactory.createAndBindOutputs(BindableProxyFactory.java:287)
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58)
    at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:48)
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
    ... 14 common frames omitted

我按照链接中的示例,尝试了下面的代码。

应用配置(application.yml)

# Spring Cloud Stream configuration for the search-event aggregation service.
# Declares four bindings (inputKstream, bridge, output, input) and two named
# binders (kafkaha, kafkaeg) that point at different Kafka broker addresses.
spring:
  # Fixed: this key was misspelled "applicaiton.name", which is not a property
  # Spring Boot reads, so the application name was never actually set.
  application.name: eg-destination-attribute-store-ha-search-stream
  cloud:
    consul:
      host: localhost
      port: 8500
      discovery:
        instanceId: eg-destination-attribute-store-ha-search-stream
    stream:
      kafka:
        # Settings for the Kafka Streams binder (KStream-typed bindings).
        streams:
          # Window definition injected into the processor as TimeWindows.
          # length == advanceBy, i.e. windows do not overlap.
          # Presumably milliseconds (one-minute window) — confirm against the binder docs.
          timeWindow:
            length: 60000
            advanceBy: 60000
          bindings:
            inputKstream:
              consumer:
                autoCommitOffset: true
                startOffset: earliest
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            bridge:
              producer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          binder:
            brokers: kafka.us-east-1.stage.kafka.away.black:9092
            configuration:
              schema.registry.url: http://kafka-schema-registry.us-east-1.stage.kafka.away.black:8081
              commit.interval.ms: 1000
              application.id: eg-test-dev  # arbitrary id used to identify this application uniquely
            autoAddPartitions: false
            minPartitionCount: 1
            num:
              stream:
                threads: 1
      bindings:
        # KStream input of the aggregation processor.
        # NOTE(review): this binding and "bridge" are KStream-typed in
        # EgSrcSinkProcessor but reference binder "kafkaha", which is declared
        # below with type "kafka". That is consistent with the startup error
        # "The binder 'kafkaha' cannot bind a com.sun.proxy.$Proxy..." —
        # presumably they need a binder of type "kstream"; confirm.
        inputKstream:
          destination: business-events-search-event
          binder: kafkaha
          group: grp-eg-destination-attribute-store-ha-search-stream-ha
          consumer:
            useNativeDecoding: true
        # KStream output of the aggregation processor (windowed aggregates).
        bridge:
          destination: business-events-search-event-agg
          binder: kafkaha
          #group: grp-eg-destination-attribute-store-ha-search-stream
          consumer:
            useNativeDecoding: true
        # Processor.OUTPUT channel used by Transporter.
        # NOTE(review): "group" and "consumer" settings on an output binding
        # look like consumer-side options — presumably ignored here; verify.
        output:
          destination: business-events-search-event-agg
          binder: kafkaha
          group: grp-eg-destination-attribute-store-ha-search-stream-eg-in
          consumer:
            useNativeDecoding: true
        # Processor.INPUT channel used by Transporter; bound via the second binder.
        input:
          destination: business-events-search-event-eg
          binder: kafkaeg
          group: grp-eg-destination-attribute-store-ha-search-stream-eg
          consumer:
            useNativeDecoding: true

      # Named binder instances, each with its own broker address.
      binders:
        kafkaha:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: kafka.us-east-1.stage.kafka.away.black:9092
        kafkaeg:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092

ExecutorHaAgg.java


/**
 * Kafka Streams processor that aggregates search business events per search
 * term over a time window and sends the windowed results to the "bridge"
 * binding.
 */
@Slf4j
@EnableBinding(EgSrcSinkProcessor.class)
public class ExecutorHaAgg {

    // Schema registry URL taken from the Kafka Streams binder configuration.
    // process() no longer reads it; the field is kept so the set of properties
    // this bean requires is unchanged.
    @Value("${spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url}")
    private String schemaRegistryUrl;

    @Autowired
    private LookNPersistService service;

    // Window definition applied to the grouped stream in process().
    @Autowired
    private TimeWindows timeWindows;

    /**
     * Builds the aggregation topology.
     *
     * <p>Steps: drop events without a visitor UUID or without any search-term
     * UUID; re-key each event by the search term derived from its first
     * search-term UUID; group by that key; window with {@code timeWindows};
     * and fold every window into a {@link ResultValue} carrying the key, the
     * latest event time, the total count, and the count of events for which
     * {@code StringUtil.isDatedStrNullAndEmpty} returned false.
     *
     * @param inputKstream stream of search business events from the "inputKstream" binding
     * @return stream of windowed per-search-term aggregates, sent to "bridge"
     */
    @Timed(value = "kstream.BusinessModelMaskActiveLogV2.process.time", percentiles = {0.5, 0.9, 0.99}, histogram = true)
    @StreamListener
    @SendTo("bridge")
    public KStream<Windowed<String>, ResultValue> process(@Input("inputKstream") KStream<String, SearchBusinessEvent> inputKstream) {

        // The previous version also built and configured a SpecificAvroSerde for
        // SearchBusinessEvent here, but never passed it to any operator; that
        // dead code has been removed.
        final TransformedValueSerde transformedValueSerde = new TransformedValueSerde();
        final ResultValueSerde resultValueSerde = new ResultValueSerde();

        return inputKstream
                // v != null guards against records with a null value, which would
                // otherwise throw a NullPointerException inside the predicate.
                .filter((k, v) -> v != null
                        && v.getVisitorUuid() != null
                        && v.getSearchTermUUIDs() != null
                        && !v.getSearchTermUUIDs().isEmpty())
                .map((k, v) -> KeyValue.pair(StringUtil.getSearchTermFromUri(v.getSearchTermUUIDs().get(0)), new TransformedValue(v.getAvailabilityStart(), v.getAvailabilityEnd(), v.getHeader().getTime())))
                .groupBy((k, v) -> k, Serialized.with(Serdes.String(), transformedValueSerde))
                .windowedBy(timeWindows)
                .aggregate(ResultValue::new, ((key, value, aggregate) -> {
                    aggregate.setSearchTerm(key);
                    // Keep the most recent event time seen in this window.
                    aggregate.setTime((aggregate.getTime() < value.getTime()) ? value.getTime() : aggregate.getTime());
                    // Dated count only grows when the helper reports the dates as present.
                    aggregate.setDatedCount(StringUtil.isDatedStrNullAndEmpty(value.getStartDate(), value.getEndDate()) ? aggregate.getDatedCount() : 1 + aggregate.getDatedCount());
                    aggregate.setCount(1 + aggregate.getCount());
                    return aggregate;
                }), Materialized.with(Serdes.String(), resultValueSerde)).toStream();
    }
}

Transporter.java


/**
 * Pass-through component bound to the {@code Processor} channels: every
 * payload received on {@code Processor.INPUT} is returned unchanged and sent
 * to {@code Processor.OUTPUT}.
 */
@Slf4j
@EnableBinding(Processor.class)
public class Transporter {

    /**
     * Forwards the incoming payload as-is.
     *
     * @param object payload received on the input channel
     * @return the same object, to be sent to the output channel
     */
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Object transfer(Object object){
        return object;
    }
}

EgSrcSinkProcessor.java



/**
 * Binding interface for the aggregation processor. Declares one KStream input
 * named "inputKstream" and one KStream output named "bridge".
 */
public interface EgSrcSinkProcessor {

    /** KStream input binding named "inputKstream". */
    @Input("inputKstream")
    KStream<?, ?> inputKstream();

    /** KStream output binding named "bridge". */
    @Output("bridge")
    KStream<?, ?> bridgeKstream();
}

共有1个答案

云俊美
2023-03-14

在尝试把 MessageChannel 和 KStream 混用在同一个 binder 上时,我也遇到了同样的问题。您的 inputKstream 应该绑定到 kstream 类型的 binder。我的配置如下:

# Example configuration that mixes a message-channel binding ("output") with a
# KStream binding ("input") by giving each its own binder type.
management.endpoints.web.exposure.include=*
spring.profiles=kafka
# "output" is a message-channel binding, so it uses the binder of type "kafka".
spring.cloud.stream.bindings.output.binder=kafka1
spring.cloud.stream.bindings.output.destination=board-events
spring.cloud.stream.bindings.output.contentType=application/json
spring.cloud.stream.bindings.output.producer.header-mode=none
# "input" is a KStream binding, so it uses the binder of type "kstream".
spring.cloud.stream.bindings.input.binder=kstream1
spring.cloud.stream.bindings.input.destination=board-events
spring.cloud.stream.bindings.input.contentType=application/json
spring.cloud.stream.bindings.input.group=command-board-events-group
spring.cloud.stream.bindings.input.consumer.useNativeDecoding=true
spring.cloud.stream.bindings.input.consumer.header-mode=none
spring.cloud.stream.kafka.streams.binder.brokers=localhost
spring.cloud.stream.default-binder=kafka1
spring.cloud.stream.binders.kafka1.type=kafka
# Fixed: a binder of type "kafka" reads spring.cloud.stream.kafka.binder.*;
# the previous key used the Kafka Streams prefix (kafka.streams.binder), which
# this binder type does not read.
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.brokers=localhost
spring.cloud.stream.binders.kstream1.type=kstream
spring.cloud.stream.binders.kstream1.environment.spring.cloud.stream.kafka.streams.binder.brokers=localhost
 类似资料:
  • 我是 Spring Cloud 和 Kafka Streams 的新手。我正在尝试使用 Kafka binder 搭建 Spring Cloud 应用程序。我尝试在本地测试 Kafka Streams 处理器,但无法打印任何日志。 我的 Kafka 消息将包含 JSONObject。kafkaStreamListener 类是: Application.properties: 问题:在调试模式下,断点直接到达 filter 步骤,然后不执行任何操作。它跳过了 logger 和 SOP。不知道可能是什么

  • 本文向大家介绍Android学习之介绍Binder的简单使用,包括了Android学习之介绍Binder的简单使用的使用技巧和注意事项,需要的朋友参考一下 前言 最近因为公司项目需求,需要远程调度启动客户端输入法输入内容。 这就是大致的需求流程,这篇首先讲远程与服务控制端通讯。首先控制服务端定义好一个Service,且在ServiceManager注册添加服务。 在这里我讲解远程端与服务控制端通讯

  • Binder Binder 主要有以下两个特征: 不会处理错误事件 确保绑定都是在给定 Scheduler 上执行(默认 MainScheduler) 一旦产生错误事件,在调试环境下将执行 fatalError,在发布环境下将打印错误信息。 示例 在介绍 AnyObserver 时,我们举了这样一个例子: let observer: AnyObserver<Bool> = AnyObserver

  • 我有一个问题,找到一个合适的表达式,给我的DL查询结果,我想要。对于下面的示例: 对于个人:CourseA和CourseB,我断言属性: 对于个人John,断言了以下3项财产:

  • 我使用intellij IDEA进行开发,我注意到IDEA在Java7上运行时,maven插件在下载依赖项时出现了问题(我在Windows7)。然而,在Java6上运行它就没有这个问题了。 以下是idea.log的摘录 我按照建议做了以下几点 > 确保我的/etc/hosts文件中有 将添加到我的idea64.exe.vmoptions,因此文件如下所示 谢谢你的帮助。

  • 我对jet和hazelcast有一些问题,但出于逻辑目的,我的问题是,我有一个类,它将带来所有数据,当我们启动客户端时,我们得到一个队列和两个映射,但另一个映射尚未调用,当我启动hazelcast jet实例并在使用接收器时处理所有数据时,我放了以下内容: 但这是错误的。。。。我知道当你调用这个实例时,它就像hazelcast的getDataStructure,喷射它的内部hazelcast,我以