1.安装zbus,默认15555端口
2.客户端(生产者)
zbus配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
<description>RPC服务</description>
<!-- 切换至高可用模式,只需要把broker的实现改为HaBroker配置 -->
<bean id="broker" class="org.zbus.broker.SingleBroker">
<constructor-arg>
<bean class="org.zbus.broker.BrokerConfig">
<property name="brokerAddress" value="${zbus.address}" />
<property name="maxTotal" value="20" />
</bean>
</constructor-arg>
</bean>
<bean id="wykRpcFactory" class="org.zbus.rpc.RpcFactory">
<constructor-arg>
<bean class="org.zbus.rpc.mq.MqInvoker">
<constructor-arg ref="broker" />
<constructor-arg value="WYK_RPC" />
</bean>
</constructor-arg>
</bean>
<!--rpc管理类-->
<bean id="accountRpcManager" class="com.wyk.rpc.account.manager.AccountRpcManager" init-method="initService">
<property name="wykRpcFactory" ref="wykRpcFactory" />
<property name="pushTaskExecutor" ref="pushTaskExecutor" />
</bean>
<!--线程池异步推送消息 -->
<bean id="pushTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="5" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="128" />
<property name="keepAliveSeconds" value="900"></property>
</bean>
</beans>
rpc管理类:
public class AccountRpcManager {
private static final Logger log = LoggerFactory.getLogger(AccountRpcManager.class);
/**
* rpc工厂
*/
private RpcFactory wykRpcFactory;
/**
* 线程池
*/
private AsyncTaskExecutor pushTaskExecutor;
public void setWykRpcFactory(RpcFactory wykRpcFactory) {
this.wykRpcFactory = wykRpcFactory;
}
public void setPushTaskExecutor(AsyncTaskExecutor pushTaskExecutor) {
this.pushTaskExecutor = pushTaskExecutor;
}
private TestZbusService testZbusService;
public void initService() {
try {
testZbusService = wykRpcFactory.getService(TestZbusService.class);
} catch (Exception e) {
//e.printStackTrace();
}
}
public void register(ZbusTest zbusTest){
this.pushTaskExecutor.submit(new ZbusTestThread(zbusTest));
}
private class ZbusTestThread implements Runnable {
private ZbusTest zbusTest;
private ZbusTestThread(ZbusTest zbusTest) {
super();
this.zbusTest = zbusTest;
}
@Override
public void run() {
testZbusService.register(zbusTest);
}
}
}
rpc服务的接口:
public interface TestZbusService {
/**
* 注册账户
*/
public void register(ZbusTest account);
}
实体类:
public class ZbusTest implements Serializable {
private static final long serialVersionUID = 1L;
private String id;//用户id
private String name;
public ZbusTest() {
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public ZbusTest(String id, String name) {
super();
this.id = id;
this.name = name;
}
}
控制类的入口:
/**产生数据并发布*/
@RequestMapping(value = "/zbusTest", method = RequestMethod.GET)
public Response registerZBusTest(@RequestParam(name ="id")String id,@RequestParam(name="name")String name){
Response response = new Response();
accountRpcManager.register(new ZbusTest(id,name));
response.success();
return response;
}
3.服务端(消费者)
zbus配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
<description>RPC服务</description>
<bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor" />
<!-- 暴露账户服务接口实现 -->
<bean id="testZbusService" class="com.wyk.rpc.account.service.impl.TestZbusServiceImpl"></bean>
<bean id="serviceRcpProcessor" class="org.zbus.rpc.RpcProcessor">
<constructor-arg>
<list>
<!-- 放入需要的暴露的的接口 -->
<ref bean="testZbusService" />
</list>
</constructor-arg>
</bean>
<!-- 切换至高可用模式,只需要把broker的实现改为HaBroker配置 -->
<bean id="broker" class="org.zbus.broker.SingleBroker">
<constructor-arg>
<bean class="org.zbus.broker.BrokerConfig">
<property name="brokerAddress" value="${zbus.address}" />
<property name="maxTotal" value="20" />
</bean>
</constructor-arg>
</bean>
<!-- 默认调用了start方法,由Spring容器直接带起来注册到zbus总线上 -->
<bean id="zbusService" class="org.zbus.rpc.mq.Service" init-method="start">
<constructor-arg>
<bean class="org.zbus.rpc.mq.ServiceConfig">
<!-- 支持多总线注册 -->
<constructor-arg>
<list>
<ref bean="broker" />
</list>
</constructor-arg>
<property name="mq" value="WYK_RPC" />
<property name="consumerCount" value="2" />
<property name="messageProcessor" ref="serviceRcpProcessor" />
</bean>
</constructor-arg>
</bean>
</beans>
rpc服务的接口:
public interface TestZbusService {
void register(ZbusTest account);
}
rpc服务的接口实现类:
@Transactional
public class TestZbusServiceImpl implements TestZbusService {
@Autowired
private TestZbusDao testZbusDao;
@Override
public void register(ZbusTest account) {
testZbusDao.save(account);
}
}
用于数据库操作的dao层接口,CrudRepository是一个对数据库增删改查的封装接口,其实现类已实现增删改查等基本操作
@Repository
public interface TestZbusDao extends CrudRepository<ZbusTest, String> {}
实体类:
@Entity
@Table(name = "z_test")
public class ZbusTest implements Serializable {
private static final long serialVersionUID = 1L;
private String id;//用户id
private String name;
public ZbusTest() {
}
@Id
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public ZbusTest(String id, String name) {
super();
this.id = id;
this.name = name;
}
}
参考地址: