配置选项

优质
小牛编辑
136浏览
2023-12-01

Spring Cloud Stream支持常规配置选项以及绑定和绑定器的配置。一些绑定器允许额外的绑定属性来支持中间件特定的功能。

可以通过Spring Boot支持的任何机制将配置选项提供给Spring Cloud Stream应用程序。这包括应用程序参数,环境变量和YAML或.properties文件。

Spring Cloud Stream Properties

spring.cloud.stream.instanceCount

应用程序部署实例的数量。必须设置分区,如果使用Kafka。

默认值:1

spring.cloud.stream.instanceIndex

应用程序的实例索引:从0instanceCount - 1的数字。用于分区和使用Kafka。在Cloud Foundry中自动设置以匹配应用程序的实例索引。

spring.cloud.stream.dynamicDestinations

可以动态绑定的目标列表(例如,在动态路由方案中)。如果设置,只能列出目的地。

默认值:空(允许任何目的地绑定)。

spring.cloud.stream.defaultBinder

如果配置了多个绑定器,则使用默认的binder。请参阅Classpath上的Multiple Binders。

默认值:空。

spring.cloud.stream.overrideCloudConnectors

此属性仅适用于cloud配置文件激活且Spring Cloud连接器随应用程序一起提供。如果属性为false(默认值),绑定器将检测适合的绑定服务(例如,在Cloud Foundry中为RabbitMQ绑定器绑定的RabbitMQ服务),并将使用它来创建连接(通常通过Spring Cloud连接器)。当设置为true时,此属性指示绑定器完全忽略绑定的服务,并依赖Spring Boot属性(例如,依赖于RabbitMQ绑定器环境中提供的spring.rabbitmq.*属性)。当连接到多个系统时,此属性的典型用法将嵌套在定制环境中。

默认值:false。

绑定Properties

绑定属性使用格式spring.cloud.stream.bindings.<channelName>.<property>=<value>提供。<channelName>表示正在配置的通道的名称(例如Sourceoutput)。

为了避免重复,Spring Cloud Stream支持所有通道的设置值,格式为spring.cloud.stream.default.<property>=<value>

在下面的内容中,我们指出我们在哪里省略了spring.cloud.stream.bindings.<channelName>.前缀,并且只关注属性名称,但有一个理解,前缀将被包含在运行时。

Properties使用Spring Cloud Stream

以下绑定属性可用于输入和输出绑定,并且必须以spring.cloud.stream.bindings.<channelName>.为前缀,例如spring.cloud.stream.bindings.input.destination=ticktock

可以使用前缀spring.cloud.stream.default设置默认值,例如spring.cloud.stream.default.contentType=application/json

目的地

绑定中间件上的通道的目标目标(例如,RabbitMQ交换或Kafka主题)。如果通道绑定为消费者,则可以将其绑定到多个目标,并且目标名称可以指定为逗号分隔的字符串值。如果未设置,则使用通道名称。此属性的默认值不能被覆盖。

渠道的消费群体。仅适用于入站绑定。参见消费者群体。

默认值:null(表示匿名消费者)。

内容类型

频道的内容类型。

默认值:null(以便不执行类型强制)。

粘合剂

这种绑定使用的粘合剂。有关详细信息,请参阅Classpath上的Multiple Binders。

默认值:null(默认的binder将被使用,如果存在)。

消费者物业

以下绑定属性仅适用于输入绑定,并且必须以spring.cloud.stream.bindings.<channelName>.consumer.为前缀,例如spring.cloud.stream.bindings.input.consumer.concurrency=3

默认值可以使用前缀spring.cloud.stream.default.consumer设置,例如spring.cloud.stream.default.consumer.headerMode=raw

并发

入站消费者的并发性。

默认值:1

分区

消费者是否从分区生产者接收数据。

默认值:false

headerMode

设置为raw时,禁用输入头文件解析。仅适用于不支持消息头的消息中间件,并且需要头部嵌入。入站数据来自外部Spring Cloud Stream应用程序时很有用。

默认值:embeddedHeaders

maxAttempts

如果处理失败,则尝试处理消息的次数(包括第一个)。设置为1以禁用重试。

默认值:3

backOffInitialInterval

退避初始间隔重试。

默认值:1000

backOffMaxInterval

最大回退间隔。

默认值:10000

backOffMultiplier

退避倍数。

默认值:2.0

instanceIndex

当设置为大于等于零的值时,允许自定义此消费者的实例索引(如果与spring.cloud.stream.instanceIndex不同)。设置为负值时,它将默认为spring.cloud.stream.instanceIndex

默认值:-1

instanceCount

当设置为大于等于零的值时,允许自定义此消费者的实例计数(如果与spring.cloud.stream.instanceCount不同)。当设置为负值时,它将默认为spring.cloud.stream.instanceCount

默认值:-1

制作人Properties

以下绑定属性仅可用于输出绑定,并且必须以spring.cloud.stream.bindings.<channelName>.producer.为前缀,例如spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload.id

默认值可以使用前缀spring.cloud.stream.default.producer设置,例如spring.cloud.stream.default.producer.partitionKeyExpression=payload.id

partitionKeyExpression

一个确定如何分配出站数据的SpEL表达式。如果设置,或者如果设置了partitionKeyExtractorClass,则该通道上的出站数据将被分区,并且partitionCount必须设置为大于1的值才能生效。这两个选项是相互排斥的。请参阅分区支持。

默认值:null。

partitionKeyExtractorClass

一个PartitionKeyExtractorStrategy实现。如果设置,或者如果设置了partitionKeyExpression,则该通道上的出站数据将被分区,并且partitionCount必须设置为大于1的值才能生效。这两个选项是相互排斥的。请参阅分区支持。

默认值:null。

partitionSelectorClass

一个PartitionSelectorStrategy实现。与partitionSelectorExpression相互排斥。如果没有设置,则分区将被选为hashCode(key) % partitionCount,其中key通过partitionKeyExpressionpartitionKeyExtractorClass计算。

默认值:null。

partitionSelectorExpression

用于自定义分区选择的SpEL表达式。与partitionSelectorClass相互排斥。如果没有设置,则分区将被选为hashCode(key) % partitionCount,其中key通过partitionKeyExpressionpartitionKeyExtractorClass计算。

默认值:null。

partitionCount

如果启用分区,则数据的目标分区数。如果生产者被分区,则必须设置为大于1的值。在Kafka,解释为提示; 而是使用更大的和目标主题的分区计数。

默认值:1

requiredGroups

生成者必须确保消息传递的组合的逗号分隔列表,即使它们在创建之后启动(例如,通过在RabbitMQ中预先创建持久队列)。

headerMode

设置为raw时,禁用输出上的标题嵌入。仅适用于不支持消息头的消息中间件,并且需要头部嵌入。生成非Spring Cloud Stream应用程序的数据时很有用。

默认值:embeddedHeaders

useNativeEncoding

当设置为true时,出站消息由客户端库直接序列化,必须相应配置(例如设置适当的Kafka生产者值序列化程序)。当使用此配置时,出站消息编组不是基于绑定的contentType。当使用本地编码时,消费者有责任使用适当的解码器(例如:Kafka消费者价值解串器)来对入站消息进行反序列化。此外,当使用本机编码/解码时,headerMode属性将被忽略,标题不会嵌入到消息中。

默认值:false

使用动态绑定目的地

除了通过@EnableBinding定义的通道之外,Spring Cloud Stream允许应用程序将消息发送到动态绑定的目的地。这是有用的,例如,当目标目标需要在运行时确定。应用程序可以使用@EnableBinding注册自动注册的BinderAwareChannelResolver bean。

属性“spring.cloud.stream.dynamicDestinations”可用于将动态目标名称限制为预先已知的集合(白名单)。如果属性未设置,任何目的地都可以动态绑定。

可以直接使用BinderAwareChannelResolver,如以下示例所示,其中REST控制器使用路径变量来确定目标通道。

@EnableBinding
@Controller
public class SourceWithDynamicDestination {
	@Autowired
	private BinderAwareChannelResolver resolver;
	@RequestMapping(path = "/{target}", method = POST, consumes = "*/*")
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void handleRequest(@RequestBody String body, @PathVariable("target") target,
	    @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
		sendMessage(body, target, contentType);
	}
	private void sendMessage(String body, String target, Object contentType) {
		resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
				new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
	}
}

在默认端口8080上启动应用程序后,发送以下数据时:

curl -H "Content-Type: application/json" -X POST -d "customer-1" http://localhost:8080/customers
curl -H "Content-Type: application/json" -X POST -d "order-1" http://localhost:8080/orders

目的地的客户和“订单”是在经纪人中创建的(例如:在Rabbit的情况下进行交换,或者在Kafka的情况下为主题),其名称为“客户”和“订单”,数据被发布到适当的目的地。

BinderAwareChannelResolver是通用的Spring Integration DestinationResolver,可以注入其他组件。例如,在使用基于传入JSON消息的target字段的SpEL表达式的路由器中。

@EnableBinding
@Controller
public class SourceWithDynamicDestination {
	@Autowired
	private BinderAwareChannelResolver resolver;
	@RequestMapping(path = "/", method = POST, consumes = "application/json")
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
		sendMessage(body, contentType);
	}
	private void sendMessage(Object body, Object contentType) {
		routerChannel().send(MessageBuilder.createMessage(body,
				new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
	}
	@Bean(name = "routerChannel")
	public MessageChannel routerChannel() {
		return new DirectChannel();
	}
	@Bean
	@ServiceActivator(inputChannel = "routerChannel")
	public ExpressionEvaluatingRouter router() {
    ExpressionEvaluatingRouter router =
      new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.target"));
		router.setDefaultOutputChannelName("default-output");
		router.setChannelResolver(resolver);
		return router;
	}
}