我是rabbitmq的新手,目前正在尝试用非阻塞消费者实现非阻塞生产者。我构建了一些测试生成器,在其中我使用了typereference:
@Service
public class Producer {
@Autowired
private AsyncRabbitTemplate asyncRabbitTemplate;
public <T extends RequestEvent<S>, S> RabbitConverterFuture<S> asyncSendEventAndReceive(final T event) {
return asyncRabbitTemplate.convertSendAndReceiveAsType(QueueConfig.EXCHANGE_NAME, event.getRoutingKey(), event, event.getResponseTypeReference());
}
}
在其他一些地方,在RestController中调用的测试函数
@Autowired
Producer producer;
public void test() throws InterruptedException, ExecutionException {
TestEvent requestEvent = new TestEvent("SOMEDATA");
RabbitConverterFuture<TestResponse> reply = producer.asyncSendEventAndReceive(requestEvent);
log.info("Hello! The Reply is: {}", reply.get());
}
到目前为止,这非常简单,我现在陷入的困境是如何创建一个非阻塞的消费者。我当前的侦听器:
@RabbitListener(queues = QueueConfig.QUEUENAME)
public TestResponse onReceive(TestEvent event) {
Future<TestResponse> replyLater = proccessDataLater(event.getSomeData())
return replyLater.get();
}
据我所知,当使用@RabbitListener时,这个侦听器在它自己的线程中运行。我可以将MessageListener配置为为活动侦听器使用多个线程。因此,使用future.get()阻止侦听器线程并不能阻止应用程序本身。仍然可能存在所有线程现在都被阻止并且新事件卡在队列中的情况,而它们可能不需要。我想做的是只接收事件而不需要立即返回结果。这可能是@RabbitListener无法实现的。类似:
@RabbitListener(queues = QueueConfig.QUEUENAME)
public void onReceive(TestEvent event) {
/*
* Some fictional RabbitMQ API call where i get a ReplyContainer which contains
* the CorrelationID for the event. I can call replyContainer.reply(testResponse) later
* in the code without blocking the listener thread
*/
ReplyContainer replyContainer = AsyncRabbitTemplate.getReplyContainer()
// ProcessDataLater calls reply on the container when done with its action
proccessDataLater(event.getSomeData(), replyContainer);
}
在Spring用rabbitmq实现这种行为的最佳方式是什么?
编辑配置类:
@Configuration
@EnableRabbit
public class RabbitMQConfig implements RabbitListenerConfigurer {
public static final String topicExchangeName = "exchange";
@Bean
TopicExchange exchange() {
return new TopicExchange(topicExchangeName);
}
@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
return connectionFactory;
}
@Bean
public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
return new MappingJackson2MessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate() {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory());
rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
return rabbitTemplate;
}
@Bean
public AsyncRabbitTemplate asyncRabbitTemplate() {
return new AsyncRabbitTemplate(rabbitTemplate());
}
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
Queue queue() {
return new Queue("test", false);
}
@Bean
Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("foo.#");
}
@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setMaxConcurrentConsumers(5);
factory.setMessageConverter(producerJackson2MessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
@Override
public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
registrar.setContainerFactory(myRabbitListenerContainerFactory());
}
}
我现在没有时间测试它,但像这样的东西应该可以工作;大概您不想丢失消息,所以需要将ackMode设置为MANUAL,然后自己进行ACK(如图所示)。
使现代化
@SpringBootApplication
public class So52173111Application {
private final ExecutorService exec = Executors.newCachedThreadPool();
@Autowired
private RabbitTemplate template;
@Bean
public ApplicationRunner runner(AsyncRabbitTemplate asyncTemplate) {
return args -> {
RabbitConverterFuture<Object> future = asyncTemplate.convertSendAndReceive("foo", "test");
future.addCallback(r -> {
System.out.println("Reply: " + r);
}, t -> {
t.printStackTrace();
});
};
}
@Bean
public AsyncRabbitTemplate asyncTemplate(RabbitTemplate template) {
return new AsyncRabbitTemplate(template);
}
@RabbitListener(queues = "foo")
public void listen(String in, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
@Header(AmqpHeaders.CORRELATION_ID) String correlationId,
@Header(AmqpHeaders.REPLY_TO) String replyTo) {
ListenableFuture<String> future = handleInput(in);
future.addCallback(result -> {
Address address = new Address(replyTo);
this.template.convertAndSend(address.getExchangeName(), address.getRoutingKey(), result, m -> {
m.getMessageProperties().setCorrelationId(correlationId);
return m;
});
try {
channel.basicAck(tag, false);
}
catch (IOException e) {
e.printStackTrace();
}
}, t -> {
t.printStackTrace();
});
}
private ListenableFuture<String> handleInput(String in) {
SettableListenableFuture<String> future = new SettableListenableFuture<String>();
exec.execute(() -> {
try {
Thread.sleep(2000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
future.set(in.toUpperCase());
});
return future;
}
public static void main(String[] args) {
SpringApplication.run(So52173111Application.class, args);
}
}
问题内容: 我在芹菜中使用Python进行大量的(〜10 / sec)API调用(包括GET,POST,PUT,DELETE)。每个请求大约需要5-10秒才能完成。 我尝试在池中运行芹菜工人,并发数为1000。 由于正在阻塞进程,每个并发连接都在等待一个请求。 如何使异步? 问题答案: 使用eventlet Monkey patching使所有纯python库都无阻塞。 补丁单库 import e
问题内容: 我正在使用PHP从远程服务器下载一个(大)文件,并且此下载是通过单击网页上的下载按钮触发的。 因此,当我单击网页上的按钮时,就会向PHP函数发出请求(带有angulars )。该函数使用触发下载。 同时,我想使用Ajax向我的PHP网站提出其他请求。但是,只要下载正在进行,所有其他Ajax请求都会显示状态。 因此,基本上,下载阻止了对PHP的所有其他请求。有什么办法可以避免这种阻塞?
我有一个使用Spring Boot设计的RestFul Webservice。 web服务相当繁重,因为它必须在启动时进行大量的数据库调用,并且有些端到其他端进行大量的IO操作来提供结果。 我想让Restful Api成为异步的,这样它就可以更有伸缩性,而且它花时间来提供它的结果。 我甚至实现了这一点,但我无法测试这是否是异步的。 如果我想要 如果向url/all发出请求
我想用Tkinter创建一个非阻塞消息窗口。这是为了在另一个函数正在等待答复时显示等待消息。收到回复后,窗口可自动关闭。我设法在网上找到了一些信息,我做了以下工作: 当我将其作为主脚本执行时,这运行良好,但是当我在得到以下错误RuntimeError之后想要使用Tkinter运行另一个gui应用程序时:主线程不在主循环中 另外,当我在App.destroy()之后运行另一段代码时。然后应用程序窗口
当分别尝试锁定一个文件时,为什么Ruby的File#flock不能像预期的那样工作?在块中锁定文件不是解决此问题的正确方法,因为关键是显示锁定持久锁的行为。在块中使用File#flock会在块退出时释放锁,因此无法正确演示问题。 File#flock会以多种方式失败,特别是在请求非阻塞锁时。下面是一些例子。 > 当文件被独占锁定时,请求非阻塞锁会导致无效的参数异常。 留档表示#flock根据loc
是否有一个请求-回复模式应该与spring-cloud-stream一起使用?我在spring-cloud-stream上能找到的所有留档都面向MessageChannel.send即发即弃类型的生产者,我熟悉spring-集成中的@MessagingGateway,但我不确定这将如何与spring-cloud-stream一起使用。当您有一个REST POSTendpoint来保存具有分配标识符