当前位置: 首页 > 工具软件 > zbus > 使用案例 >

Spring集成Zbus

贾建茗
2023-12-01
简单的记录下,项目中集成zbus的核心部分,做下简单的笔记,有误之处~还请指出

1.安装zbus,默认15555端口

2.客户端(生产者)

zbus配置文件:

<?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
         http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">

        <description>RPC服务</description>

        <!-- 切换至高可用模式,只需要把broker的实现改为HaBroker配置 -->
        <bean id="broker" class="org.zbus.broker.SingleBroker">
            <constructor-arg>
                <bean class="org.zbus.broker.BrokerConfig">
                    <property name="brokerAddress" value="${zbus.address}" />
                    <property name="maxTotal" value="20" />
                </bean>
            </constructor-arg>
        </bean>

        <bean id="wykRpcFactory" class="org.zbus.rpc.RpcFactory">
            <constructor-arg>
                <bean class="org.zbus.rpc.mq.MqInvoker">
                    <constructor-arg ref="broker" />
                    <constructor-arg value="WYK_RPC" />
                </bean>
            </constructor-arg>
        </bean>

        <!--rpc管理类-->
        <bean id="accountRpcManager" class="com.wyk.rpc.account.manager.AccountRpcManager" init-method="initService">
            <property name="wykRpcFactory" ref="wykRpcFactory" />
            <property name="pushTaskExecutor" ref="pushTaskExecutor" />
        </bean>


        <!--线程池异步推送消息 -->
        <bean id="pushTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="corePoolSize" value="5" />
            <property name="maxPoolSize" value="10" />
            <property name="queueCapacity" value="128" />
            <property name="keepAliveSeconds" value="900"></property>
        </bean>
</beans>

rpc管理类:


public class AccountRpcManager {

       private static final Logger log = LoggerFactory.getLogger(AccountRpcManager.class);

        /**
         * rpc工厂
         */
        private RpcFactory wykRpcFactory;

        /**
         * 线程池
         */
        private AsyncTaskExecutor pushTaskExecutor;

        public void setWykRpcFactory(RpcFactory wykRpcFactory) {
            this.wykRpcFactory = wykRpcFactory;
        }

        public void setPushTaskExecutor(AsyncTaskExecutor pushTaskExecutor) {
            this.pushTaskExecutor = pushTaskExecutor;
        }

        private TestZbusService testZbusService;

        public void initService() {
            try {
                testZbusService = wykRpcFactory.getService(TestZbusService.class);
            } catch (Exception e) {
                //e.printStackTrace();
            }
        }

        public void register(ZbusTest zbusTest){
            this.pushTaskExecutor.submit(new ZbusTestThread(zbusTest));
        }


        private class ZbusTestThread implements Runnable {

            private ZbusTest zbusTest;

            private ZbusTestThread(ZbusTest zbusTest) {
                super();
                this.zbusTest = zbusTest;
            }
            @Override
            public void run() {
                testZbusService.register(zbusTest);
            }

        }

}

rpc服务的接口:

public interface TestZbusService {
    /**
     * 注册账户
     */
    public void register(ZbusTest account);
}

实体类:

public class ZbusTest implements Serializable {

        private static final long serialVersionUID = 1L;

        private String id;//用户id
        private String name;

        public ZbusTest() {
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public ZbusTest(String id, String name) {
            super();
            this.id = id;
            this.name = name;
        }

}

控制类的入口:

/**产生数据并发布*/
@RequestMapping(value = "/zbusTest", method = RequestMethod.GET)
public Response registerZBusTest(@RequestParam(name ="id")String id,@RequestParam(name="name")String name){
        Response response = new Response();

        accountRpcManager.register(new ZbusTest(id,name));
        response.success();
        return response;
}

3.服务端(消费者)

zbus配置文件:

<?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="
         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
         http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">

        <description>RPC服务</description>

        <bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor" />

        <!-- 暴露账户服务接口实现 -->
        <bean id="testZbusService" class="com.wyk.rpc.account.service.impl.TestZbusServiceImpl"></bean>

        <bean id="serviceRcpProcessor" class="org.zbus.rpc.RpcProcessor">
            <constructor-arg>
                <list>
                    <!-- 放入需要的暴露的的接口 -->
                    <ref bean="testZbusService" />
                </list>
            </constructor-arg>
        </bean>


        <!-- 切换至高可用模式,只需要把broker的实现改为HaBroker配置 -->
        <bean id="broker" class="org.zbus.broker.SingleBroker">
            <constructor-arg>
                <bean class="org.zbus.broker.BrokerConfig">
                    <property name="brokerAddress" value="${zbus.address}" />
                    <property name="maxTotal" value="20" />
                </bean>
            </constructor-arg>
        </bean>

        <!-- 默认调用了start方法,由Spring容器直接带起来注册到zbus总线上 -->
        <bean id="zbusService" class="org.zbus.rpc.mq.Service" init-method="start">
            <constructor-arg>
                <bean class="org.zbus.rpc.mq.ServiceConfig">
                    <!-- 支持多总线注册 -->
                    <constructor-arg>
                        <list>
                            <ref bean="broker" />
                        </list>
                    </constructor-arg>
                    <property name="mq" value="WYK_RPC" />
                    <property name="consumerCount" value="2" />
                    <property name="messageProcessor" ref="serviceRcpProcessor" />
                </bean>
            </constructor-arg>
        </bean>

</beans>

rpc服务的接口:

public interface TestZbusService {
    void register(ZbusTest account);
}

rpc服务的接口实现类:

@Transactional
public class TestZbusServiceImpl implements TestZbusService {

        @Autowired
        private TestZbusDao testZbusDao;

        @Override
        public void register(ZbusTest account) {
            testZbusDao.save(account);
        }
}

用于数据库操作的dao层接口,CrudRepository是一个对数据库增删改查的封装接口,其实现类已实现增删改查等基本操作

@Repository
public interface TestZbusDao extends CrudRepository<ZbusTest, String> {}

实体类:

@Entity
@Table(name = "z_test")
public class ZbusTest implements Serializable {

        private static final long serialVersionUID = 1L;

        private String id;//用户id
        private String name;

        public ZbusTest() {
        }
        @Id
        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public ZbusTest(String id, String name) {
            super();
            this.id = id;
            this.name = name;
        }

}

参考地址:

 类似资料: