当前位置: 首页 > 工具软件 > xxl-mq > 使用案例 >

springboot整合xxl-mq学习笔记

葛学民
2023-12-01

springboot整合xxl-mq学习笔记

首先xxl-mq是大神xuxueli开发的一个消息中间件框架:

与 Spring Boot 的整合过程如下:

1  pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the xxl-mq Spring Boot sample; inherits from the xxl-mq-samples parent. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>com.xuxueli</groupId>
        <artifactId>xxl-mq-samples</artifactId>
        <version>1.3.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>xxl-mq-samples-springboot</artifactId>
    <packaging>jar</packaging>


    <!-- Import Spring Boot's managed dependency versions; ${spring-boot.version} is not defined
         in this file and is expected to come from the parent pom. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>


    <dependencies>

        <!-- starter-web:spring-webmvc + autoconfigure + logback + yaml + tomcat -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- starter-test:junit + spring-test + mockito -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- freemarker-starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-freemarker</artifactId>
        </dependency>

        <!-- xxl-mq-client: same version as the parent project.
             ${project.parent.version} replaces the deprecated ${parent.version} expression. -->
        <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-mq-client</artifactId>
            <version>${project.parent.version}</version>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <!-- Repackage the jar into an executable Spring Boot jar. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>


</project>

2  application.properties

### web
# Port and context path of this sample application
server.port=8081
server.context-path=/

### resources
# Static files are served under /static/** from classpath:/static/
spring.mvc.static-path-pattern=/static/**
spring.resources.static-locations=classpath:/static/

### freemarker
# Templates are *.ftl files under classpath:/templates/; the request context is exposed to
# templates as "request" (the index page uses ${request.contextPath})
spring.freemarker.templateLoaderPath=classpath:/templates/
spring.freemarker.suffix=.ftl
spring.freemarker.charset=UTF-8
spring.freemarker.request-context-attribute=request
spring.freemarker.settings.number_format=0.##########

# xxl-mq, admin conf: the address where xxl-mq-admin is deployed
xxl.mq.admin.address=http://localhost:8080/xxl-mq-admin
### xxl-mq, access token (left empty in this sample)
xxl.mq.accessToken=

index.html

<script src="${request.contextPath}/static/jquery/jquery.min.js"></script>
<body>

    <!-- Each button posts its "_type" value to /produce. -->
    <!-- _type 0: parallel consumption -->
    <input type="button" class="send" _type="0" value="并行消费" />
    <br><br>

    <!-- _type 1: serial consumption -->
    <input type="button" class="send" _type="1" value="串行消费" />
    <br><br>

    <!-- _type 2: broadcast message -->
    <input type="button" class="send" _type="2" value="广播消息" />
    <br><br>

    <!-- _type 3: delayed message, executed 5 minutes later -->
    <input type="button" class="send" _type="3" value="延时消息:5分钟后执行" />
    <br><br>

    <!-- _type 4: performance test, sends 10000 messages in one batch -->
    <input type="button" class="send" _type="4" value="性能测试:批量发送10000条消息" />

    <hr>
    <!-- Result lines are prepended here by the script below. -->
    <div id="console"></div>

    <script>
        $(function(){
            // Every ".send" button carries its message type in the custom "_type" attribute.
            $(".send").click(function () {
                var msgType = $(this).attr("_type");
                var produceUrl = '${request.contextPath}/produce';

                // POST the type to the server and show the outcome at the top of the console.
                $.post( produceUrl, {'type':msgType}, function(data,status){
                    var line = "<br>" + new Date().format("yyyy-MM-dd HH:mm:ss") + ":  ";
                    if ("SUCCESS" == data) {
                        line += '成功发送一条消息!';
                    } else {
                        // Anything other than "SUCCESS" is shown as returned by the server.
                        line += data;
                    }
                    $("#console").prepend(line);
                });
            });
        });

        // Format a Date with a pattern such as "yyyy-MM-dd HH:mm:ss".
        // Tokens: y = year, M = month, d = day, H or h = hour (0-23), m = minute,
        // s = second, q = quarter, S = millisecond.
        // Only the first run of each token in the pattern is replaced.
        // A run of one letter prints the raw number; a longer run is zero-padded to two digits.
        Date.prototype.format = function(fmt) {
            var hours = this.getHours();
            var o = {
                "M+" : this.getMonth()+1,                 // month
                "d+" : this.getDate(),                    // day of month
                "H+" : hours,                             // hour; "HH" is what the page asks for
                "h+" : hours,                             // hour; kept for existing callers
                "m+" : this.getMinutes(),                 // minute
                "s+" : this.getSeconds(),                 // second
                "q+" : Math.floor((this.getMonth()+3)/3), // quarter
                "S"  : this.getMilliseconds()             // millisecond
            };

            // Year: "yyyy" keeps all four digits, shorter runs keep the trailing digits.
            var found = /(y+)/.exec(fmt);
            if (found) {
                fmt = fmt.replace(found[1], (this.getFullYear() + "").slice(4 - found[1].length));
            }

            for (var k in o) {
                if (!Object.prototype.hasOwnProperty.call(o, k)) {
                    continue;
                }
                found = new RegExp("(" + k + ")").exec(fmt);
                if (found) {
                    var text = "" + o[k];
                    fmt = fmt.replace(found[1], (found[1].length == 1) ? text : ("00" + text).slice(text.length));
                }
            }
            return fmt;
        };

    </script>

</body>

需要在 static/jquery 目录下放入:

jquery.min.js

日志管理配置:

logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Logback configuration for the sample: every log event goes to the console and to a file
     that is rolled by date. -->
<configuration debug="false" scan="true" scanPeriod="1 seconds">

    <contextName>logback</contextName>
    <!-- Path of the active log file; also used as the prefix of the rolled archives. -->
    <property name="log.path" value="/data/applogs/xxl-mq/xxl-mq-samples-springboot.log"/>

    <!-- Console output: time, context name, thread, level, logger name, message. -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- File output: rolled using a yyyy-MM-dd date pattern; archives are named with a .zip suffix. -->
    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.path}</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.path}.%d{yyyy-MM-dd}.zip</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <!-- Includes the source file name and line number of the logging call. -->
            <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n
            </pattern>
        </encoder>
    </appender>

    <!-- Root logger: level info, attached to both appenders. -->
    <root level="info">
        <appender-ref ref="console"/>
        <appender-ref ref="file"/>
    </root>

</configuration>

配置XxlMqConf.java

package com.xxl.mq.sample.springboot.conf;

import com.xxl.mq.client.factory.impl.XxlMqSpringClientFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class XxlMqConf {

    // ---------------------- param ----------------------

    /** Injected from the {@code xxl.mq.admin.address} property. */
    @Value("${xxl.mq.admin.address}")
    private String adminAddress;

    /** Injected from the {@code xxl.mq.accessToken} property. */
    @Value("${xxl.mq.accessToken}")
    private String accessToken;


    /**
     * Creates the xxl-mq client factory bean and passes it the injected admin address
     * and access token.
     *
     * @return the configured {@link XxlMqSpringClientFactory}
     */
    @Bean
    public XxlMqSpringClientFactory getXxlMqConsumer(){
        final XxlMqSpringClientFactory factory = new XxlMqSpringClientFactory();

        factory.setAdminAddress(adminAddress);
        factory.setAccessToken(accessToken);

        return factory;
    }

}

controller 根据传入的参数进行设置:



import com.xxl.mq.client.message.XxlMqMessage;
import com.xxl.mq.client.producer.XxlMqProducer;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import java.util.Calendar;
import java.util.Date;

/**
 * Index controller of the sample: renders the demo page and produces test messages
 * according to the {@code type} posted by the page.
 *
 * @author xuxueli 2015-12-19 16:13:16
 */
@Controller
public class IndexController {

    /**
     * Renders the demo page.
     *
     * @return the view name {@code index}
     */
    @RequestMapping("/")
    public String index(){
        return "index";
    }

    /**
     * Produces demo messages on {@code topic_1}.
     *
     * <ul>
     *   <li>0 - parallel consumption</li>
     *   <li>1 - serial consumption</li>
     *   <li>2 - broadcast</li>
     *   <li>3 - delayed message, effective 5 minutes from now</li>
     *   <li>4 - performance test, 10000 messages in a loop</li>
     * </ul>
     *
     * @param type kind of message to produce, one of the values listed above
     * @return {@code "SUCCESS"} for types 0-3, {@code "Cost = <millis>"} for type 4,
     *         {@code "Type Error."} for any other value
     */
    @RequestMapping("/produce")
    @ResponseBody
    public String produce(int type){

        String topic = "topic_1";
        String data = "时间戳:" + System.currentTimeMillis();

        switch (type) {
            case 0:
                // Parallel consumption
                XxlMqProducer.produce(new XxlMqMessage(topic, data));
                break;

            case 1:
                // Serial consumption.
                // NOTE(review): the meaning of the third argument (1L) is not visible here;
                // confirm against XxlMqMessage.
                XxlMqProducer.produce(new XxlMqMessage(topic, data, 1L));
                break;

            case 2:
                // Broadcast
                XxlMqProducer.broadcast(new XxlMqMessage(topic, data));
                break;

            case 3: {
                // Delayed message: effective time is now + 5 minutes
                Calendar calendar = Calendar.getInstance();
                calendar.add(Calendar.MINUTE, 5);
                Date effectTime = calendar.getTime();

                XxlMqProducer.produce(new XxlMqMessage(topic, data, effectTime));
                break;
            }

            case 4: {
                // Performance test: time how long producing msgNum messages takes
                int msgNum = 10000;
                long start = System.currentTimeMillis();
                for (int i = 0; i < msgNum; i++) {
                    XxlMqProducer.produce(new XxlMqMessage(topic, "No:"+i));
                }
                long end = System.currentTimeMillis();
                return "Cost = " + (end-start);
            }

            default:
                return "Type Error.";
        }

        return "SUCCESS";
    }

    /**
     * Turns any exception raised by a handler of this controller into a plain-text response.
     *
     * <p>{@code @ResponseBody} is required: without it the returned string is treated as a
     * view name, so the page's AJAX callback would never receive the message.
     *
     * @param e the exception raised by a handler method
     * @return the exception message, or {@code e.toString()} when the message is {@code null}
     */
    @ExceptionHandler({Exception.class})
    @ResponseBody
    public String exception(Exception e) {
        e.printStackTrace();
        String message = e.getMessage();
        // Some exceptions carry no message; fall back so the response is never null.
        return (message != null) ? message : e.toString();
    }

}

对应的消费者:

import com.xxl.mq.client.consumer.IMqConsumer;
import com.xxl.mq.client.consumer.MqResult;
import com.xxl.mq.client.consumer.annotation.MqConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Sample consumer registered for {@code topic_2}; it only logs each message it receives.
 *
 * Created by xuxueli on 16/8/28.
 */
@MqConsumer(topic = "topic_2")
@Service
public class Demo2MqComsumer implements IMqConsumer {
    // The logger never changes and is the same for every instance, so it is static and final.
    private static final Logger logger = LoggerFactory.getLogger(Demo2MqComsumer.class);

    /**
     * Logs the message payload and reports success.
     *
     * @param data the message payload
     * @return {@link MqResult#SUCCESS} in all cases
     * @throws Exception declared to match the overridden method; not thrown here
     */
    @Override
    public MqResult consume(String data) throws Exception {
        logger.info("[Demo2MqComsumer] 消费一条消息:{}", data);
        return MqResult.SUCCESS;
    }

}
import com.xxl.mq.client.consumer.IMqConsumer;
import com.xxl.mq.client.consumer.MqResult;
import com.xxl.mq.client.consumer.annotation.MqConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Sample consumer registered for {@code topic_1}; it only logs each message it receives.
 *
 * Created by xuxueli on 16/8/28.
 */
@MqConsumer(topic = "topic_1")
@Service
public class DemoAMqComsumer implements IMqConsumer {
    // The logger never changes and is the same for every instance, so it is static and final.
    private static final Logger logger = LoggerFactory.getLogger(DemoAMqComsumer.class);

    /**
     * Logs the message payload and reports success.
     *
     * @param data the message payload
     * @return {@link MqResult#SUCCESS} in all cases
     * @throws Exception declared to match the overridden method; not thrown here
     */
    @Override
    public MqResult consume(String data) throws Exception {
        logger.info("[DemoAMqComsumer] 消费一条消息:{}", data);
        return MqResult.SUCCESS;
    }

}
import com.xxl.mq.client.consumer.IMqConsumer;
import com.xxl.mq.client.consumer.MqResult;
import com.xxl.mq.client.consumer.annotation.MqConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Second sample consumer registered for {@code topic_1}, with the same annotation settings
 * as {@code DemoAMqComsumer}; it only logs each message it receives.
 *
 * Created by xuxueli on 16/8/28.
 */
@MqConsumer(topic = "topic_1")
@Service
public class DemoBMqComsumer implements IMqConsumer {
    // The logger never changes and is the same for every instance, so it is static and final.
    private static final Logger logger = LoggerFactory.getLogger(DemoBMqComsumer.class);

    /**
     * Logs the message payload and reports success.
     *
     * @param data the message payload
     * @return {@link MqResult#SUCCESS} in all cases
     * @throws Exception declared to match the overridden method; not thrown here
     */
    @Override
    public MqResult consume(String data) throws Exception {
        logger.info("[DemoBMqComsumer] 消费一条消息:{}", data);
        return MqResult.SUCCESS;
    }

}
import com.xxl.mq.client.consumer.IMqConsumer;
import com.xxl.mq.client.consumer.MqResult;
import com.xxl.mq.client.consumer.annotation.MqConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Sample consumer registered for {@code topic_1} with {@code group = MqConsumer.EMPTY_GROUP};
 * it only logs each message it receives.
 *
 * NOTE(review): EMPTY_GROUP presumably places this consumer outside the default group used by
 * the other topic_1 consumers - confirm against the xxl-mq documentation.
 *
 * Created by xuxueli on 16/8/28.
 */
@MqConsumer(topic = "topic_1", group = MqConsumer.EMPTY_GROUP)
@Service
public class DemoCMqComsumer implements IMqConsumer {
    // The logger never changes and is the same for every instance, so it is static and final.
    private static final Logger logger = LoggerFactory.getLogger(DemoCMqComsumer.class);

    /**
     * Logs the message payload and reports success.
     *
     * @param data the message payload
     * @return {@link MqResult#SUCCESS} in all cases
     * @throws Exception declared to match the overridden method; not thrown here
     */
    @Override
    public MqResult consume(String data) throws Exception {
        logger.info("[DemoCMqComsumer] 消费一条消息:{}", data);
        return MqResult.SUCCESS;
    }

}

 

posted @ 2019-02-20 13:31 动手的程序员 阅读( ...) 评论( ...) 编辑 收藏
 类似资料: