在将我们的一个服务升级到spring-cloud-stream
2.0.0.rc3时,当试图使用一个使用较旧版本spring-cloud-stream
-ditmars的服务生成的消息时,我们遇到了一个异常。发行版:
错误31241---[container-4-C-1]O.S.Integration.Handler.LoggingHandler:org.SpringFramework.Messaging.Converter.MessageConversionException:无法将GenericMessage[Payload=Byte[371],Headers={kafka_offset=1,kafka_Consumer=org.Apache.kafka_Clients.Consumer.kafka_Clients.Consumer@62029D0D,kafka_TimeStampType=Create_Time,21641760698,timestamp=1521641772477}]位于org.springframework.messaging.handler.annotation.support.payloadargumentresolver.resolveargument(payloadargumentresolver.java:144)位于org.springframework.messaging.handler.invocation.handler.methodargumentargumentresolver.java:144)位于org.springframework.messaging.handler.invocation.resolveargument(Tream.binding.dispatchingStreamListenerMessageHandler.Handler.RequestMessage(DispatchingStreamListenerMessageHandler.java:87)位于org.SpringFramework.Integration.Handler.AbstractReplyProducingMessageHandler.HandleMessageInternal(AbstractReplyProducingMessageHandler.109)位于org.SpringFramework.Integration.Handler.AbstractMessageHandler.AbstractMessage.157)位于L.send(AbstractMessageChannel.java:407)在org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)在org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.160)在org.springframework.messaging.core.GenericMessagingTemplate.doSend(n消息(KafkaMessageDrivenChannelAdapter.java:364)位于org.springframework.kafka.listener.kafkaMessageListenerContainer$listenerConsumer.doInvokeRecistener(KafkaMessageListenerContainer.kafkaMessageListener.java:1001)位于org.springframework.kafka.listener.kafkaMessageListener.java:1001)位于
原因似乎是与消息一起发送的ContentType
头是text/plain
,尽管它应该是application/json
。
生产者配置:
spring: cloud: stream: kafka: binder: brokers: kafka defaultBrokerPort: 9092 zkNodes: zookeeper defaultZkPort: 2181 minPartitionCount: 2 replicationFactor: 1 autoCreateTopics: true autoAddPartitions: true headers: type,message_id requiredAcks: 1 configuration: "[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol bindings: user-output: producer: sync: true configuration: retries: 10000 default: binder: kafka contentType: application/json group: user-service consumer: maxAttempts: 1 producer: partitionKeyExtractorClass: com.watercorp.user_service.messaging.PartitionKeyExtractor bindings: user-output: destination: user producer: partitionCount: 5
null
spring: cloud: stream: kafka: binder: brokers: kafka defaultBrokerPort: 9092 minPartitionCount: 2 replicationFactor: 1 autoCreateTopics: true autoAddPartitions: true headers: type,message_id requiredAcks: 1 configuration: "[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol bindings: user-input: consumer: autoRebalanceEnabled: true autoCommitOnError: true enableDlq: true default: binder: kafka contentType: application/json group: enrollment-service consumer: maxAttempts: 1 headerMode: embeddedHeaders producer: partitionKeyExtractorClass: com.watercorp.messaging.PartitionKeyExtractor headerMode: embeddedHeaders bindings: user-input: destination: user consumer: concurrency: 5 partitioned: true
消费者@StreamListener:
@StreamListener(target = UserInput.INPUT, condition = "headers['type']=='" + USERS_DELETED + "'") public void handleUsersDeletedMessage(@Valid UsersDeletedMessage usersDeletedMessage, @Header(value = "kafka_receivedPartitionId", required = false) String partitionId, @Header(value = KAFKA_TOPIC_HEADER_NAME, required = false) String topic, @Header(MESSAGE_ID_HEADER_NAME) String messageId) throws Throwable { logger.info(String.format("Received users deleted message message, message id: %s topic: %s partition: %s", messageId, topic, partitionId)); handleMessageWithRetry(_usersDeletedMessageHandler, usersDeletedMessage, messageId, topic); }
这是RC3中的一个bug;最近固定在主人身上;它将在本月底的GA版本中发布。同时,您能尝试使用2.0.0.build-Snapshot吗?
我能够重现这个问题,并用快照为我修复了它。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
</dependency>
编辑
关于完整性:
Ditmars生产者
@SpringBootApplication
@EnableBinding(Source.class)
public class So49409104Application {
public static void main(String[] args) {
SpringApplication.run(So49409104Application.class, args);
}
@Bean
public ApplicationRunner runner(MessageChannel output) {
return args -> {
Foo foo = new Foo();
foo.setBar("bar");
output.send(new GenericMessage<>(foo));
};
}
public static class Foo {
private String bar;
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
}
而且
spring:
cloud:
stream:
bindings:
output:
destination: so49409104a
content-type: application/json
producer:
header-mode: embeddedHeaders
埃尔姆赫斯特消费者:
@SpringBootApplication
@EnableBinding(Sink.class)
public class So494091041Application {
public static void main(String[] args) {
SpringApplication.run(So494091041Application.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(Foo foo) {
System.out.println(foo);
}
public static class Foo {
private String bar;
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
}
而且
spring:
cloud:
stream:
bindings:
input:
group: so49409104
destination: so49409104a
consumer:
header-mode: embeddedHeaders
content-type: application/json
结果:
Foo [bar=bar]
header-mode
是必需的,因为2.0中的默认值是native
,现在Kafka支持本机标头。
我在使用spring-jms模块转换来自RabbitMQ的消息时遇到了一些问题。以前,我使用Rest APIendpoint发送消息,该endpoint将消息发送到RabbitMQ队列,并使用@JMSListener方法处理它。 在内部,这种行为添加了一个字段来确定Java类型,由Spring库管理。但是,现在我想避免Rest API调用,因为它不是必需的,而且我可以直接将消息发送到RabbitM
Spring JMS中的类具有setter方法,它允许提供我们想要的任何转换器。 对于带注释的消息侦听器,这很有意义,因为我们可以直接定义 spring将负责将消息转换并传递给这个侦听器。 因此,这对于带注释的听者来说显然是有意义的。我的问题是,设置消息转换器对非注释消息侦听器有用吗?类似于 从我在docs/javadocs中的搜索和对源代码的有限理解来看,我认为为这种情况设置消息转换器是没有帮助
使用MVC Java编程配置方式时,如果你想替换Spring MVC提供的默认转换器,完全定制自己的HttpMessageConverter,这可以通过覆写configureMessageConverters()方法来实现。如果你只是想定制一下,或者想在默认转换器之外再添加其他的转换器,那么可以通过覆写extendMessageConverters()方法来实现。 下面是一段例子,它使用定制的Ob
我有一个JMS侦听器,它正在从另一个应用程序接收字节消息。当前应用程序正在使用Spring JMS。我想在这里介绍spring集成。因此,我添加了以下示例代码来侦听消息。 然后我得到如下类强制转换异常: 我得到了一个ByteMessage,但我没有找到一个关于如何提取带有字节数组有效负载的ByteMessage的好例子。我是Spring集成世界的新手。
我正在尝试使用Spring进行GET http请求。 我的主要班级: 使用CatMessage类: 我应该得到回来,因为我使用不工作的用户名和密码组合(和这个服务器部分工作正常),是: { } 我认为这应该行得通,因为我几乎是在复制Spring for Android的基本auth项目 但是发生的是解析的问题(我认为)。当然,我已经包含了Jackson和Spring依赖项,所以我不希望我的问题出现
我正在使用Spring Cloud Stream和RabbitMQ活页夹。它可以很好地处理字节[]负载和Java本机序列化,但我需要处理JSON负载。 这是我的处理器类。 输入到和输出到是带有Jackson注释的POJO。 如何配置JSON转换策略 消息头应该如何被接受和处理