
spring-cloud-stream-kafka-elasticsearch

The goal of this project is to implement a "News" processing pipeline composed of five Spring Boot applications: producer-api, categorizer-service, collector-service, publisher-api and news-client.

Technologies used

  • Spring Cloud Stream to build highly scalable event-driven applications connected with shared messaging systems (a short sketch of this programming model follows the list);

  • Spring Cloud Schema Registry, which supports schema evolution so that data can evolve over time; it also stores schema information in a textual format (typically JSON) and makes it accessible to the applications that need it to send and receive data in binary format;

  • Spring Data Elasticsearch to persist data in Elasticsearch;

  • Spring Cloud OpenFeign to write web service clients easily;

  • Thymeleaf as the server-side HTML template engine;

  • Zipkin to visualize traces between and within applications;

  • Eureka for service registration and discovery.
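
As mentioned in the list above, Spring Cloud Stream's functional programming model lets a processing step such as categorizer-service be expressed as a plain java.util.function.Function bean. The sketch below is illustrative only: the NewsEvent fields and the categorization rule are assumptions, not the project's actual code.

    import java.util.function.Function;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class CategorizerConfig {

        // Spring Cloud Stream binds this function to an input and an output
        // destination (e.g. producer.news -> categorizer.news) via configuration;
        // see the application.yml sketch in the Applications section below.
        @Bean
        public Function<NewsEvent, NewsEvent> categorize() {
            return news -> {
                // Hypothetical rule: derive the category from the title.
                news.setCategory(news.getTitle().toLowerCase().contains("sport")
                        ? "SPORTS" : "GENERAL");
                return news;
            };
        }

        // Minimal stand-in for the project's NewsEvent; the real class lives in
        // the commons-news module and its fields are assumptions here.
        static class NewsEvent {
            private String title;
            private String category;
            public String getTitle() { return title; }
            public void setTitle(String title) { this.title = title; }
            public String getCategory() { return category; }
            public void setCategory(String category) { this.category = category; }
        }
    }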

Note
The docker-swarm-environment repository shows how to deploy this project to a cluster of Docker Engines in swarm mode.

Project Architecture

Applications

  • producer-api

    Spring Boot Web Java application that creates news items and pushes news events to the producer.news topic in Kafka.

  • categorizer-service

    Spring Boot Web Java application that listens to news events from the producer.news topic in Kafka, categorizes them, and pushes them to the categorizer.news topic.

  • collector-service

    Spring Boot Web Java application that listens for news events in the categorizer.news topic in Kafka, saves them in Elasticsearch, and pushes them to the collector.news topic.

  • publisher-api

    Spring Boot Web Java application that reads news directly from Elasticsearch and exposes them through a REST API. It does not consume from Kafka.

  • news-client

    Spring Boot Web Java application that provides a user interface for reading the news. It implements a WebSocket that consumes news events from the collector.news topic, so the news shown on the main page are updated on the fly. Besides, news-client calls publisher-api directly whenever a search for specific news or a news update is needed.

    The WebSocket flow is shown in the short GIF below: a news item is created in producer-api and immediately appears in news-client.
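
The topic wiring described above is declared in each service's configuration rather than in code. A hypothetical application.yml for categorizer-service is shown below; the function and binding names are assumptions (the repository's actual configuration may differ), but the property keys are standard Spring Cloud Stream ones.

    spring:
      cloud:
        function:
          definition: categorize
        stream:
          bindings:
            categorize-in-0:
              destination: producer.news
              group: categorizerGroup
            categorize-out-0:
              destination: categorizer.news
          kafka:
            binder:
              brokers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:29092}

With this, the categorize function consumes from producer.news and publishes its return value to categorizer.news, and the consumer group ensures each event is processed by only one instance of the service.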

Generate NewsEvent

  • In a terminal, make sure you are in spring-cloud-stream-kafka-elasticsearch root folder

  • Run the following command to generate NewsEvent

    ./mvnw clean install --projects commons-news

    It will install commons-news-1.0.0.jar in your local Maven repository, making it visible to all the services.
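
Since the services exchange NewsEvent messages through Spring Cloud Schema Registry, the event type in commons-news is typically described by an Avro schema. The schema below is a hypothetical illustration of what such a definition looks like; the actual namespace and field list live in the commons-news module.

    {
      "type": "record",
      "name": "NewsEvent",
      "namespace": "com.example.commons.news",
      "fields": [
        {"name": "id", "type": "string"},
        {"name": "title", "type": "string"},
        {"name": "text", "type": "string"},
        {"name": "category", "type": ["null", "string"], "default": null}
      ]
    }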

Start Environment

  • Open a terminal and inside spring-cloud-stream-kafka-elasticsearch root folder run

    docker-compose up -d
  • Wait for the containers to reach the running (healthy) status. To check it, run

    docker-compose ps

Running Applications with Maven

Inside spring-cloud-stream-kafka-elasticsearch root folder, run the following Maven commands in different terminals

  • eureka-server

    ./mvnw clean spring-boot:run --projects eureka-server
  • producer-api

    ./mvnw clean spring-boot:run --projects producer-api -Dspring-boot.run.jvmArguments="-Dserver.port=9080"
  • categorizer-service

    ./mvnw clean spring-boot:run --projects categorizer-service -Dspring-boot.run.jvmArguments="-Dserver.port=9081"
  • collector-service

    ./mvnw clean spring-boot:run --projects collector-service -Dspring-boot.run.jvmArguments="-Dserver.port=9082"
  • publisher-api

    ./mvnw clean spring-boot:run --projects publisher-api -Dspring-boot.run.jvmArguments="-Dserver.port=9083"
  • news-client

    ./mvnw clean spring-boot:run --projects news-client
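
With producer-api running on port 9080 as configured above, a news item can be created with a simple HTTP request. The endpoint path and payload below are assumptions for illustration; check the producer-api source (or its Swagger UI, if present) for the real contract.

    curl -X POST localhost:9080/api/news \
      -H 'Content-Type: application/json' \
      -d '{"title": "Some title", "text": "Some text"}'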

Running Applications as Docker containers

Build Applications' Docker Images

  • In a terminal, make sure you are in spring-cloud-stream-kafka-elasticsearch root folder

  • To build the applications' Docker images, run the following script

    ./docker-build.sh

Applications' Environment Variables

  • producer-api

    | Environment Variable | Description |
    | -------------------- | ----------- |
    | KAFKA_HOST | Specify host of the Kafka message broker to use (default localhost) |
    | KAFKA_PORT | Specify port of the Kafka message broker to use (default 29092) |
    | SCHEMA_REGISTRY_HOST | Specify host of the Schema Registry to use (default localhost) |
    | SCHEMA_REGISTRY_PORT | Specify port of the Schema Registry to use (default 8081) |
    | EUREKA_HOST | Specify host of the Eureka service discovery to use (default localhost) |
    | EUREKA_PORT | Specify port of the Eureka service discovery to use (default 8761) |
    | ZIPKIN_HOST | Specify host of the Zipkin distributed tracing system to use (default localhost) |
    | ZIPKIN_PORT | Specify port of the Zipkin distributed tracing system to use (default 9411) |

  • categorizer-service

    | Environment Variable | Description |
    | -------------------- | ----------- |
    | KAFKA_HOST | Specify host of the Kafka message broker to use (default localhost) |
    | KAFKA_PORT | Specify port of the Kafka message broker to use (default 29092) |
    | SCHEMA_REGISTRY_HOST | Specify host of the Schema Registry to use (default localhost) |
    | SCHEMA_REGISTRY_PORT | Specify port of the Schema Registry to use (default 8081) |
    | EUREKA_HOST | Specify host of the Eureka service discovery to use (default localhost) |
    | EUREKA_PORT | Specify port of the Eureka service discovery to use (default 8761) |
    | ZIPKIN_HOST | Specify host of the Zipkin distributed tracing system to use (default localhost) |
    | ZIPKIN_PORT | Specify port of the Zipkin distributed tracing system to use (default 9411) |

  • collector-service

    | Environment Variable | Description |
    | -------------------- | ----------- |
    | ELASTICSEARCH_HOST | Specify host of the Elasticsearch search engine to use (default localhost) |
    | ELASTICSEARCH_NODES_PORT | Specify nodes port of the Elasticsearch search engine to use (default 9300) |
    | ELASTICSEARCH_REST_PORT | Specify REST port of the Elasticsearch search engine to use (default 9200) |
    | KAFKA_HOST | Specify host of the Kafka message broker to use (default localhost) |
    | KAFKA_PORT | Specify port of the Kafka message broker to use (default 29092) |
    | SCHEMA_REGISTRY_HOST | Specify host of the Schema Registry to use (default localhost) |
    | SCHEMA_REGISTRY_PORT | Specify port of the Schema Registry to use (default 8081) |
    | EUREKA_HOST | Specify host of the Eureka service discovery to use (default localhost) |
    | EUREKA_PORT | Specify port of the Eureka service discovery to use (default 8761) |
    | ZIPKIN_HOST | Specify host of the Zipkin distributed tracing system to use (default localhost) |
    | ZIPKIN_PORT | Specify port of the Zipkin distributed tracing system to use (default 9411) |

  • publisher-api

    | Environment Variable | Description |
    | -------------------- | ----------- |
    | ELASTICSEARCH_HOST | Specify host of the Elasticsearch search engine to use (default localhost) |
    | ELASTICSEARCH_NODES_PORT | Specify nodes port of the Elasticsearch search engine to use (default 9300) |
    | ELASTICSEARCH_REST_PORT | Specify REST port of the Elasticsearch search engine to use (default 9200) |
    | EUREKA_HOST | Specify host of the Eureka service discovery to use (default localhost) |
    | EUREKA_PORT | Specify port of the Eureka service discovery to use (default 8761) |
    | ZIPKIN_HOST | Specify host of the Zipkin distributed tracing system to use (default localhost) |
    | ZIPKIN_PORT | Specify port of the Zipkin distributed tracing system to use (default 9411) |

  • news-client

    | Environment Variable | Description |
    | -------------------- | ----------- |
    | KAFKA_HOST | Specify host of the Kafka message broker to use (default localhost) |
    | KAFKA_PORT | Specify port of the Kafka message broker to use (default 29092) |
    | SCHEMA_REGISTRY_HOST | Specify host of the Schema Registry to use (default localhost) |
    | SCHEMA_REGISTRY_PORT | Specify port of the Schema Registry to use (default 8081) |
    | EUREKA_HOST | Specify host of the Eureka service discovery to use (default localhost) |
    | EUREKA_PORT | Specify port of the Eureka service discovery to use (default 8761) |
    | ZIPKIN_HOST | Specify host of the Zipkin distributed tracing system to use (default localhost) |
    | ZIPKIN_PORT | Specify port of the Zipkin distributed tracing system to use (default 9411) |
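
These are the variables that the start-apps.sh script (next step) injects into each container. For illustration, a single service could also be started manually along these lines; the image tag, network name and container port are assumptions, so adapt them to what docker-build.sh and docker-compose actually create.

    docker run -d --rm --name producer-api \
      --network spring-cloud-stream-kafka-elasticsearch_default \
      -p 9080:8080 \
      -e KAFKA_HOST=kafka -e KAFKA_PORT=9092 \
      -e SCHEMA_REGISTRY_HOST=schema-registry \
      -e EUREKA_HOST=eureka-server \
      -e ZIPKIN_HOST=zipkin \
      producer-api:1.0.0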

Run Applications' Docker Containers

  • In a terminal, make sure you are inside spring-cloud-stream-kafka-elasticsearch root folder

  • Run the following script

    ./start-apps.sh

Applications URLs

  • Eureka

    Eureka can be accessed at http://localhost:8761

  • Zipkin

    Zipkin can be accessed at http://localhost:9411

    The figure below shows an example of the complete flow a news item passes through: it starts at producer-api, where the news is created, and ends at news-client.

  • Kafka Topics UI

    Kafka Topics UI can be accessed at http://localhost:8085

  • Kafka Manager

    Kafka Manager can be accessed at http://localhost:9000

    The figure below shows the Kafka topic consumers. As we can see, the consumers are up to date, since the lag is 0.

    Configuration

    • First, you must create a new cluster. Click on Cluster (dropdown button on the header) and then on Add Cluster

    • Type the name of your cluster in Cluster Name field, for example: MyCluster

    • Type zookeeper:2181 in Cluster Zookeeper Hosts field

    • Enable checkbox Poll consumer information (Not recommended for large # of consumers if ZK is used for offsets tracking on older Kafka versions)

    • Click on Save button at the bottom of the page.

  • Schema Registry UI

    Schema Registry UI can be accessed at http://localhost:8001

  • Elasticsearch REST API

    Check that Elasticsearch is up and running

    curl localhost:9200

    Check the indices in Elasticsearch

    curl "localhost:9200/_cat/indices?v"

    Check the news index mapping

    curl "localhost:9200/news/_mapping?pretty"

    Simple search

    curl "localhost:9200/news/_search?pretty"

Shutdown

  • To stop applications

    • If they were started with Maven, go to the terminals where they are running and press Ctrl+C

    • If they were started as Docker containers, go to a terminal and, inside spring-cloud-stream-kafka-elasticsearch root folder, run the script below

      ./stop-apps.sh
  • To stop and remove docker-compose containers, network and volumes, go to a terminal and, inside spring-cloud-stream-kafka-elasticsearch root folder, run the following command

    docker-compose down -v

Cleanup

To remove the Docker images created by this project, go to a terminal and, inside spring-cloud-stream-kafka-elasticsearch root folder, run the script below

./remove-docker-images.sh