内容类型和转换
要允许您传播关于已生成消息的内容类型的信息,默认情况下,Spring Cloud Stream附加contentType
标头到出站消息。对于不直接支持头文件的中间件,Spring Cloud Stream提供了自己的自动将邮件包裹在自己的信封中的机制。对于支持头文件的中间件,Spring Cloud Stream应用程序可以从非Spring Cloud Stream应用程序接收具有给定内容类型的消息。
Spring Cloud Stream可以通过两种方式处理基于此信息的消息:
- 通过其入站和出站渠道的
contentType
设置 - 通过对
@StreamListener
注释的方法执行的参数映射
Spring Cloud Stream允许您使用绑定的spring.cloud.stream.bindings.<channelName>.content-type
属性声明性地配置输入和输出的类型转换。请注意,一般类型转换也可以通过在应用程序中使用变压器轻松实现。目前,Spring Cloud Stream本机支持流中常用的以下类型转换:
- 来自/从POJO的JSON
- JSON /从org.springframework.tuple.Tuple
- 对象到/来自byte []:用于远程传输的原始字节序列化,应用程序发出的字节,或使用Java序列化转换为字节(要求对象为Serializable)
- 字符串到/来自byte []
- 对象到纯文本(调用对象的toString()方法)
其中JSON表示包含JSON的字节数组或字符串有效负载。目前,对象可以从JSON字节数组或字符串转换。转换为JSON总是产生一个String。
如果在出站通道上没有设置content-type
属性,则Spring Cloud Stream将使用基于Kryo序列化框架的序列化程序对有效负载进行序列化。在目的地反序列化消息需要在接收者的类路径上存在有效载荷类。
MIME类型
content-type
值被解析为媒体类型,例如application/json
或text/plain;charset=UTF-8
。MIME类型对于指示如何转换为String或byte []内容特别有用。Spring Cloud Stream还使用MIME类型格式来表示Java类型,使用具有type
参数的一般类型application/x-java-object
。例如,application/x-java-object;type=java.util.Map
或application/x-java-object;type=com.bar.Foo
可以设置为输入绑定的content-type
属性。此外,Spring Cloud Stream提供自定义MIME类型,特别是application/x-spring-tuple
来指定元组。
MIME类型和Java类型
类型转换Spring Cloud Stream提供的开箱即用如下表所示:“源有效载荷”是指转换前的有效载荷,“目标有效载荷”是指转换后的“有效载荷”。类型转换可以在“生产者”一侧(输出)或“消费者”一侧(输入)上进行。
来源有效载荷 | 目标有效载荷 | content-type 标题(来源讯息) | content-type 标题(转换后) | 注释 |
---|---|---|---|---|
POJO | JSON String | ignored | application/json | |
Tuple | JSON String | ignored | application/json | JSON是为Tuple量身定制的 |
POJO | String (toString()) | ignored | text/plain, java.lang.String | |
POJO | byte[] (java.io serialized) | ignored | application/x-java-serialized-object | |
JSON byte[] or String | POJO | application/json (or none) | application/x-java-object | |
byte[] or String | Serializable | application/x-java-serialized-object | application/x-java-object | |
JSON byte[] or String | Tuple | application/json (or none) | application/x-spring-tuple | |
byte[] | String | any | text/plain, java.lang.String | 将应用在content-type头中指定的任何Charset |
String | byte[] | any | application/octet-stream | 将应用在content-type头中指定的任何Charset |
注意 | 转换适用于需要类型转换的有效内容。例如,如果应用程序生成带有outputType = application / json的XML字符串,则该有效载荷将不会从XML转换为JSON。这是因为发送到出站通道的有效载荷已经是一个String,所以在运行时不会应用转换。同样重要的是要注意,当使用默认的序列化机制时,必须在发送和接收应用程序之间共享有效负载类,并且与二进制内容兼容。当应用程序代码在两个应用程序中独立更改时,这可能会产生问题,因为二进制格式和代码可能会变得不兼容。 |
提示 | 虽然入站和出站渠道都支持转换,但特别推荐将其用于转发出站邮件。对于入站邮件的转换,特别是当目标是POJO时, |
自定义邮件转换
除了支持开箱即用的转换,Spring Cloud Stream还支持注册您自己的邮件转换实现。这允许您以各种自定义格式(包括二进制)发送和接收数据,并将其与特定的contentTypes
关联。Spring Cloud Stream将所有类型为org.springframework.messaging.converter.MessageConverter
的bean注册为自定义消息转换器以及开箱即用消息转换器。
如果您的消息转换器需要使用特定的content-type
和目标类(用于输入和输出),则消息转换器需要扩展org.springframework.messaging.converter.AbstractMessageConverter
。对于使用@StreamListener
的转换,实现org.springframework.messaging.converter.MessageConverter
的消息转换器就足够了。
以下是在Spring Cloud Stream应用程序中创建消息转换器bean(内容类型为application/bar
)的示例:
@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {
...
@Bean
public MessageConverter customMessageConverter() {
return new MyCustomMessageConverter();
}
public class MyCustomMessageConverter extends AbstractMessageConverter {
public MyCustomMessageConverter() {
super(new MimeType("application", "bar"));
}
@Override
protected boolean supports(Class<?> clazz) {
return (Bar.class == clazz);
}
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
}
}
Spring Cloud Stream还为基于Avro的转换器和模式演进提供支持。详情请参阅具体章节。
@StreamListener
和讯息转换
@StreamListener
注释提供了一种方便的方式来转换传入的消息,而不需要指定输入通道的内容类型。在使用@StreamListener
注释的方法的调度过程中,如果参数需要转换,将自动应用转换。
例如,让我们考虑一个带有{"greeting":"Hello, world"}
的String内容的消息,并且在输入通道上收到application/json
的application/json
标题。让我们考虑接收它的以下应用程序:
public class GreetingMessage {
String greeting;
public String getGreeting() {
return greeting;
}
public void setGreeting(String greeting) {
this.greeting = greeting;
}
}
@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class GreetingSink {
@StreamListener(Sink.INPUT)
public void receive(Greeting greeting) {
// handle Greeting
}
}
该方法的参数将自动填充包含JSON字符串的未编组形式的POJO。