当前位置: 首页 > 知识库问答 >
问题:

用于AWS SQS队列配置的Spring JMS侦听器,具有并发性

庄高谊
2023-03-14

我试图弄清楚如何配置jms侦听器来侦听AWS队列并在多个线程中处理消息(同时大约100个线程)。

下面是我的配置。

@Configuration
@EnableJms
public class JmsConfig {

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(AmazonSQS amazonSQS) {
        ProviderConfiguration providerConfiguration = new ProviderConfiguration().withNumberOfMessagesToPrefetch(0);
        SQSConnectionFactory connectionFactory = new SQSConnectionFactory(providerConfiguration, amazonSQS);
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setDestinationResolver(new DynamicDestinationResolver());
        factory.setConcurrency("30-100");
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONNECTION);
        factory.setErrorHandler(t -> {
        });
        return factory;
    }

}

随着这种混乱,我不断收到以下错误:

SQSMessage消费者-30秒后无法终止执行器服务消费者预取,一些正在运行的线程将立即关闭

此外,将消息发布到Amazon SQS实例需要20秒。

我尝试了NumberOfMessagesToPrefetch和CacheLevel的不同组合,但都无法正常工作。

CacheLevel=CACHE_CONSUMER没有错误,但一次处理1条消息。

请帮我配置这个。谢谢!

库:

>

  • aws java sdk:1.11.41

    spring jms:5.1.7

    Amazon-sq s-java-消息传递-lib:1.0.6

  • 共有1个答案

    空俊语
    2023-03-14

    @Boris I能够通过以下方式解决此问题。我不设置缓存级别。提供程序配置仍在禁用预取

    <代码>ProviderConfiguration ProviderConfiguration=新ProviderConfiguration()。withNumberOfMessagesToPrefetch(0)

    此外,我使用DefaultJmsListenerContainerFactory,然后设置以下值,以及未显示的其他值:

    <代码>工厂。setConcurrency(“5-10”)//这将取决于您的用例工厂。setSessionAcknowledgeMode(会话.客户端\u确认)//用于飞行中的错误消息工厂。setTaskExecutor(threadPoolTaskExecutor())//使用threadpooltask执行器,请参阅下面的工厂。setMaxMessagesPerTask(10)//不需要,但可以减少线程切换。这将取决于您的用例

    在使用ThreadPool Executor之前,请先阅读ThreadPool Executor,它写于12年前,但仍然有效。我没有在@JmsListener级别上设置并发,因为它会导致额外的复杂性JmsListener并发

     类似资料:
    • 问题内容: 我正在使用EJB 3.1,并且想配置一个MDB来侦听多个队列。 我更喜欢通过XML定义队列名称,而其他通过注释定义。 能做到吗? 问题答案: 实例化后,MDB只能侦听在其目标ActivationConfigProperty中指定的资源,但是您 可以 为同一MDB创建具有不同目标的多个实例(在您的情况下为队列)。 在ejb-jar.xml中创建两个条目,它们具有不同的目的地和ejb-na

    • 要运行Kafka,需要在文件。有两种设置我不理解。 有人可以解释侦听器和广告侦听器属性之间的区别吗? 留档说: 侦听器:套接字服务器侦听的地址。 和 advertised.listeners:主机名和端口代理将向生产者和消费者做广告。 我什么时候必须使用哪个设置?

    • 我有一个关于正确配置kafka侦听器属性的问题-侦听器和advertised.listers。 在我的配置中,我设置了以下道具: 客户端使用 进行连接。我是否需要在侦听器和广告侦听器中具有相同的值。这里 是指向运行 kafka 代理的主机的 dns 记录。 在什么情况下,我希望它们保持不变和不同? 谢谢!

    • 我试图让队列在laravel 5中工作,队列侦听器正在输出: 未定义索引:表 存在"作业"和"failed_jobs"表,config.php设置为"数据库"。 搜索laravel论坛和google都没有找到解决办法,艾米的想法去哪里找?

    • 我已经使用Spring Kafka创建了一个Kafka消费者,并将其部署在云铸造中。该主题有10个分区。我计划将应用程序扩展到10个实例,以便每个实例可以使用来自一个分区的消息。Spring Kafka支持并发消息侦听器容器,我猜它支持从每个分区创建多个线程来使用。例如,如果我有5个消费者实例,每个消费者实例可能有2个线程从分区消耗。因为我计划为每个分区创建一个应用实例,所以使用并发消费者有什么好

    • 兔子配置: 应用概述:每当gitRepository连接到我们的应用程序时,存储库名称就会成为交换名称,在这种情况下,然后存储库的每个分支都会创建自己的队列,这里有两个队列和。现在每次在开发分支中创建拉取请求时,我都需要将信息传递给开发队列,并且应该由特定的侦听器侦听,该侦听器应该仅注册用于开发。我看到了动态队列的示例,但我似乎找不到任何关于如何创建将使用不同线程执行的动态侦听器的示例,我如何实现