spring-cloud-stream-reactive是spring-cloud-stream发布订阅消息驱动的响应式编程组件。提供异步非阻塞消息发布订阅。
下面是一大坨说明=-=
通过output输出管道 和 input输入管道来发布和订阅消息,spring-cloud-stream的binder负责将管道适配到指定的消息中间件,并负责与消息中间件交互(相当于在管道和消息中间件之间加了一层缓冲,可以使上层不需要关注底层中间件的差异, 中间件的变动只需要替换binder即可,业务代码不需要任何改动)。此外需要借助spring-integration来连接消息代理中间件以实现消息事件驱动,目前提供rabbitMq和kafka两个消息中间件的binder实现,只需引入就会自动绑定。主要有发布-订阅、消费组、分区的三个核心概念。
发布-订阅:由生产者发布消息到队列或者某个管道或者消息中间件,消费者从指定队列或者管道消费处理消息
消费组:当同一个消费者程序,启动多个同时消费相同的队列或者消息组件中的消息时,会存在重复消费,消费组就是将同一个消费者的多个实例配置到相同的组,达到同一组内只有一个消费者会消费到同一个消息达到避免重复消费的问题。
消费分区:当设置了消费组之后,在某些场景下,需要让消息内容中包含某个数据的消息发到同一个实例上消费,而不是被随机的某个实例消费到(类似Nginx的ip hash,同一个客户端的请求发到固定的一个后端服务上),此时可以通过消费分区,通过为消费者配置开启分区,生产者配置分区key表达式来实现。(此消费分区是由spring-cloud-stream实现的应用层消费分区,所以对于不支持消费分区的消息中间件也是可以实现消费分区的)
个人理解,有误需打脸纠正?
下面是一个spring-cloud-stream-reactive响应式发布订阅的例子:相关注解使用请查资料
主要依赖:spring-cloud版本 Greenwich.RELEASE。
//spring-cloud-stream 自动绑定rabbitmq的依赖
implementation 'org.springframework.cloud:spring-cloud-starter-stream-rabbit'
//reactive响应式编程
implementation "org.springframework.cloud:spring-cloud-stream-reactive"
//lombok idea(settings->compiler->enable annotation processor
annotationProcessor 'org.projectlombok:lombok'
compileOnly "org.projectlombok:lombok"
生产者:
package tom.spring.cloud.stream.reactive.sender;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.reactive.FluxSender;
import org.springframework.cloud.stream.reactive.StreamEmitter;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import reactor.core.publisher.Flux;
import tom.spring.cloud.msg.MyInput;
import tom.spring.cloud.msg.MyOutput;
import java.time.Duration;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* reactive 生产者
*/
@Slf4j
@SpringBootApplication
@EnableBinding({MyOutput.class, MyInput.class})
public class RxProducer {
/**
* 响应式发送StreamEmitter
* 方式1
* @return
*/
@StreamEmitter
@Output(MyOutput.OUTPUT)
public Flux<Date> emit010() {
return Flux.interval(Duration.ofSeconds(2)).map(i -> {Date date = new Date();
log.info("emit010 : {}", date); return date;});
}
/**
* 方式2
* @param output
*/
@StreamEmitter
@Output(MyOutput.OUTPUT)
public void emit011(FluxSender output) {
output.send(Flux.interval(Duration.ofSeconds(2)).map(i -> {Date date = new Date();
log.info("emit011 : {}", date); return date;}));
}
/**
* 方式3
* @param output
*/
@StreamEmitter
public void emit012(@Output(MyOutput.OUTPUT) FluxSender output) {
output.send(Flux.interval(Duration.ofSeconds(2)).map(i -> {Date date = new Date();
log.info("emit012 : {}", date); return date;}));
}
/**
* 方式4
* @return
*/
@Bean
@StreamEmitter
@Output(MyOutput.OUTPUT)
public Publisher<Message<Date>> emit013() {
return IntegrationFlows.from(
() -> {Date date = new Date();log.info("emit013 : {}", date); return MessageBuilder.withPayload(date).build();},
e -> e.poller(p -> p.fixedRate(2, TimeUnit.SECONDS))
).toReactivePublisher();
}
/**
* 监听反馈, INPUT对应的channel为output_1,消费者反馈信息到该通道
* @param flux
*/
@StreamListener
public void feedback(@Input(MyInput.INPUT) Flux<String> flux) {
flux.subscribe(m -> log.info("feedback : {}", m));
}
public static void main(String[] args) {
String[] defArgs = new String[args.length + 1];
defArgs[args.length] = "--spring.profiles.active=producer";
SpringApplication.run(RxProducer.class, defArgs);
}
}
消费者:
package tom.spring.cloud.stream.reactive.receiver;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.utils.DateUtils;
import org.apache.tomcat.util.buf.StringUtils;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.reactive.FluxSender;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import reactor.core.publisher.Flux;
import tom.spring.cloud.msg.MyInput;
import tom.spring.cloud.msg.MyOutput;
import java.util.Date;
/**
* reactive 消费者
* https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#spring-cloud-stream-overview-reactive-programming-support
* spring-cloud-stream-reactive 响应式编程
* Spring-cloud-stream-rxjava 早期版本,已废弃
*/
@Slf4j
@SpringBootApplication
@EnableBinding({MyInput.class, MyOutput.class})
public class RxConsumer {
/**
* 响应式编程方式1
* @return
*/
@StreamListener
public @Output(MyOutput.OUTPUT) Flux<String> receive020(@Input(MyInput.INPUT) Flux<Date> input) {
//聚合3次接收做一次反馈
return input.map(d -> DateUtils.formatDate(d, "HH:mm:ss"))
.log().buffer(3).map(ds -> "rx receive1 : " + StringUtils.join(ds, ','));
}
/**
* 方式2
* @param input
* @return
*/
@StreamListener(MyInput.INPUT)
@SendTo(MyOutput.OUTPUT)
public Flux<String> receive021(Flux<Date> input) {
//聚合3次接收做一次反馈
return input.map(d -> DateUtils.formatDate(d, "HH:mm:ss"))
.log().buffer(3).map(ds -> "rx receive2 : " + StringUtils.join(ds, ','));
}
/**
* 方式3
* @param input
* @param output
*/
@StreamListener
public void receive022(@Input(MyInput.INPUT) Flux<Date> input, @Output(MyOutput.OUTPUT) FluxSender output) {
//聚合3次接收做一次反馈
output.send(
input.map(d -> DateUtils.formatDate(d, "HH:mm:ss"))
.log().buffer(3).map(ds -> "rx receive3 : " + StringUtils.join(ds, ','))
);
}
public static void main(String[] args) {
String[] defArgs = new String[args.length + 1];
defArgs[args.length] = "--spring.profiles.active=consumer";
SpringApplication.run(RxConsumer.class, defArgs);
}
}
两个管道接口的定义,可以使用spring自带的Sink和Source或者Processor,这里自己定义的为了学习而已
/**
* 输出管道,消息发送到消息中间件,生产者,管道返回类型必须为MessageChannel
*
* @author tom
*/
public interface MyOutput {
String OUTPUT = "myOutput";
@Output(OUTPUT)
MessageChannel output();
}
/**
* 输入管道,监听接收消息,消息消费者,管道返回类型必须为SubscribableChannel
* @author to
*/
public interface MyInput {
String INPUT = "myInput";
@Input(INPUT)
SubscribableChannel input();
}
application-producer.yml:
server:
#RandomValuePropertySource
port: ${random.int[1024,65535]}
spring:
cloud:
stream:
#BindingServiceProperties
bindings:
myInput:
#kafka中的topic, rabbit中的exchange
#生产者接收回馈信息
destination: output_1
#消费组,处于同一个消费组的消费者,消息只会被其中一个消费
#默认情况下每个消费者处于独立的匿名组
group: consumer_g_1
#producer:
#消费者开启消费分区后,生产者执行分区表达式,来计算通过什么来分区
#SPEL表达式
#partitionKeyExpression: payload
#分区数
#partitionCount: 2
myOutput:
#kafka中的topic, rabbit中的exchange
#生产者生产输出
destination: input_1
application:
name: stream02-producer
#RabbitProperties 以下为默认配置
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
logging:
pattern:
file: '%d{yyyy/MM/dd-HH:mm:ss} [%thread] %-5level %logger- %msg%n'
console: '%d{yyyy/MM/dd-HH:mm:ss} [%thread] %-5level %logger- %msg%n'
file: ./log/${spring.application.name}.log
level:
tom.spring: DEBUG
application-consumer.yml:
server:
#RandomValuePropertySource
port: ${random.int[1024,65535]}
spring:
cloud:
stream:
#BindingServiceProperties
bindings:
myInput:
#kafka中的topic, rabbit中的exchange
#消费者接收信息通道
destination: input_1
#消费组,多实例处于同一个消费组的消费者,消息只会被其中一个消费者实例消费
#默认情况下每个消费者处于独立的匿名组
group: consumer_g_1
#consumer:
#开启消费分区,根据某个标识,将有一类相同标识的数据由同一个消费者消费
#partitioned: true
#实例数和实例索引,不同的实例instanceIndex必须不同
#instanceCount: 2
#instanceIndex: 1
myOutput:
#kafka中的topic, rabbit中的exchange
#消费者回馈信息通道
destination: output_1
application:
name: stream02-consumer
#RabbitProperties 以下为默认配置
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
logging:
pattern:
file: '%d{yyyy/MM/dd-HH:mm:ss} [%thread] %-5level %logger- %msg%n'
console: '%d{yyyy/MM/dd-HH:mm:ss} [%thread] %-5level %logger- %msg%n'
file: ./log/${spring.application.name}.log
level:
tom.spring: DEBUG