我遵循了官方的Quarkus消息传递指南,创建了一个简单的示例来体验使用AMQP(Apache Artemis)的反应消息传递特性。
// start consumer side.
curl http://localhost:8080/messages -H "Accept:text/event-stream"
// start sending.
curl http://localhost:8080/messages -d "Hello, Quarkus" -H "Content-Type:text/plain"
// then the consumer exit.
// and sending a message will cause an exception.
2020-10-12 20:18:54,137 WARN [io.net.cha.AbstractChannelHandlerContext] (vert.x-eventloop-thread-6) Failed to mark a promise as failure because it has failed already: DefaultChannelPromise@4bda4836(failure: java.nio.channels.ClosedChannelException), unnotified cause: java.nio.channels.ClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
at io.vertx.core.net.impl.ConnectionBase.write(ConnectionBase.java:124)
at io.vertx.core.net.impl.ConnectionBase.lambda$queueForWrite$2(ConnectionBase.java:215)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:832)
: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
at io.netty.handler.codec.http.DefaultHttpContent.release(DefaultHttpContent.java:92)
at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:867)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
at io.vertx.core.net.impl.ConnectionBase.write(ConnectionBase.java:124)
at io.vertx.core.net.impl.ConnectionBase.lambda$queueForWrite$2(ConnectionBase.java:215)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:832)
如果可以在Apache Artemis中缓存消息,即使没有连接消费者。
Apache ActiveMQ Artemis具有强大而灵活的寻址模型。如果您的场景是点对点消息传递,您可以自定义代理配置来定义地址,即:
<address name="messages">
<anycast>
<queue name="messages" />
</anycast>
</address>
代理将在启动时创建配置中定义的所有地址和队列,因此当发送方发送消息时,它将被路由到一个存在的队列。
我有一个生产者和一个消费者。消费者的多个实例正在运行。当生产者发布消息时,我的意图是通过所有实例消费该消息。所以,我使用的是直接交换。生产者将带有主题的消息发布到直接交换。消费者正在通过独占队列收听该主题。当消费者启动并且生产者发布消息时,此过程运行良好。但是当消费者关闭并且生产者发布消息时,消费者在启动时不会消费此消息。 我在谷歌上搜索了这个问题。建议使用命名队列。但是,如果使用命名队列,则消息
我遵循以下指南https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2/testing/testing.html在没有kafka代理的情况下进行测试。我已设置了以下QuarKustestResource:
我在同一台机器上有两个应用程序实例(尽管它也可能在不同的机器上),两个Tomcat实例具有不同的端口,Apache ActiveMQ嵌入到应用程序中。 我已经配置了一个静态代理网络,这样来自一个实例的消息也可以被所有其他实例使用(每个实例可以是生产者和消费者)。 servlet: 配置: 发布者: 消费者: JNDI:activemq-jndi.properties ActiveMQStartup
我有Kafka server版本2.4,并设置log.retention.hours=168(这样主题中的消息将在7天后被删除)和auto.offset.reset=aresty(这样,如果消费者没有得到最后提交的偏移量,那么应该从一开始就处理它)。而且由于我使用的是Kafka2.4版本,所以默认值为offsets.retention.minutes=10080(因为我没有在应用程序中设置此属性)
我将ActiveMQ与Apache Camel一起使用。现在我遇到了这个问题,在ActiveMQ中有大量挂起的消息。消息处于挂起状态,出列过程非常缓慢。 我的理解正确吗?通常情况下,为了有那么多待处理的消息,每个消费者的调度队列的大小应该已经接近默认的预取限制(即1000)?但每个消费者只有20-80美元? 我对ActiveMq了解不多。那么我应该从哪里了解如何解决这个问题呢? 连接配置01是活动
我们有一个用例,其中我们只创建一个消费者来处理队列中的消息。消息处理器在确认之前积累一定数量的消息。以异步方式接收消息并使用事务会话。消息的大小非常小。 在一定数量的消息之后,主动MQ停止向唯一的消费者发送进一步的消息,并等待确认。我们尝试过像consumer.prefetchSize,consumer . maximumpendingmessagelimit;但是什么都不管用。我们用一个只有一个