当前位置: 首页 > 编程笔记 >

SpringCloud之消息总线Spring Cloud Bus实例代码

郑佐
2023-03-14
本文向大家介绍SpringCloud之消息总线Spring Cloud Bus实例代码,包括了SpringCloud之消息总线Spring Cloud Bus实例代码的使用技巧和注意事项,需要的朋友参考一下

一、简介

在微服务架构的系统中,我们通常会使用轻量级的消息代理来构建一个共用的消息主题让系统中所有微服务实例都连接上来,由于该主题中产生的消息会被所有实例监听和消费,所以我们称它为消息总线。

二、消息代理

消息代理(Message Broker)是一种消息验证、传输、路由的架构模式。它在应用程序之间起到通信调度并最小化应用之间的依赖的作用,使得应用程序可以高效地解耦通信过程。消息代理是一个中间件产品,它的核心是一个消息的路由程序,用来实现接收和分发消息, 并根据设定好的消息处理流来转发给正确的应用。 它包括独立的通信和消息传递协议,能够实现组织内部和组织间的网络通信。设计代理的目的就是为了能够从应用程序中传入消息,并执行一些特别的操作,下面这些是在企业应用中,我们经常需要使用消息代理的场景:

  1. 将消息路由到一个或多个目的地。
  2. 消息转化为其他的表现方式。
  3. 执行消息的聚集、消息的分解,并将结果发送到它们的目的地,然后重新组合响应返回给消息用户。
  4. 调用Web服务来检索数据
  5. 响应事件或错误。
  6. 使用发布-订阅模式来提供内容或基千主题的消息路由。

目前已经有非常多的开源产品可以供大家使用, 比如:

  1. ActiveMQKafka
  2. RabbitMQ
  3. RocketMQ
  4. 等......

三、SpringCloud+RabbitMQ

(1)RabbitMQ简介、安装不赘述。

(2)pom.xml

<dependencies> 
 <dependency> 
 <groupId>org.springframework.boot</groupId> 
 <artifactId>spring-boot-starter-amqp</artifactId> 
 </dependency> 
 
 <dependency> 
 <groupId>org.springframework.boot</groupId> 
 <artifactId>spring-boot-starter-test</artifactId> 
 <scope>test</scope> 
 </dependency> 
</dependencies> 

(3)application.yml

spring: 
 application: 
 name: rabbitmq-hello 
 rabbitmq: 
 host: ***.***.***.*** 
 port: 5672 
 username: guest 
 password: guest 

(4)发送者Sender

@Component 
public class Sender { 
 
 private static final Logger log = LoggerFactory.getLogger(Sender.class); 
 @Autowired 
 private AmqpTemplate amqpTemplate; 
 
 public void send() { 
 String context = "hello " + new Date(); 
 log.info("Sender : " + context); 
 this.amqpTemplate.convertAndSend("hello", context); 
 } 
} 

(5)接受者Receiver

@Component 
@RabbitListener(queues = "hello") 
public class Receiver { 
 
 private static final Logger log = LoggerFactory.getLogger(Receiver.class); 
 
 @RabbitHandler 
 public void process(String hello) { 
 log.info("Receiver : " + hello); 
 } 
} 

(6)创建RabbitMQ的配置类 RabbitConfig

@Configuration 
public class RabbitConfig { 
 
 @Bean 
 public Queue helloQueue(){ 
 return new Queue("hello"); 
 } 
} 

(7)创建单元测试类, 用来调用消息生产

@RunWith(SpringJUnit4ClassRunner.class) 
@SpringBootTest(classes = SpringcloudbusrabbitmqApplication.class) 
public class HelloApplicationTests { 
 
 @Autowired 
 private Sender sender; 
 
 @Test 
 public void hello() throws Exception { 
 sender.send(); 
 } 
} 

(8)测试,执行HelloApplicationTests


(9)访问host:15672


四、改造Config-Client(整合springcloud bus)

(1)pom.xml

<dependencies> 
 <dependency> 
 <groupId>org.springframework.cloud</groupId> 
 <artifactId>spring-cloud-starter-config</artifactId> 
 </dependency> 
 <dependency> 
 <groupId>org.springframework.boot</groupId> 
 <artifactId>spring-boot-starter-web</artifactId> 
 </dependency> 
 <dependency> 
 <groupId>org.springframework.cloud</groupId> 
 <artifactId>spring-cloud-starter-eureka</artifactId> 
 </dependency> 
 <dependency> 
 <groupId>org.springframework.cloud</groupId> 
 <artifactId>spring-cloud-starter-bus-amqp</artifactId> 
 </dependency> 
 <dependency> 
 <groupId>org.springframework.boot</groupId> 
 <artifactId>spring-boot-starter-actuator</artifactId> 
 </dependency> 
 
 <dependency> 
 <groupId>org.springframework.boot</groupId> 
 <artifactId>spring-boot-starter-test</artifactId> 
 <scope>test</scope> 
 </dependency> 
</dependencies> 

(2)bootstrap.properties

spring.application.name=configspace 
spring.cloud.config.label=master 
spring.cloud.config.profile=dev 
spring.cloud.config.uri= http://localhost:5588/ 
eureka.client.serviceUrl.defaultZone=http://localhost:5555/eureka/ 
 
server.port=5589 
 
spring.rabbitmq.host=118.89.237.88 
spring.rabbitmq.port= 5672 
spring.rabbitmq.username=guest 
spring.rabbitmq.password=guest 
 
management.security.enabled=false 

(3)其他不用改变

五、测试

(1)测试准备

一个服务注册中心,EUREKASERVER,端口为5555;

一个分布式配置中心,ConfigServer,端口为5588;

二个分布式配置,ConfigClient,端口为5589、5590;(2)访问http://localhost:5589/from


(3)访问http://localhost:5590/from


RabbitMQ:


(4)去仓库修改password的值

from=git-dev-v1.0 by springcloud config-server 
username=springcloud 
password=1234567890 

(5)POST请求http://localhost:5589/bus/refresh或者http://localhost:5590/bus/refresh


成功请求后config-client会重新读取配置文件


(6)再次访问

  1. 如果POST请求的是:http://localhost:5589/bus/refresh,请访问http://localhost:5590/from
  2. 如果访问出现401,则配置需要加上management.security.enabled=false


如果POST请求的是:http://localhost:5590/bus/refresh,请访问http://localhost:5589/from


另/bus/refresh接口可以指定服务,即使用“username”参数,比如 “/bus/refresh?destination=username:**”即刷新服务名为username的所有服务,不管ip地址。

(7)架构


(8)架构调整

既然SpringCloud Bus的/bus/refresh接口提供了针对服务和实例进行配置更新的参数,那么我们的架构也可以相应做出一些调整。在之前的架构中,服务的配置更新需要通过向具体服务中的某个实例发送请求,再触发对整个服务集群的配置更新。虽然能实现功能,但是这样的结果是,我们指定的应用实例会不同千集群中的其他应用实例,这样会增加集群内部的复杂度,不利于将来的运维工作。比如, 需要对服务实例进行迁移,那么我们不得不修改Web Hook中的配置等。所以要尽可能地让服务集群中的各个节点是对等的。

因此, 我们将之前的架构做了 一些调整, 如下图所示:


主要做了以下这些改动:

  1. 在ConfigServer中也引入SpringCloud Bus,将配置服务端也加入到消息总线中来。
  2. /bus/refresh请求不再发送到具体服务实例上,而是发送给Config Server,并通过des巨nation参数来指定需要更新配置的服务或实例。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持小牛知识库。

 类似资料:
  • 本文向大家介绍SpringCloud Bus 消息总线的具体使用,包括了SpringCloud Bus 消息总线的具体使用的使用技巧和注意事项,需要的朋友参考一下 什么是消息总线 1. 概念 在微服务架构中,通常会使用轻量级的消息代理来构建一个共用的消息主题来连接各个微服务实例, 它广播的消息会被所有在注册中心的微服务实例监听和消费,也称消息总线 2. SpringCloud Bus Spring

  • 我在Azure Service Bus中使用代理消息传递(主题/订阅),我很好奇如何(或者是否)使用SSL保护通信。

  • 本文向大家介绍SpringCloud之服务注册与发现Spring Cloud Eureka实例代码,包括了SpringCloud之服务注册与发现Spring Cloud Eureka实例代码的使用技巧和注意事项,需要的朋友参考一下 一、Spring Cloud简介 Spring Cloud是一个基千SpringBoot实现的微服务架构开发 工具。它为微服务架构中涉及的 配置管理、服务治理、 断路器

  • 我在Azure中托管了两个云服务辅助角色,一个使用NServiceBus(Azure服务总线传输)消耗消息,另一个生成消息。 昨天,我部署了一个新版本的生产者工作者角色,而队列中仍然有大量消息,因为我们正在处理早上遗留下来的大量消息。当生产者启动时,它似乎已经清空(或者可能重新创建)队列,许多重要的生产消息丢失。这似乎很奇怪,但日志显示,大约在生产者角色启动时,消费者没有处理进一步的消息,我们知道

  • 我正在尝试在Azure中构建一个简单的WebAPI REST服务,后端有一个服务总线队列工作器。我可以从Web API向工作人员发送一条消息。然而,我试图发送更多的信息,只是为了看看一切是如何运作的。因此,我创建了一个简单的控制器,如下所示: 当我呼叫控制器时,我只收到工作人员接收到的大约1/2的消息。其余的似乎都被放弃了。

  • 本文向大家介绍SpringCloud之Feign示例详解,包括了SpringCloud之Feign示例详解的使用技巧和注意事项,需要的朋友参考一下 Feign简介 Feign 是一个声明web服务客户端,这便得编写web服务客户端更容易,使用Feign 创建一个接口并对它进行注解,它具有可插拔的注解支持包括Feign注解与JAX-RS注解,Feign还支持可插拔的编码器与解码器,Spring Cl