package com.example.demo;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import java.io.Serializable;
@Entity
public class OutBox implements Serializable {
@Id
private String id;
private String aggregrate;
private String operation;
private String message;
public OutBox() {
}
public OutBox(@JsonProperty("id") String id, @JsonProperty("aggregrate") String aggregrate, @JsonProperty("operation") String operation, @JsonProperty("message") String message) {
this.id=id;
this.aggregrate=aggregrate;
this.operation=operation;
this.message=message;
}
@Override
public String toString() {
return String.format(
"OutBox{ id='%s', aggregrate='%s', operations='%s', message='%s' }",
id, aggregrate, operation, message);
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getAggregrate() {
return aggregrate;
}
public void setAggregrate(String aggregrate) {
this.aggregrate = aggregrate;
}
public String getOperation() {
return operation;
}
public void setOperation(String operation) {
this.operation = operation;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
package com.example.demo;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
public interface OutBoxRepository extends JpaRepository<OutBox, String> {
Page<OutBox> findAll(Pageable pageable);
}
package com.example.demo;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
@Value("${javainuse.rabbitmq.queue}")
String queueName;
@Value("${javainuse.rabbitmq.exchange}")
String exchange;
@Value("${javainuse.rabbitmq.routingkey}")
private String routingkey;
@Bean
Queue queue() {
return new Queue(queueName, false);
}
// @Bean
// CustomExchange delayExchange() {
// Map<String, Object> args = new HashMap<String, Object>();
// args.put("x-delayed-type", "direct");
// return new CustomExchange("my-exchange", "x-delayed-message", true, false, args);
// }
@Bean
DirectExchange exchange() {
return new DirectExchange(exchange);
}
@Bean
Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(routingkey);
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
// @Bean
// public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
// final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// rabbitTemplate.setMessageConverter(jsonMessageConverter());
// return rabbitTemplate;
// }
}
package com.example.demo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.utils.SerializationUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.util.List;
import java.util.Optional;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@SpringBootApplication
public class AccessingDataJpaApplication {
private static final Logger log = LoggerFactory.getLogger(AccessingDataJpaApplication.class);
@Autowired
private AmqpTemplate rabbitTemplate;
@Autowired
private OutBoxRepository repository;
@Value("${javainuse.rabbitmq.exchange}")
private String exchange;
@Value("${javainuse.rabbitmq.routingkey}")
private String routingkey;
public static void main(String[] args) {
SpringApplication.run(AccessingDataJpaApplication.class);
}
@Scheduled(fixedRate=5000)
public void sendMessage(List<OutBox> message) {
log.info("Sending message...");
// rabbitTemplate.convertAndSend(exchange, routingkey,SerializationUtils.deserialize(message));
for(int i=0;i<message.size();i++)
rabbitTemplate.convertAndSend(exchange,routingkey, message.get(i));
}
@Bean
public CommandLineRunner demo(OutBoxRepository repository) {
return (args) -> {
repository.save(new OutBox("fsks-ghty-eryr-jghd","OO_FLOW_SCHEDULES","UPDATE","{ \"brand\" : \"Mercedes\", \"doors\" : 5 }"));
repository.save(new OutBox("fsks-bnmb-eryr-jghd","OO_FLOW_ENTITY","UPDATE","{ \"brand\" : \"BMW\", \"doors\" : 7 }"));
log.info("Customers found with findAll():");
log.info("--------------PAGE: 0-----------------");
int count = 0;
List<OutBox> lst=null;
for (OutBox outbox : repository.findAll()) {
Page<OutBox> u = repository.findAll(PageRequest.of(count, 5));
lst=u.getContent();
sendMessage(lst);
log.info(outbox.toString());
count ++;
}
log.info("");//log is to used for printing in console
Optional<OutBox> outbox = repository.findById("fsks-ghty-eryr-jite"); // L means of type long
log.info("Customer found with findById(1L):");
log.info("--------------------------------");
log.info(outbox.toString());
};
}
}
{"id":"fsks-ghty-eryr-jghd","aggregrate":"OO_FLOW_SCHEDULES","operation":"UPDATE","message":"{ \"brand\" : \"Mercedes\", \"doors\" : 5 }"}
错误是很明显的。
原因:java.lang.IllegalStateException:遇到无效的@seduled方法“send message”:只有non-arg方法可以使用@seduled进行注释
只能安排没有参数的方法。
如何延迟JMS消息发送或在不确定的时间内继续? 我使用的是Weblogic,正如您所知,在JMS发送之后,接收方将异步处理消息,但是,此时或有时外部资源还没有为接收方做好准备,因此,我想使用一些检查逻辑来延迟发送或处理消息。我猜例如:我将消息放入挂起队列,然后频繁检查资源可用性,一旦发送或继续消息? 大家都知道Weblogic是否支持这一点,或者如何实现它吗?
我需要在一定的持续时间后将消息发送给MessageListener,所以有没有任何方法可以使用SpringAMQP实现。 如。Producer生成消息并将消息发送到RabbitMQ Q,该消息立即被侦听器接收到,我想延迟消费者端接收到的消息,比如说在一些配置参数(比如1000ms)之后
null 谁能给我一个向RabbitMQ发送消息的标准程序的例子。我正在使用Spring Boot,也可以使用它的特性。
我试图在spring cloud stream中实现延迟消息(不使用rabbitmq插件),但它不起作用 我使用sping-boot实现了它,它工作得很好。下面是我在sping-boot中做的示例代码。 RabbitMQ中的延迟消息 我正试图在Spring的云流中做同样的事,但没有任何帮助。以下是属性。 输出通道-生产者 输入通道-消费者 当用spring boot实现时,我看到消息在指定的时间内
之前章节定义的SocketIO活动处理函数可以凭借send()函数和emit()函数来连接客户端 接下来的例子是将接收到的消息退回到发送它们的客户端: from flask_socketio import send, emit @socketio.on('message') def handle_message(message): send(message) @socketio.on('
本文向大家介绍RabbitMQ 怎么实现延迟消息队列?相关面试题,主要包含被问及RabbitMQ 怎么实现延迟消息队列?时的应答技巧和注意事项,需要的朋友参考一下 延迟队列的实现有两种方式: 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能; 使用 RabbitMQ-delayed-message-exchange 插件实现延迟功能。