springcloud系类代码:spring-boot-autoconfigure-rocketmq-client-groovy

容磊
2023-12-01
  • Groovy 是
    用于Java虚拟机的一种敏捷的动态语言,它是一种成熟的面向对象编程语言,既可以用于面向对象编程,又可以用作纯粹的脚本语言。使用该种语言不必编写过多的代码,同时又具有闭包和动态语言中的其他特性。
  • Groovy是JVM的一个替代语言(替代是指可以用 Groovy 在Java平台上进行 Java 编程),使用方式基本与使用
    Java代码的方式相同,该语言特别适合与Spring的动态语言支持一起使用,设计时充分考虑了Java集成,这使 Groovy 与 Java
    代码的互操作很容易。(注意:不是指Groovy替代java,而是指Groovy和java很好的结合编程。
package programb.abel.queue;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

package programb.abel.queue.config;


public class JmsConfig {
    /**
     * Name Server 地址,因为是集群部署 所以有多个用 分号 隔开
     */
    public static final String NAME_SERVER = "10.33.20.223:9876";
    /**
     * 主题名称 主题一般是服务器设置好 而不能在代码里去新建topic( 如果没有创建好,生产者往该主题发送消息 会报找不到topic错误)
     */
    public static final String TOPIC = "stock_change";
}

package programb.abel.queue.controller;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import programb.abel.queue.config.JmsConfig;
import programb.abel.queue.service.ProducerService;
import programb.abel.queue.service.ReceiverService;

import java.util.ArrayList;
import java.util.List;


@RestController
@RequestMapping("/publisher")
public class PublisherController {
    @Autowired
    private ProducerService producerService;
    @Autowired
    private ReceiverService receiverService;

    private List<String> mesList;

    /**
     * 初始化消息
     */
    public PublisherController() {
        mesList = new ArrayList<>();
        mesList.add("programb");
        mesList.add("programb");
        mesList.add("programb");
        mesList.add("programb");
        mesList.add("programb");

    }

    @RequestMapping("/text/rocketmq")
    public Object callback() throws Exception {
        //总共发送五次消息
        for (String s : mesList) {
            //创建生产信息
            Message message = new Message(JmsConfig.TOPIC, "testtag", ("programb:" + s).getBytes());
            //发送
            SendResult sendResult = producerService.getProducer().send(message);
            System.out.println(String.format("programb={ %s }",sendResult));
        }
        return "成功";
    }
}

package programb.abel.queue.service;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import programb.abel.queue.config.JmsConfig;


@Service
@Component
public class ProducerService {
    private String producerGroup = "test_producer";
    private DefaultMQProducer producer;

    public ProducerService(){
        //示例生产者
        producer = new DefaultMQProducer(producerGroup);
        //不开启vip通道 开通口端口会减2
        producer.setVipChannelEnabled(false);
        //绑定name server
        producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        start();
    }
    /**
     * 对象在使用之前必须要调用一次,只能初始化一次
     */
    public void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public DefaultMQProducer getProducer(){
        return this.producer;
    }
    /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown(){
        this.producer.shutdown();
    }
}
package programb.abel.queue.service;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Service;

import programb.abel.queue.config.JmsConfig;

import java.io.UnsupportedEncodingException;


@Service
public class ReceiverService {
    /**
     * 消费者实体对象
     */
    private DefaultMQPushConsumer consumer;
    /**
     * 消费者组
     */
    public static final String CONSUMER_GROUP = "test_consumer";
    /**
     * 通过构造函数 实例化对象
     */
    public ReceiverService() throws MQClientException {

        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        //消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //订阅主题和 标签( * 代表所有标签)下信息
        consumer.subscribe(JmsConfig.TOPIC, "*");
        // //注册消费的监听 并在此监听中消费信息,并返回消费的状态信息
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            // msgs中只收集同一个topic,同一个tag,并且key相同的message
            // 会把不同的消息分别放置到不同的队列中
            try {
                for (Message msg : msgs) {

                    //消费者获取消息 这里只输出 不做后面逻辑处理
                    String body = new String(msg.getBody(), "utf-8");
                    System.out.println(String.format("Consumer-获取消息-主题topic为={ %s }, 消费消息为={ %s }", msg.getTopic(), body));
                }
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
        System.out.println("消费者 启动成功=======");
    }
}


## tomcat\u914D\u7F6E
server.port=9666
#server.tomcat.maxHttpHeaderSize=8192
server.tomcat.uri-encoding=UTF-8
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
spring.messages.encoding=UTF-8
# tomcat\u6700\u5927\u7EBF\u7A0B\u6570\uFF0C\u9ED8\u8BA4\u4E3A200
server.tomcat.max-threads=800
# session\u6700\u5927\u8D85\u65F6\u65F6\u95F4(\u5206\u949F)\uFF0C\u9ED8\u8BA4\u4E3A30
server.session-timeout=60

## spring \u914D\u7F6E
spring.application.name=rocketmq-queue
application.main=programb.abel.queue.Application

## LOG
logging.file=./logs/rocketmq-queue.log



<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <contextName>demo</contextName>

    <!-- 控制台输入日志信息 -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- 当日日志归档文件 -->
        <file>${LOG_FILE}</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!--过期日志转存的文件名格式 -->
            <FileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}</FileNamePattern>
            <!-- 日志保留天数 -->
            <MaxHistory>60</MaxHistory>
        </rollingPolicy>
        <layout class="ch.qos.logback.classic.PatternLayout">
            <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符 -->
            <Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</Pattern>
        </layout>
    </appender>


    <root level="INFO">
        <appender-ref ref="FILE"/>
        <appender-ref ref="CONSOLE"/>
    </root>

</configuration>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>programb.abel</groupId>
    <artifactId>springboot-rocketmq</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <!--springboot-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>



        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.groovy</groupId>
            <artifactId>groovy</artifactId>
        </dependency>


    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
 类似资料: