The goal of this project is to implement a "News" processing pipeline composed of five Spring Boot applications: producer-api, categorizer-service, collector-service, publisher-api and news-client.

The project uses:

* Spring Cloud Stream to build highly scalable, event-driven applications connected through shared messaging systems;
* Spring Cloud Schema Registry, which supports schema evolution so that the data can evolve over time; besides, it stores schema information in a textual format (typically JSON) and makes it accessible to the applications that need it to send and receive data in binary format;
* Spring Data Elasticsearch to persist data in Elasticsearch;
* Spring Cloud OpenFeign to write web service clients easily;
* Thymeleaf as the HTML template engine;
* Zipkin to visualize traces between and within applications;
* Eureka for service registration and discovery.
> **Note**: In the docker-swarm-environment repository, it is shown how to deploy this project into a cluster of Docker Engines in swarm mode.
producer-api

Spring Boot Web Java application that creates news and pushes news events to the producer.news topic in Kafka.
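As a rough sketch of how such a producer can be implemented with Spring Cloud Stream's functional model, the controller below uses StreamBridge to send events to an output binding that the application configuration maps to the producer.news topic. The endpoint path, the binding name news-out-0 and the NewsEvent accessors are assumptions for illustration, not necessarily the project's actual code.

```java
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/news") // hypothetical endpoint path
public class NewsController {

    private final StreamBridge streamBridge;

    public NewsController(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    // Sends the created news to the "news-out-0" output binding, which the
    // application configuration maps to the producer.news topic.
    @PostMapping
    public NewsEvent createNews(@RequestBody NewsEvent newsEvent) {
        streamBridge.send("news-out-0", newsEvent);
        return newsEvent;
    }
}
```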
categorizer-service

Spring Boot Web Java application that listens to news events in the producer.news topic in Kafka, categorizes them and pushes them to the categorizer.news topic.
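The categorization step maps naturally to Spring Cloud Stream's functional model. A minimal sketch is shown below; the NewsEvent accessors and the categorization rule are assumptions for illustration.

```java
import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CategorizerConfig {

    // The function's input binding is mapped to producer.news and its output
    // binding to categorizer.news in the application configuration.
    @Bean
    public Function<NewsEvent, NewsEvent> categorize() {
        return newsEvent -> {
            newsEvent.setCategory(resolveCategory(newsEvent.getText()));
            return newsEvent;
        };
    }

    // Hypothetical rule; the real service has its own categorization logic.
    private String resolveCategory(String text) {
        return text != null && text.toLowerCase().contains("sport") ? "SPORT" : "GENERAL";
    }
}
```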
collector-service

Spring Boot Web Java application that listens for news events in the categorizer.news topic in Kafka, saves them in Elasticsearch and pushes the news events to the collector.news topic.
publisher-api

Spring Boot Web Java application that reads news directly from Elasticsearch and exposes a REST API. It doesn't listen to Kafka.
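news-client reaches this REST API through Spring Cloud OpenFeign, resolving the publisher-api instance by its service name in Eureka. A minimal sketch of such a client follows; the endpoint path and query parameter are assumptions for illustration.

```java
import java.util.List;

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;

// "publisher-api" is resolved to a concrete instance through Eureka.
@FeignClient(name = "publisher-api")
public interface PublisherApiClient {

    // Hypothetical search endpoint exposed by publisher-api.
    @GetMapping("/api/news")
    List<NewsEvent> getNews(@RequestParam(value = "title", required = false) String title);
}
```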
news-client

Spring Boot Web Java application that provides a user interface to see the news. It implements a WebSocket that consumes news events from the collector.news topic, so the news is updated on the fly on the main page. Besides, news-client communicates directly with publisher-api whenever a search for specific news or a news update is needed.
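One plausible way to wire the Kafka-to-WebSocket bridge is sketched below, under the assumption that news-client uses STOMP over WebSocket and a /topic/news destination (both are assumptions for illustration):

```java
import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.SimpMessagingTemplate;

@Configuration
public class NewsForwarderConfig {

    private final SimpMessagingTemplate messagingTemplate;

    public NewsForwarderConfig(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }

    // The consumer's input binding is mapped to the collector.news topic;
    // every event received is pushed to the subscribed browser pages.
    @Bean
    public Consumer<NewsEvent> forwardNews() {
        return newsEvent -> messagingTemplate.convertAndSend("/topic/news", newsEvent);
    }
}
```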
The WebSocket operation is shown in the short gif below: a news item is created in producer-api and is immediately shown in news-client.
In a terminal, make sure you are in the spring-cloud-stream-kafka-elasticsearch root folder.

Run the following command to generate NewsEvent

./mvnw clean install --projects commons-news

It will install commons-news-1.0.0.jar in your local Maven repository, so that it is visible to all services.
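The jar contains the NewsEvent class shared by all services. As a rough idea of its shape, a sketch follows; the actual class is generated by the commons-news build, and the field names below are assumptions for illustration.

```java
// Hypothetical shape of the shared event payload; the real class is generated
// by the commons-news module and its schema is stored in the Schema Registry.
public class NewsEvent {
    private String id;
    private String title;
    private String text;
    private String category;

    // getters and setters omitted for brevity
}
```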
Open a terminal and, inside the spring-cloud-stream-kafka-elasticsearch root folder, run

docker-compose up -d

Wait for the containers to reach the status running (healthy). To check it, run

docker-compose ps
Inside the spring-cloud-stream-kafka-elasticsearch root folder, run the following Maven commands in different terminals
eureka-server
./mvnw clean spring-boot:run --projects eureka-server
producer-api
./mvnw clean spring-boot:run --projects producer-api -Dspring-boot.run.jvmArguments="-Dserver.port=9080"
categorizer-service
./mvnw clean spring-boot:run --projects categorizer-service -Dspring-boot.run.jvmArguments="-Dserver.port=9081"
collector-service
./mvnw clean spring-boot:run --projects collector-service -Dspring-boot.run.jvmArguments="-Dserver.port=9082"
publisher-api
./mvnw clean spring-boot:run --projects publisher-api -Dspring-boot.run.jvmArguments="-Dserver.port=9083"
news-client
./mvnw clean spring-boot:run --projects news-client
In a terminal, make sure you are in the spring-cloud-stream-kafka-elasticsearch root folder.

In order to build the applications' Docker images, run the following script

./docker-build.sh
producer-api
| Environment Variable | Description |
| --- | --- |
| `KAFKA_HOST` | Specify host of the `Kafka` message broker to use |
| `KAFKA_PORT` | Specify port of the `Kafka` message broker to use |
| `SCHEMA_REGISTRY_HOST` | Specify host of the `Schema Registry` to use |
| `SCHEMA_REGISTRY_PORT` | Specify port of the `Schema Registry` to use |
| `EUREKA_HOST` | Specify host of the `Eureka` service registry to use |
| `EUREKA_PORT` | Specify port of the `Eureka` service registry to use |
| `ZIPKIN_HOST` | Specify host of the `Zipkin` distributed tracing system to use |
| `ZIPKIN_PORT` | Specify port of the `Zipkin` distributed tracing system to use |
categorizer-service
| Environment Variable | Description |
| --- | --- |
| `KAFKA_HOST` | Specify host of the `Kafka` message broker to use |
| `KAFKA_PORT` | Specify port of the `Kafka` message broker to use |
| `SCHEMA_REGISTRY_HOST` | Specify host of the `Schema Registry` to use |
| `SCHEMA_REGISTRY_PORT` | Specify port of the `Schema Registry` to use |
| `EUREKA_HOST` | Specify host of the `Eureka` service registry to use |
| `EUREKA_PORT` | Specify port of the `Eureka` service registry to use |
| `ZIPKIN_HOST` | Specify host of the `Zipkin` distributed tracing system to use |
| `ZIPKIN_PORT` | Specify port of the `Zipkin` distributed tracing system to use |
collector-service
| Environment Variable | Description |
| --- | --- |
| `ELASTICSEARCH_HOST` | Specify host of the `Elasticsearch` search engine to use |
| `ELASTICSEARCH_NODES_PORT` | Specify nodes port of the `Elasticsearch` search engine to use |
| `ELASTICSEARCH_REST_PORT` | Specify rest port of the `Elasticsearch` search engine to use |
| `KAFKA_HOST` | Specify host of the `Kafka` message broker to use |
| `KAFKA_PORT` | Specify port of the `Kafka` message broker to use |
| `SCHEMA_REGISTRY_HOST` | Specify host of the `Schema Registry` to use |
| `SCHEMA_REGISTRY_PORT` | Specify port of the `Schema Registry` to use |
| `EUREKA_HOST` | Specify host of the `Eureka` service registry to use |
| `EUREKA_PORT` | Specify port of the `Eureka` service registry to use |
| `ZIPKIN_HOST` | Specify host of the `Zipkin` distributed tracing system to use |
| `ZIPKIN_PORT` | Specify port of the `Zipkin` distributed tracing system to use |
publisher-api
| Environment Variable | Description |
| --- | --- |
| `ELASTICSEARCH_HOST` | Specify host of the `Elasticsearch` search engine to use |
| `ELASTICSEARCH_NODES_PORT` | Specify nodes port of the `Elasticsearch` search engine to use |
| `ELASTICSEARCH_REST_PORT` | Specify rest port of the `Elasticsearch` search engine to use |
| `EUREKA_HOST` | Specify host of the `Eureka` service registry to use |
| `EUREKA_PORT` | Specify port of the `Eureka` service registry to use |
| `ZIPKIN_HOST` | Specify host of the `Zipkin` distributed tracing system to use |
| `ZIPKIN_PORT` | Specify port of the `Zipkin` distributed tracing system to use |
news-client
| Environment Variable | Description |
| --- | --- |
| `KAFKA_HOST` | Specify host of the `Kafka` message broker to use |
| `KAFKA_PORT` | Specify port of the `Kafka` message broker to use |
| `SCHEMA_REGISTRY_HOST` | Specify host of the `Schema Registry` to use |
| `SCHEMA_REGISTRY_PORT` | Specify port of the `Schema Registry` to use |
| `EUREKA_HOST` | Specify host of the `Eureka` service registry to use |
| `EUREKA_PORT` | Specify port of the `Eureka` service registry to use |
| `ZIPKIN_HOST` | Specify host of the `Zipkin` distributed tracing system to use |
| `ZIPKIN_PORT` | Specify port of the `Zipkin` distributed tracing system to use |
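These variables make it possible to point a container at services running on other hosts. For instance, a producer-api container could be started with custom hosts as shown below; the image name and tag are assumptions, and the start-apps.sh script remains the supported way to start everything

docker run -d --name producer-api -e KAFKA_HOST=kafka -e SCHEMA_REGISTRY_HOST=schema-registry -e EUREKA_HOST=eureka-server -e ZIPKIN_HOST=zipkin producer-api:1.0.0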
In a terminal, make sure you are inside the spring-cloud-stream-kafka-elasticsearch root folder.

Run the following script

./start-apps.sh
| Application | URL |
| --- | --- |
| producer-api | http://localhost:9080 |
| publisher-api | http://localhost:9083 |
| news-client | http://localhost:8080 |
Eureka
Eureka can be accessed at http://localhost:8761
Zipkin
Zipkin can be accessed at http://localhost:9411
The figure below shows an example of the complete flow a news item passes through, from producer-api, where the news is created, to news-client.
Kafka Topics UI
Kafka Topics UI can be accessed at http://localhost:8085
Kafka Manager
Kafka Manager can be accessed at http://localhost:9000
The figure below shows the Kafka topic consumers. As we can see, the consumers are up to date, as the lag is 0.
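To double-check what is flowing through a topic from the command line, a console consumer can be attached; this assumes the Kafka container is named kafka and ships with the Confluent CLI tools, and note that the payloads are Avro-encoded, so they will not be fully human-readable

docker exec -t kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic producer.news --from-beginning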
Configuration

First, you must create a new cluster. Click on Cluster (dropdown button on the header) and then on Add Cluster.

Type the name of your cluster in the Cluster Name field, for example: MyCluster.

Type zookeeper:2181 in the Cluster Zookeeper Hosts field.

Enable the checkbox Poll consumer information (Not recommended for large # of consumers if ZK is used for offsets tracking on older Kafka versions).

Click on the Save button at the bottom of the page.
Schema Registry UI
Schema Registry UI can be accessed at http://localhost:8001
Elasticsearch REST API
Check ES is up and running
curl localhost:9200
Check indexes in ES
curl "localhost:9200/_cat/indices?v"
Check news index mapping
curl "localhost:9200/news/_mapping?pretty"
Simple search
curl "localhost:9200/news/_search?pretty"
To stop applications

If they were started with Maven, go to the terminals where they are running and press Ctrl+C.

If they were started as Docker containers, go to a terminal and, inside the spring-cloud-stream-kafka-elasticsearch root folder, run the script below

./stop-apps.sh

To stop and remove docker-compose containers, network and volumes, go to a terminal and, inside the spring-cloud-stream-kafka-elasticsearch root folder, run the following command

docker-compose down -v

To remove the Docker images created by this project, go to a terminal and, inside the spring-cloud-stream-kafka-elasticsearch root folder, run the script below

./remove-docker-images.sh