<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
spring.cloud.stream.bindings.input.destination=input
spring.cloud.stream.bindings.input.group=myGroup
spring.cloud.stream.bindings.input.consumer.useNativeDecoding=true
spring.cloud.stream.kafka.streams.bindings.input.consumer.enableDlq=true
spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqName=input_deadletter
spring.cloud.stream.kafka.streams.bindings.input.consumer.autoCommitOnError=true
spring.cloud.stream.kafka.streams.bindings.input.consumer.keySerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.bindings.output.destination=output
spring.cloud.stream.bindings.output.content-Type=application/*+avro
spring.cloud.stream.bindings.output.producer.useNativeEncoding=true
spring.cloud.stream.bindings.output.producer.errorChannelEnabled=true
spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.schemaRegistryClient.endpoint.schema.avro.schema-locations=classpath:avro/*.avsc
spring.cloud.stream.kafka.streams.binder.brokers=localhost
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url=http://localhost:8082
spring.cloud.stream.kafka.streams.binder.application-id=myGroup
spring.cloud.stream.kafka.streams.binder.serdeError=sendtodlq
我可以在日志中看到service activator已经注册并订阅了错误通道。一旦发生运行时异常,所有流都将停止并进入关闭模式。
Registering beans for JMX exposure on startup
org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageChannel input.myGroup.errors
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageChannel,name="input-myGroup.errors"': registering with JMX server as MBean [org.springframework.integration:type=MessageChannel,name="input.myGroup.errors"] org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageChannel errorChannel
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageChannel,name=errorChannel': registering with JMX server as MBean [org.springframework.integration:type=MessageChannel,name=errorChannel]
org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageChannel nullChannel
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageChannel,name=nullChannel': registering with JMX server as MBean [org.springframework.integration:type=MessageChannel,name=nullChannel]
org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageHandler errorLogger
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageHandler,name=errorLogger,bean=internal': registering with JMX server as MBean [org.springframework.integration:type=MessageHandler,name=errorLogger,bean=internal]
org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageHandler myTopicListener.error.serviceActivator
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageHandler,name=myTopicListener.error.serviceActivator,bean=endpoint': registering with JMX server as MBean [org.springframework.integration:type=MessageHandler,name=myTopicListener.error.serviceActivator,bean=endpoint]
org.springframework.integration.monitor.IntegrationMBeanExporter - Registering MessageHandler myTopicListener.errorGlobal.serviceActivator
org.springframework.integration.monitor.IntegrationMBeanExporter - Located managed bean 'org.springframework.integration:type=MessageHandler,name=myTopicListener.errorGlobal.serviceActivator,bean=endpoint': registering with JMX server as MBean [org.springframework.integration:type=MessageHandler,name=myTopicListener.errorGlobal.serviceActivator,bean=endpoint]
org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor - No @KafkaListener annotations found on bean type: class org.springf
@SendTo(MyStreams.OUTPUT)
public KStream<Key, MyEntity> process(KStream<Key, Envelope> myStreamObject) {
return myStreamObject.mapValues(this::transform);
}
@ServiceActivator(inputChannel = "input.myGroup.errors") //channel name 'input.myGroup.errors'
public void error(Message<?> message) {
System.out.println("Handling ERROR: " + message);
}
@ServiceActivator(inputChannel = "errorChannel")
public void errorGlobal(Message<?> message) {
System.out.println("Handling ERROR: GLOBAL " + message);
}
kafka streams绑定器不是基于MessageChannel
的,因此没有消息
可发送到错误通道。
标准的kafka绑定器是MessageChannelBinder
并支持错误通道。
对于Kafka流,您必须实现自己的错误处理。
关于errorChannel没有任何订阅者的投诉 Kafka生产者线程日志 编辑:以下是我的channels类的内容:
问题内容: 我已经有这个问题近两个月了,无法解决。问题是,如果我的应用程序正在运行,并且我从Eclipse运行(重新安装)我的应用程序,则会收到一条错误消息,指示我的应用程序已崩溃“不幸的是,已停止。”。我注意到,当我从PC / Eclipse上运行它时,也会发生这种情况,我认为只有在一段时间不运行它时,它才会发生。 仅当应用程序在第三个活动(BaseDiagramActivity)中处于活动状态
我在我的Android应用程序中面临奇怪的问题。它总是给我错误的名字 JAVAlang.RuntimeException:无法实例化应用程序io。日本动画。应用程序。应用程序:java。lang.ClassNotFoundException:在路径:DexPathList[[zip文件”//data/app/io.hitanime.app-957U27HR57arXVvYGSCFSQ==/base
问题内容: 尽管 通过终端执行相同的命令时没有问题 , 但 我却遇到了类似执行以下命令时的异常情况 。 我需要执行并返回以下命令的输出: 这是使用类执行命令的方法: 问题答案: Runtime.exec不使用外壳程序(例如);它将命令直接传递到操作系统。这意味着将不会理解通配符(例如)和管道(),因为(像所有Unix命令一样)不会对这些字符进行任何解析。您需要使用类似 或者,如果出于某些奇怪的原因
运行应用程序时的问题:在Consol屏幕中 java.lang.SecurityException:权限拒绝:starting Intent{act=Android.Intent.action.main cat=[Android.Intent.category.launcher]flg=0x10000000 cmp=com.test.sample/.setting}from null(PID=91
当运行我的Spring启动应用程序时,启动时低于异常。不确定是什么依赖导致了这一点。 Spring启动版本-2.3.1 IDE-Intellij 这是一个多模块maven项目。如果需要其他信息,请告诉我。 下面是我的父母pom.xml 4.0.0