I am trying to read from Kafka using Spring Cloud Stream Kafka Streams, aggregate the events over a one-minute time window, and move them to a different topic. I then need to read the aggregated events from that topic and write them to another topic that is bound to a different topic on a separate Kafka cluster. But I get the exception below:
org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: The binder 'kafkaha' cannot bind a com.sun.proxy.$Proxy155
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:893)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:163)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:142)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
at com.expediagroup.platform.StreamingApplication.main(StreamingApplication.java:11)
Caused by: java.lang.IllegalStateException: The binder 'kafkaha' cannot bind a com.sun.proxy.$Proxy155
at org.springframework.util.Assert.state(Assert.java:73)
at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:194)
at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:130)
at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:337)
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:229)
at org.springframework.cloud.stream.binding.BindableProxyFactory.createAndBindOutputs(BindableProxyFactory.java:287)
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58)
at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:48)
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
... 14 common frames omitted
I followed the example from the link and tried the code below.
Application properties
spring:
  application.name: eg-destination-attribute-store-ha-search-stream
  cloud:
    consul:
      host: localhost
      port: 8500
      discovery:
        instanceId: eg-destination-attribute-store-ha-search-stream
    stream:
      kafka:
        streams:
          timeWindow:
            length: 60000
            advanceBy: 60000
          bindings:
            inputKstream:
              consumer:
                autoCommitOffset: true
                startOffset: earliest
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            bridge:
              producer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          binder:
            brokers: kafka.us-east-1.stage.kafka.away.black:9092
            configuration:
              schema.registry.url: http://kafka-schema-registry.us-east-1.stage.kafka.away.black:8081
              commit.interval.ms: 1000
              application.id: eg-test-dev # a random id used to identify the application uniquely
            autoAddPartitions: false
            minPartitionCount: 1
            num:
              stream:
                threads: 1
      bindings:
        inputKstream:
          destination: business-events-search-event
          binder: kafkaha
          group: grp-eg-destination-attribute-store-ha-search-stream-ha
          consumer:
            useNativeDecoding: true
        bridge:
          destination: business-events-search-event-agg
          binder: kafkaha
          #group: grp-eg-destination-attribute-store-ha-search-stream
          consumer:
            useNativeDecoding: true
        output:
          destination: business-events-search-event-agg
          binder: kafkaha
          group: grp-eg-destination-attribute-store-ha-search-stream-eg-in
          consumer:
            useNativeDecoding: true
        input:
          destination: business-events-search-event-eg
          binder: kafkaeg
          group: grp-eg-destination-attribute-store-ha-search-stream-eg
          consumer:
            useNativeDecoding: true
      binders:
        kafkaha:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: kafka.us-east-1.stage.kafka.away.black:9092
        kafkaeg:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
ExecutorHaAgg.java
@Slf4j
@EnableBinding(EgSrcSinkProcessor.class)
public class ExecutorHaAgg {

    @Value("${spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url}")
    private String schemaRegistryUrl;

    @Autowired
    private LookNPersistService service;

    @Autowired
    private TimeWindows timeWindows;

    @Timed(value = "kstream.BusinessModelMaskActiveLogV2.process.time", percentiles = {0.5, 0.9, 0.99}, histogram = true)
    @StreamListener
    @SendTo("bridge")
    public KStream<Windowed<String>, ResultValue> process(@Input("inputKstream") KStream<String, SearchBusinessEvent> inputKstream) {
        final Map<String, String> schemaMap = Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        final SpecificAvroSerde<SearchBusinessEvent> searchBussinessEventSerde = new SpecificAvroSerde<>();
        searchBussinessEventSerde.configure(schemaMap, false);
        TransformedValueSerde transformedValueSerde = new TransformedValueSerde();
        ResultValueSerde resultValueSerde = new ResultValueSerde();
        // Key each event by its first search term, group, and aggregate counts per one-minute window.
        return inputKstream
                .filter((k, v) -> v.getVisitorUuid() != null && v.getSearchTermUUIDs() != null && v.getSearchTermUUIDs().size() > 0)
                .map((k, v) -> KeyValue.pair(StringUtil.getSearchTermFromUri(v.getSearchTermUUIDs().get(0)),
                        new TransformedValue(v.getAvailabilityStart(), v.getAvailabilityEnd(), v.getHeader().getTime())))
                .groupBy((k, v) -> k, Serialized.with(Serdes.String(), transformedValueSerde))
                .windowedBy(timeWindows)
                .aggregate(ResultValue::new, (key, value, aggregate) -> {
                    aggregate.setSearchTerm(key);
                    aggregate.setTime((aggregate.getTime() < value.getTime()) ? value.getTime() : aggregate.getTime());
                    aggregate.setDatedCount(StringUtil.isDatedStrNullAndEmpty(value.getStartDate(), value.getEndDate()) ? aggregate.getDatedCount() : 1 + aggregate.getDatedCount());
                    aggregate.setCount(1 + aggregate.getCount());
                    return aggregate;
                }, Materialized.with(Serdes.String(), resultValueSerde))
                .toStream();
    }
}
Transporter.java
@Slf4j
@EnableBinding(Processor.class)
public class Transporter {

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Object transfer(Object object) {
        return object;
    }
}
EgSrcSinkProcessor.java
public interface EgSrcSinkProcessor {

    @Input("inputKstream")
    KStream<?, ?> inputKstream();

    @Output("bridge")
    KStream<?, ?> bridgeKstream();
}
I ran into the same problem when trying to mix MessageChannel and KStream bindings on the same binder. Your inputKstream should be bound to a binder of type kstream. My configuration is below; a sketch of the matching bindings interface follows it:
management.endpoints.web.exposure.include=*
spring.profiles=kafka
spring.cloud.stream.bindings.output.binder=kafka1
spring.cloud.stream.bindings.output.destination=board-events
spring.cloud.stream.bindings.output.contentType=application/json
spring.cloud.stream.bindings.output.producer.header-mode=none
spring.cloud.stream.bindings.input.binder=kstream1
spring.cloud.stream.bindings.input.destination=board-events
spring.cloud.stream.bindings.input.contentType=application/json
spring.cloud.stream.bindings.input.group=command-board-events-group
spring.cloud.stream.bindings.input.consumer.useNativeDecoding=true
spring.cloud.stream.bindings.input.consumer.header-mode=none
spring.cloud.stream.kafka.streams.binder.brokers=localhost
spring.cloud.stream.default-binder=kafka1
spring.cloud.stream.binders.kafka1.type=kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=localhost
spring.cloud.stream.binders.kstream1.type=kstream
spring.cloud.stream.binders.kstream1.environment.spring.cloud.stream.kafka.streams.binder.brokers=localhost
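For illustration, here is a minimal sketch of a bindings interface that would match the configuration above; the interface and method names are hypothetical, not from the original application. The point is the split: the KStream binding is served by the kstream-type binder (kstream1), while the plain MessageChannel binding is served by the kafka-type binder (kafka1), so neither binder is asked to bind a proxy type it does not support.

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

// Hypothetical bindings interface matching the properties above.
public interface MixedBindings {

    // Served by kstream1 (type kstream): injected as a KStream.
    @Input("input")
    KStream<?, ?> input();

    // Served by kafka1 (type kafka): a regular message channel.
    @Output("output")
    MessageChannel output();
}

Applied to the question's setup, the failure appears to come from the kafkaha binder being declared with type: kafka while the inputKstream and bridge bindings are KStream proxies; giving the KStream bindings a kstream-type binder, and keeping a kafka-type binder for the MessageChannel-based Transporter, follows the same split.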