入门
要开始创建Spring Cloud Stream应用程序,请访问Spring Initializr并创建一个名为“GreetingSource”的新Maven项目。在下拉菜单中选择Spring Boot {supported-spring-boot-version}。在“ 搜索依赖关系”文本框中键入Stream Rabbit
或Stream Kafka
,具体取决于您要使用的binder。
接下来,在与GreetingSourceApplication
类相同的包中创建一个新类GreetingSource
。给它以下代码:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;
@EnableBinding(Source.class)
public class GreetingSource {
@InboundChannelAdapter(Source.OUTPUT)
public String greet() {
return "hello world " + System.currentTimeMillis();
}
}
@EnableBinding
注释是触发Spring Integration基础架构组件的创建。具体来说,它将创建一个Kafka连接工厂,一个Kafka出站通道适配器,并在Source界面中定义消息通道:
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
自动配置还创建一个默认轮询器,以便每秒调用greet()
方法一次。标准的Spring Integration @InboundChannelAdapter
注释使用返回值作为消息的有效内容向源的输出通道发送消息。
要测试驱动此设置,请运行Kafka消息代理。一个简单的方法是使用Docker镜像:
# On OS X
$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 spotify/kafka
# On Linux
$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=localhost --env ADVERTISED_PORT=9092 spotify/kafka
构建应用程序:
./mvnw clean package
消费者应用程序以类似的方式进行编码。返回Initializr并创建另一个名为LoggingSink的项目。然后在与类LoggingSinkApplication
相同的包中创建一个新类LoggingSink
,并使用以下代码:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
@EnableBinding(Sink.class)
public class LoggingSink {
@StreamListener(Sink.INPUT)
public void log(String message) {
System.out.println(message);
}
}
构建应用程序:
./mvnw clean package
要将GreetingSource应用程序连接到LoggingSink应用程序,每个应用程序必须共享相同的目标名称。启动这两个应用程序如下所示,您将看到消费者应用程序打印“hello world”和时间戳到控制台:
cd GreetingSource
java -jar target/GreetingSource-0.0.1-SNAPSHOT.jar --spring.cloud.stream.bindings.output.destination=mydest
cd LoggingSink
java -jar target/LoggingSink-0.0.1-SNAPSHOT.jar --server.port=8090 --spring.cloud.stream.bindings.input.destination=mydest
(不同的服务器端口可以防止两个应用程序中用于维护Spring Boot执行器端点的HTTP端口的冲突。)
LoggingSink应用程序的输出将如下所示:
[ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8090 (http)
[ main] com.example.LoggingSinkApplication : Started LoggingSinkApplication in 6.828 seconds (JVM running for 7.371)
hello world 1458595076731
hello world 1458595077732
hello world 1458595078733
hello world 1458595079734
hello world 1458595080735