入门

优质
小牛编辑
137浏览
2023-12-01

要开始创建Spring Cloud Stream应用程序,请访问Spring Initializr并创建一个名为“GreetingSource”的新Maven项目。在下拉菜单中选择Spring Boot {supported-spring-boot-version}。在“ 搜索依赖关系”文本框中键入Stream RabbitStream Kafka,具体取决于您要使用的binder。

接下来,在与GreetingSourceApplication类相同的包中创建一个新类GreetingSource。给它以下代码:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;
@EnableBinding(Source.class)
public class GreetingSource {
  @InboundChannelAdapter(Source.OUTPUT)
  public String greet() {
    return "hello world " + System.currentTimeMillis();
  }
}

@EnableBinding注释是触发Spring Integration基础架构组件的创建。具体来说,它将创建一个Kafka连接工厂,一个Kafka出站通道适配器,并在Source界面中定义消息通道:

public interface Source {
  String OUTPUT = "output";
  @Output(Source.OUTPUT)
  MessageChannel output();
}

自动配置还创建一个默认轮询器,以便每秒调用greet()方法一次。标准的Spring Integration @InboundChannelAdapter注释使用返回值作为消息的有效内容向源的输出通道发送消息。

要测试驱动此设置,请运行Kafka消息代理。一个简单的方法是使用Docker镜像:

# On OS X
$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 spotify/kafka
# On Linux
$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=localhost --env ADVERTISED_PORT=9092 spotify/kafka

构建应用程序:

./mvnw clean package

消费者应用程序以类似的方式进行编码。返回Initializr并创建另一个名为LoggingSink的项目。然后在与类LoggingSinkApplication相同的包中创建一个新类LoggingSink,并使用以下代码:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
@EnableBinding(Sink.class)
public class LoggingSink {
  @StreamListener(Sink.INPUT)
  public void log(String message) {
    System.out.println(message);
  }
}

构建应用程序:

./mvnw clean package

要将GreetingSource应用程序连接到LoggingSink应用程序,每个应用程序必须共享相同的目标名称。启动这两个应用程序如下所示,您将看到消费者应用程序打印“hello world”和时间戳到控制台:

cd GreetingSource
java -jar target/GreetingSource-0.0.1-SNAPSHOT.jar --spring.cloud.stream.bindings.output.destination=mydest
cd LoggingSink
java -jar target/LoggingSink-0.0.1-SNAPSHOT.jar --server.port=8090 --spring.cloud.stream.bindings.input.destination=mydest

(不同的服务器端口可以防止两个应用程序中用于维护Spring Boot执行器端点的HTTP端口的冲突。)

LoggingSink应用程序的输出将如下所示:

[      main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8090 (http)
[      main] com.example.LoggingSinkApplication    : Started LoggingSinkApplication in 6.828 seconds (JVM running for 7.371)
hello world 1458595076731
hello world 1458595077732
hello world 1458595078733
hello world 1458595079734
hello world 1458595080735