什么是消息总线
1. 概念
在微服务架构中,通常会使用轻量级的消息代理来构建一个共用的消息主题来连接各个微服务实例, 它广播的消息会被所有在注册中心的微服务实例监听和消费,也称消息总线
2. SpringCloud Bus
SpringCloud中也有对应的解决方案,SpringCloud Bus 将分布式的节点用轻量的消息代理连接起来, 可以很容易搭建消息总线,配合SpringCloud config 实现微服务应用配置信息的动态更新。
3. 其他
消息代理属于中间件。设计代理的目的就是为了能够从应用程序中传入消息,并执行一些特别的操作。 开源产品很多如ActiveMQ、Kafka、RabbitMQ、RocketMQ等 目前springCloud仅支持RabbitMQ和Kafka。本文采用RabbitMQ实现这一功能。
搭建分布式配置中心
1. Config 架构
当一个系统中的配置文件发生改变的时候,我们需要重新启动该服务,才能使得新的配置文件生效,spring cloud config可以实现微服务中的所有系统的配置文件的统一管理,而且还可以实现当配置文件发生变化的时候,系统会自动更新获取新的配置。
2. Git 环境搭建
使用 码云 环境搭建 git
码云环境地址: https://gitee.com/guopf/springcloud_bus
3. Git服务器上传配置文件
命名规范 服务名称-版本.yml 例如configclient_dev.yml
4. 搭建 Eureka 服务注册中心
具体搭建环境随后补充,可以使用我自己部署的 http://47.105.86.222:8100 (配置地址http://47.105.86.222:8100/eureka)
5. 搭建 config-server 服务
1. maven 依赖
<dependencies> <!-- SpringBoot整合Web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--spring-cloud 整合 config-server --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-config-server</artifactId> <version>2.0.2.RELEASE</version> </dependency> <!-- SpringBoot整合eureka客户端 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> <version>2.0.2.RELEASE</version> </dependency> </dependencies>
2. 配置文件
### 服务地址 端口 server: port: 8800 ### 服务名称 spring: application: name: config_server cloud: config: server: git: ### git 地址 uri: https://gitee.com/guopf/springcloud_bus.git username: password: ### 配置读取文件夹 search-paths: config ### 分支 label: master ### eureka 配置 eureka: client: service-url: defaultZone: http://47.105.86.222:8100/eureka register-with-eureka: true fetch-registry: true
3. 启动
/** * @EnableEurekaClient : 开启 eureka 客户端 * @EnableConfigServer : 开启 config 服务端 * */ @SpringBootApplication @EnableEurekaClient @EnableConfigServer public class App { public static void main(String[] args) { SpringApplication.run(App.class,args); } }
搭建 config-client 服务
1. 手动更新
1. maven 依赖
<dependencies> <!-- SpringBoot整合Web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-config-client</artifactId> <version>2.0.2.RELEASE</version> </dependency> <!-- SpringBoot整合eureka客户端 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> <version>2.0.2.RELEASE</version> </dependency> <!--核心jar包,集成rabbitMQ 消息总线 bus <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency> --> <!-- actuator监控中心 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> </dependencies>
2. 配置文件(bootstrap.yml)
### 端口 server: port: 8801 ### eureka 配置中心 eureka: client: service-url: defaultZone: http://47.105.86.222:8100/eureka fetch-registry: true register-with-eureka: true ### 配置服务名称,要和config 配置中心文件保持一致 spring: application: name: configclient cloud: config: ### 读取配置 profile: dev discovery: ### enabled: true ### config 配置中心配置的服务名称 service-id: config_server management: endpoints: web: exposure: include: "*"
3. 启动
@SpringBootApplication @EnableEurekaClient @EnableDiscoveryClient public class App { public static void main(String[] args) { SpringApplication.run(App.class,args); } }
/** * 在读取配置文件信息的地方进行添加 @RefreshScope 注解 */ @RestController @RefreshScope public class AppController { @Value("${userAge}") private String userAge; @GetMapping("/userAge") public String config(){ System.out.println("userAge : " + userAge); return userAge; } }
修改git仓库中的配置,进行手动更新,post请求
http://127.0.0.1:8801/actuator/refresh 启动刷新器 从cofnig_server读取
2. 使用消息总线 bus 更新
1. 添加依赖信息
在 config_server,config_client 中添加
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency> <!-- actuator监控中心 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
2. 配置文件修改
添加对 rabbbitMQ的配置, rabbitMQ服务和config_server,config_client 在一个服务器上,使用默认配置即可
### rabbitmq 配置信息 rabbitmq: addresses: 47.105.86.222 username: guest password: guest port: 5672 virtual-host: /
3. 刷新
http://127.0.0.1:8801/actuator/bus-refresh post方式
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持小牛知识库。
主要内容:1 send源码入口,1.1 同步消息,1.2 单向消息,1.3 异步消息,2 sendDefaultImpl发送消息实现,2.1 makeSureStateOK确定生产者服务状态,2.2 checkMessage校验消息的合法性,2.3 tryToFindTopicPublishInfo查找topic的发布信息,2.4 计算发送次数timesTotal,2.5 selectOneMessageQueue选择消息队列,,,基于RocketMQ 4.9.3,详细的介绍了Producer发
我在Azure中托管了两个云服务辅助角色,一个使用NServiceBus(Azure服务总线传输)消耗消息,另一个生成消息。 昨天,我部署了一个新版本的生产者工作者角色,而队列中仍然有大量消息,因为我们正在处理早上遗留下来的大量消息。当生产者启动时,它似乎已经清空(或者可能重新创建)队列,许多重要的生产消息丢失。这似乎很奇怪,但日志显示,大约在生产者角色启动时,消费者没有处理进一步的消息,我们知道
问题内容: 抱歉,这个问题以前是否已经解决过。如果可以,请将我链接到该文件,但到目前为止,我还没有找到满意的答案。 我一直在寻找能使我的javax验证提供的错误消息更具体的方法。 在ValidationMessages.properties文件中指定了我目前用于@Min批注的消息: 并按预期打印 我想要的是消息还包括验证失败的变量(和类)的名称以及失败的变量的值。所以更像。 任何帮助将不胜感激。
我正在尝试在Azure中构建一个简单的WebAPI REST服务,后端有一个服务总线队列工作器。我可以从Web API向工作人员发送一条消息。然而,我试图发送更多的信息,只是为了看看一切是如何运作的。因此,我创建了一个简单的控制器,如下所示: 当我呼叫控制器时,我只收到工作人员接收到的大约1/2的消息。其余的似乎都被放弃了。
根据MS文档,从订阅接收消息并不困难。但是,如果我希望我的应用程序在每次发布新消息时都接收一条消息--一个恒定的轮询。因此,使用了SubscriptionClient类的OnMessage()方法。 MS文档说:“...当调用OnMessage时,客户端启动一个内部消息泵,该消息泵不断轮询队列或订阅。该消息泵由发出Receive()调用的无限循环组成。如果调用超时,它发出下一个Receive()调
主要内容:ensureValidRecordSize,原理图,源码剖析ensureValidRecordSize 原理图 每个消息根据分区号找到对应的队列(每个队列都有一个TopicPartition), kafka设计了内存池,每个批次的默认大小是16k,当批次发完出去之后,就把内存放入内存池,当需要内存时就从内存池拿取,减少了内存的GC; 源码剖析