当前位置: 首页 > 知识库问答 >
问题:

我想在rabbitMQ中使用@Scheduler注释发送消息,延迟为5秒

唐煜
2023-03-14
package com.example.demo;


import com.fasterxml.jackson.annotation.JsonProperty;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import java.io.Serializable;

@Entity
public class OutBox implements Serializable {
    @Id
    private String id;
    private String aggregrate;
    private String operation;
    private String message;

    public OutBox() {
    }
    public OutBox(@JsonProperty("id") String id, @JsonProperty("aggregrate") String aggregrate, @JsonProperty("operation") String operation, @JsonProperty("message") String message) {
        this.id=id;
        this.aggregrate=aggregrate;
        this.operation=operation;
        this.message=message;
    }

    @Override
    public String toString() {
        return String.format(
                "OutBox{ id='%s', aggregrate='%s', operations='%s', message='%s' }",
                id, aggregrate, operation, message);
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getAggregrate() {
        return aggregrate;
    }

    public void setAggregrate(String aggregrate) {
        this.aggregrate = aggregrate;
    }

    public String getOperation() {
        return operation;
    }

    public void setOperation(String operation) {
        this.operation = operation;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}
package com.example.demo;

import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;

public interface OutBoxRepository extends JpaRepository<OutBox, String> {
    Page<OutBox> findAll(Pageable pageable);
}
package com.example.demo;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {
    @Value("${javainuse.rabbitmq.queue}")
    String queueName;

    @Value("${javainuse.rabbitmq.exchange}")
    String exchange;

    @Value("${javainuse.rabbitmq.routingkey}")
    private String routingkey;

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

//    @Bean
//    CustomExchange delayExchange() {
//        Map<String, Object> args = new HashMap<String, Object>();
//        args.put("x-delayed-type", "direct");
//        return new CustomExchange("my-exchange", "x-delayed-message", true, false, args);
//    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(exchange);
    }

    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingkey);
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }


    public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
//    @Bean
//    public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
//        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//        rabbitTemplate.setMessageConverter(jsonMessageConverter());
//        return rabbitTemplate;
//    }
}
package com.example.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.utils.SerializationUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import java.util.List;
import java.util.Optional;

import org.springframework.context.annotation.Configuration;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@SpringBootApplication
public class AccessingDataJpaApplication {

    private static final Logger log = LoggerFactory.getLogger(AccessingDataJpaApplication.class);

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @Autowired
    private OutBoxRepository repository;

    @Value("${javainuse.rabbitmq.exchange}")
    private String exchange;

    @Value("${javainuse.rabbitmq.routingkey}")
    private String routingkey;

    public static void main(String[] args) {
        SpringApplication.run(AccessingDataJpaApplication.class);
    }

    @Scheduled(fixedRate=5000)
    public void sendMessage(List<OutBox> message) {

        log.info("Sending message...");
//        rabbitTemplate.convertAndSend(exchange, routingkey,SerializationUtils.deserialize(message));
        for(int i=0;i<message.size();i++)
            rabbitTemplate.convertAndSend(exchange,routingkey, message.get(i));
    }

    @Bean
    public CommandLineRunner demo(OutBoxRepository repository) {

        return (args) -> {
            repository.save(new OutBox("fsks-ghty-eryr-jghd","OO_FLOW_SCHEDULES","UPDATE","{ \"brand\" : \"Mercedes\", \"doors\" : 5 }"));
            repository.save(new OutBox("fsks-bnmb-eryr-jghd","OO_FLOW_ENTITY","UPDATE","{ \"brand\" : \"BMW\", \"doors\" : 7 }"));


            log.info("Customers found with findAll():");
            log.info("--------------PAGE: 0-----------------");


            int count = 0;
            List<OutBox> lst=null;
            for (OutBox outbox : repository.findAll()) {
                Page<OutBox> u = repository.findAll(PageRequest.of(count, 5));
                lst=u.getContent();
                sendMessage(lst);
                log.info(outbox.toString());
                count ++;
            }



            log.info("");//log is to used for printing in console

            Optional<OutBox> outbox = repository.findById("fsks-ghty-eryr-jite"); // L means of type long
            log.info("Customer found with findById(1L):");
            log.info("--------------------------------");
            log.info(outbox.toString());
        };

    }
}
{"id":"fsks-ghty-eryr-jghd","aggregrate":"OO_FLOW_SCHEDULES","operation":"UPDATE","message":"{ \"brand\" : \"Mercedes\", \"doors\" : 5 }"}

共有1个答案

充高扬
2023-03-14

错误是很明显的。

原因:java.lang.IllegalStateException:遇到无效的@seduled方法“send message”:只有non-arg方法可以使用@seduled进行注释

只能安排没有参数的方法。

 类似资料:
  • 如何延迟JMS消息发送或在不确定的时间内继续? 我使用的是Weblogic,正如您所知,在JMS发送之后,接收方将异步处理消息,但是,此时或有时外部资源还没有为接收方做好准备,因此,我想使用一些检查逻辑来延迟发送或处理消息。我猜例如:我将消息放入挂起队列,然后频繁检查资源可用性,一旦发送或继续消息? 大家都知道Weblogic是否支持这一点,或者如何实现它吗?

  • 我需要在一定的持续时间后将消息发送给MessageListener,所以有没有任何方法可以使用SpringAMQP实现。 如。Producer生成消息并将消息发送到RabbitMQ Q,该消息立即被侦听器接收到,我想延迟消费者端接收到的消息,比如说在一些配置参数(比如1000ms)之后

  • null 谁能给我一个向RabbitMQ发送消息的标准程序的例子。我正在使用Spring Boot,也可以使用它的特性。

  • 我试图在spring cloud stream中实现延迟消息(不使用rabbitmq插件),但它不起作用 我使用sping-boot实现了它,它工作得很好。下面是我在sping-boot中做的示例代码。 RabbitMQ中的延迟消息 我正试图在Spring的云流中做同样的事,但没有任何帮助。以下是属性。 输出通道-生产者 输入通道-消费者 当用spring boot实现时,我看到消息在指定的时间内

  • 之前章节定义的SocketIO活动处理函数可以凭借send()函数和emit()函数来连接客户端 接下来的例子是将接收到的消息退回到发送它们的客户端: from flask_socketio import send, emit @socketio.on('message') def handle_message(message): send(message) @socketio.on('

  • 本文向大家介绍RabbitMQ 怎么实现延迟消息队列?相关面试题,主要包含被问及RabbitMQ 怎么实现延迟消息队列?时的应答技巧和注意事项,需要的朋友参考一下 延迟队列的实现有两种方式: 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能; 使用 RabbitMQ-delayed-message-exchange 插件实现延迟功能。