我有一个Spring集成应用程序,附加到AMQP经纪人。
我想从amqp队列接收消息,并更新数据库记录。
为了提高性能,我有一组工作人员允许同时进行多个更新。
我有以下配置:
<int-amqp:inbound-channel-adapter queue-names="pricehub.fixtures.priceUpdates.queue"
channel="pricehub.fixtures.priceUpdates.channel"
message-converter="jsonMessageConverter"/>
<int:channel id="pricehub.fixtures.priceUpdates.channel">
<int:queue />
</int:channel>
<int:service-activator ref="updatePriceAction"
method="updatePrices"
input-channel="pricehub.instruments.priceUpdates.channel">
<int:poller fixed-delay="50" time-unit="MILLISECONDS" task-executor="taskExecutor" />
</int:service-activator>
<task:executor id="taskExecutor" pool-size="5-50" keep-alive="120" queue-capacity="500"/>
如果我开始运行此程序,而AMQP通道上没有任何入站消息要处理,则我很快会看到thredpool耗尽,并开始拒绝。
这是日志:
[Thu Apr 2013 23:41:51.153] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-w4qPp60jVEQOIEovR4cERv], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), acknowledgeMode=AUTO local queue size=0
[Thu Apr 2013 23:41:51.160] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-Q3Lq4R9g9E8WBNVLYzaFmq], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), acknowledgeMode=AUTO local queue size=0
[Thu Apr 2013 23:41:51.166] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-w8bg7ltEV2mot8QXDPCmfK], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), acknowledgeMode=AUTO local queue size=0
[Thu Apr 2013 23:41:51.170] DEBUG [] (org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:185) - Retrieving delivery for Consumer: tag=[amq.ctag-A-0KdqhFjpc-Hvjmv7aZAc], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,4), acknowledgeMode=AUTO local queue size=0
[Thu Apr 2013 23:41:51.180] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
[Thu Apr 2013 23:41:51.180] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
[Thu Apr 2013 23:41:51.199] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
[Thu Apr 2013 23:41:51.200] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
[Thu Apr 2013 23:41:51.220] DEBUG [] (org.springframework.integration.endpoint.PollingConsumer:71) - Received no Message during the poll, returning 'false'
很快,线程池开始拒绝执行:
[Thu Apr 2013 23:47:15.363] ERROR [] (org.springframework.integration.handler.LoggingHandler:126) - org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@6ff3cb0e] did not accept task: org.springframework.integration.util.ErrorHandlingTaskExecutor$1@78615c8b
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:244)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:231)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:680)
Caused by: java.util.concurrent.RejectedExecutionException
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1768)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658)
at org.springframework.sched
uling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:241)…还有12个
我怀疑罪魁祸首就在这里:BlockingQueueConsumer
-表示每次对消息的轮询都会阻塞线程,直到消息到达为止…导致线程池迅速耗尽。
什么是正确的配置方式?
QueueChannel
为什么不简单地增加concurrent-consumers
入站适配器上的属性,而不是使用and轮询器?
<xsd:attribute name="concurrent-consumers" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Specify the number of concurrent consumers to create. Default is 1.
Raising the number of concurrent consumers is recommended in order to scale the consumption of messages coming in
from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In
general, stick with 1 consumer for low-volume queues.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
并且,删除<queue/>
和<poller/>
。
另外,我总是建议在日志中包含线程名称(%t
对于log4J);它使调试线程问题变得更加容易。
编辑:
使用轮询器时,线程不足的原因是轮询器的默认receive- timeout
值为1秒。您每50ms调度一个线程,但是每个线程等待QueueChannel
1秒。最终,您的任务队列已满。
为避免这种情况,如果希望继续使用此技术,只需将设置receive- timeout
为0
on,<poller/>
但是在适配器中使用更高的并发效率更高,因为没有轮询或切换到另一个线程。
我发现JVM只有一个线程池用于并行处理流。我们在一个大的流上有一个I/O阻塞的函数,这导致了与不相关的并行流一起使用的不相关的或者快速的函数的活跃度问题。 stream上没有允许使用备用线程池的方法。 有没有一种简单的方法来避免这个问题,也许是以某种方式指定要使用哪个线程池?
Nuttx工作线程(LP和HP)具有轮询间隔,仅针对工作线程0。我想知道为什么需要轮询间隔? 当有人将一个新工作排入工作队列时,将向一个辅助线程发出处理它的信号。如果所有工作线程都忙,则当线程完成当前工作并再次检查队列时,将处理排队的工作。 与sched_garbage_collection()工作一样,工作线程由sched_signal_free()发出信号。 那么当需要轮询间隔时会是什么情况呢
问题 你有一个线程队列集合,想为到来的元素轮询它们, 就跟你为一个客户端请求去轮询一个网络连接集合的方式一样。 解决方案 对于轮询问题的一个常见解决方案中有个很少有人知道的技巧,包含了一个隐藏的回路网络连接。 本质上讲其思想就是:对于每个你想要轮询的队列,你创建一对连接的套接字。 然后你在其中一个套接字上面编写代码来标识存在的数据, 另外一个套接字被传给 select() 或类似的一个轮询数据到达
在我的主流程中,我有一个线程池ExecutorService,我用我调用的“已知”数量的可调用项来填充它。 此外,还有另一个ExecutorService对象称为“全球池”(我认为ExecutorService是线程安全的,我可以从不同的线程向其添加任务)。 现在,上面的每个Callable都会产生新的任务,并将它们(提交)到这个共享的“全球池”。 问题是,我无法知道(无法阻止)所有任务何时完成,
我正计划创建可调整队列大小的可调整线程池。我正在使用unbounded LinkedBlockingQueue和一个外部设置,该设置控制排队的消息数量。最初,my corepoolsize和maxpoolsize是相等的。现在,如果我想在运行时更新我的线程池大小,我通过一个公共设置将corepoolsize和maxpoolsize设置为不同的值。我想知道你对这种做法有什么看法。 当maxpools
我在项目中使用ApacheTomcat JDBC连接池。我很困惑,因为在重负下,我一直看到以下错误: 我的期望是,使用池,新连接的请求将被保留在队列中,直到连接可用。相反,当池达到容量时,请求似乎会被拒绝。这种行为可以改变吗? 谢谢, 达尔 这是我的池配置: