配置选项
Spring Cloud Stream支持常规配置选项以及绑定和绑定器的配置。一些绑定器允许额外的绑定属性来支持中间件特定的功能。
可以通过Spring Boot支持的任何机制将配置选项提供给Spring Cloud Stream应用程序。这包括应用程序参数,环境变量和YAML或.properties文件。
Spring Cloud Stream Properties
- spring.cloud.stream.instanceCount
应用程序部署实例的数量。必须设置分区,如果使用Kafka。
默认值:
1
。- spring.cloud.stream.instanceIndex
应用程序的实例索引:从
0
到instanceCount
- 1的数字。用于分区和使用Kafka。在Cloud Foundry中自动设置以匹配应用程序的实例索引。- spring.cloud.stream.dynamicDestinations
可以动态绑定的目标列表(例如,在动态路由方案中)。如果设置,只能列出目的地。
默认值:空(允许任何目的地绑定)。
- spring.cloud.stream.defaultBinder
如果配置了多个绑定器,则使用默认的binder。请参阅Classpath上的Multiple Binders。
默认值:空。
- spring.cloud.stream.overrideCloudConnectors
此属性仅适用于
cloud
配置文件激活且Spring Cloud连接器随应用程序一起提供。如果属性为false(默认值),绑定器将检测适合的绑定服务(例如,在Cloud Foundry中为RabbitMQ绑定器绑定的RabbitMQ服务),并将使用它来创建连接(通常通过Spring Cloud连接器)。当设置为true时,此属性指示绑定器完全忽略绑定的服务,并依赖Spring Boot属性(例如,依赖于RabbitMQ绑定器环境中提供的spring.rabbitmq.*
属性)。当连接到多个系统时,此属性的典型用法将嵌套在定制环境中。默认值:false。
绑定Properties
绑定属性使用格式spring.cloud.stream.bindings.<channelName>.<property>=<value>
提供。<channelName>
表示正在配置的通道的名称(例如Source
的output
)。
为了避免重复,Spring Cloud Stream支持所有通道的设置值,格式为spring.cloud.stream.default.<property>=<value>
。
在下面的内容中,我们指出我们在哪里省略了spring.cloud.stream.bindings.<channelName>.
前缀,并且只关注属性名称,但有一个理解,前缀将被包含在运行时。
Properties使用Spring Cloud Stream
以下绑定属性可用于输入和输出绑定,并且必须以spring.cloud.stream.bindings.<channelName>.
为前缀,例如spring.cloud.stream.bindings.input.destination=ticktock
。
可以使用前缀spring.cloud.stream.default
设置默认值,例如spring.cloud.stream.default.contentType=application/json
。
- 目的地
绑定中间件上的通道的目标目标(例如,RabbitMQ交换或Kafka主题)。如果通道绑定为消费者,则可以将其绑定到多个目标,并且目标名称可以指定为逗号分隔的字符串值。如果未设置,则使用通道名称。此属性的默认值不能被覆盖。
- 组
渠道的消费群体。仅适用于入站绑定。参见消费者群体。
默认值:null(表示匿名消费者)。
- 内容类型
频道的内容类型。
默认值:null(以便不执行类型强制)。
- 粘合剂
这种绑定使用的粘合剂。有关详细信息,请参阅Classpath上的Multiple Binders。
默认值:null(默认的binder将被使用,如果存在)。
消费者物业
以下绑定属性仅适用于输入绑定,并且必须以spring.cloud.stream.bindings.<channelName>.consumer.
为前缀,例如spring.cloud.stream.bindings.input.consumer.concurrency=3
。
默认值可以使用前缀spring.cloud.stream.default.consumer
设置,例如spring.cloud.stream.default.consumer.headerMode=raw
。
- 并发
入站消费者的并发性。
默认值:
1
。- 分区
消费者是否从分区生产者接收数据。
默认值:
false
。- headerMode
设置为
raw
时,禁用输入头文件解析。仅适用于不支持消息头的消息中间件,并且需要头部嵌入。入站数据来自外部Spring Cloud Stream应用程序时很有用。默认值:
embeddedHeaders
。- maxAttempts
如果处理失败,则尝试处理消息的次数(包括第一个)。设置为1以禁用重试。
默认值:
3
。- backOffInitialInterval
退避初始间隔重试。
默认值:
1000
。- backOffMaxInterval
最大回退间隔。
默认值:
10000
。- backOffMultiplier
退避倍数。
默认值:
2.0
。- instanceIndex
当设置为大于等于零的值时,允许自定义此消费者的实例索引(如果与
spring.cloud.stream.instanceIndex
不同)。设置为负值时,它将默认为spring.cloud.stream.instanceIndex
。默认值:
-1
。- instanceCount
当设置为大于等于零的值时,允许自定义此消费者的实例计数(如果与
spring.cloud.stream.instanceCount
不同)。当设置为负值时,它将默认为spring.cloud.stream.instanceCount
。默认值:
-1
。
制作人Properties
以下绑定属性仅可用于输出绑定,并且必须以spring.cloud.stream.bindings.<channelName>.producer.
为前缀,例如spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload.id
。
默认值可以使用前缀spring.cloud.stream.default.producer
设置,例如spring.cloud.stream.default.producer.partitionKeyExpression=payload.id
。
- partitionKeyExpression
一个确定如何分配出站数据的SpEL表达式。如果设置,或者如果设置了
partitionKeyExtractorClass
,则该通道上的出站数据将被分区,并且partitionCount
必须设置为大于1的值才能生效。这两个选项是相互排斥的。请参阅分区支持。默认值:null。
- partitionKeyExtractorClass
一个
PartitionKeyExtractorStrategy
实现。如果设置,或者如果设置了partitionKeyExpression
,则该通道上的出站数据将被分区,并且partitionCount
必须设置为大于1的值才能生效。这两个选项是相互排斥的。请参阅分区支持。默认值:null。
- partitionSelectorClass
一个
PartitionSelectorStrategy
实现。与partitionSelectorExpression
相互排斥。如果没有设置,则分区将被选为hashCode(key) % partitionCount
,其中key
通过partitionKeyExpression
或partitionKeyExtractorClass
计算。默认值:null。
- partitionSelectorExpression
用于自定义分区选择的SpEL表达式。与
partitionSelectorClass
相互排斥。如果没有设置,则分区将被选为hashCode(key) % partitionCount
,其中key
通过partitionKeyExpression
或partitionKeyExtractorClass
计算。默认值:null。
- partitionCount
如果启用分区,则数据的目标分区数。如果生产者被分区,则必须设置为大于1的值。在Kafka,解释为提示; 而是使用更大的和目标主题的分区计数。
默认值:
1
。- requiredGroups
生成者必须确保消息传递的组合的逗号分隔列表,即使它们在创建之后启动(例如,通过在RabbitMQ中预先创建持久队列)。
- headerMode
设置为
raw
时,禁用输出上的标题嵌入。仅适用于不支持消息头的消息中间件,并且需要头部嵌入。生成非Spring Cloud Stream应用程序的数据时很有用。默认值:
embeddedHeaders
。- useNativeEncoding
当设置为
true
时,出站消息由客户端库直接序列化,必须相应配置(例如设置适当的Kafka生产者值序列化程序)。当使用此配置时,出站消息编组不是基于绑定的contentType
。当使用本地编码时,消费者有责任使用适当的解码器(例如:Kafka消费者价值解串器)来对入站消息进行反序列化。此外,当使用本机编码/解码时,headerMode
属性将被忽略,标题不会嵌入到消息中。默认值:
false
。
使用动态绑定目的地
除了通过@EnableBinding
定义的通道之外,Spring Cloud Stream允许应用程序将消息发送到动态绑定的目的地。这是有用的,例如,当目标目标需要在运行时确定。应用程序可以使用@EnableBinding
注册自动注册的BinderAwareChannelResolver
bean。
属性“spring.cloud.stream.dynamicDestinations”可用于将动态目标名称限制为预先已知的集合(白名单)。如果属性未设置,任何目的地都可以动态绑定。
可以直接使用BinderAwareChannelResolver
,如以下示例所示,其中REST控制器使用路径变量来确定目标通道。
@EnableBinding
@Controller
public class SourceWithDynamicDestination {
@Autowired
private BinderAwareChannelResolver resolver;
@RequestMapping(path = "/{target}", method = POST, consumes = "*/*")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody String body, @PathVariable("target") target,
@RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
sendMessage(body, target, contentType);
}
private void sendMessage(String body, String target, Object contentType) {
resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
}
在默认端口8080上启动应用程序后,发送以下数据时:
curl -H "Content-Type: application/json" -X POST -d "customer-1" http://localhost:8080/customers curl -H "Content-Type: application/json" -X POST -d "order-1" http://localhost:8080/orders
目的地的客户和“订单”是在经纪人中创建的(例如:在Rabbit的情况下进行交换,或者在Kafka的情况下为主题),其名称为“客户”和“订单”,数据被发布到适当的目的地。
BinderAwareChannelResolver
是通用的Spring Integration DestinationResolver
,可以注入其他组件。例如,在使用基于传入JSON消息的target
字段的SpEL表达式的路由器中。
@EnableBinding
@Controller
public class SourceWithDynamicDestination {
@Autowired
private BinderAwareChannelResolver resolver;
@RequestMapping(path = "/", method = POST, consumes = "application/json")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
sendMessage(body, contentType);
}
private void sendMessage(Object body, Object contentType) {
routerChannel().send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
@Bean(name = "routerChannel")
public MessageChannel routerChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "routerChannel")
public ExpressionEvaluatingRouter router() {
ExpressionEvaluatingRouter router =
new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.target"));
router.setDefaultOutputChannelName("default-output");
router.setChannelResolver(resolver);
return router;
}
}