当前位置: 首页 > 知识库问答 >
问题:

使用任务执行器的事务轮询器是否将数据库资源用于队列消息

蒋高扬
2023-03-14

我有一个服务激活器,它使用轮询器从通道中提取消息。通道有一个队列,该队列由数据库的持久存储支持。该轮询器还配置了一个任务执行器,以便为来自通道的消息处理添加一些并发性。

任务执行器配置有队列容量。

由于轮询器从数据库中的通道检索消息,并且这被配置为事务性的,那么如果任务执行器没有更多可用的线程,那么在任务执行器中排队的消息的事务会发生什么。任务执行器上的线程请求是排队的,由于这些消息没有自己的线程,事务会发生什么呢?我假设将提交由轮询器从持久通道存储区移除由任务执行器(在)排队的消息。因此,如果服务器在任务执行器中已排队的运行程序中出现故障,它们将丢失?

由于事务性持久通道队列的思想是确保在服务器发生故障时不会丢失消息,那么在通道数据库支持的队列/存储中,队列消息(在任务执行器中)的事务是如何被处理的呢?

    <bean id="store" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="channelServerDataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
    <property name="region" value="${user.name}_${channel.queue.region:default}"/>
    <property name="usingIdCache" value="false"/>
</bean> 

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/> 
</int:transaction-synchronization-factory>

<int:channel id="transacitonAsyncServiceQueue">
    <int:queue message-store="store"/> 
    <!--  <int:queue/>  --> 
</int:channel>

<bean id="rxPollingTrigger" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="500"/>
    <constructor-arg value="MILLISECONDS"/>
    <property name = "initialDelay" value = "30000"/> 
    <!-- initialDelay important to ensure channel doesnt start processing before the datasources have been initialised becuase we
         now persist transactions in the queue, at startup (restart) there maybe some ready to go which get processed before the
         connection pools have been created which happens when the servlet is first hit -->
</bean> 

<int:service-activator ref="asyncChannelReceiver" method="processMessage" input-channel="transacitonAsyncServiceQueue">
    <int:poller trigger="rxPollingTrigger" max-messages-per-poll="20"  task-executor="taskExecutor" receive-timeout="400">
        <int:transactional transaction-manager="transactionManagerAsyncChannel" /> 
    </int:poller>
    <int:request-handler-advice-chain>
        <ref bean="databaseSessionContext" />
    </int:request-handler-advice-chain>
</int:service-activator>

<task:executor id="taskExecutor" pool-size="100-200" queue-capacity="200" keep-alive="1" rejection-policy="CALLER_RUNS" />  

共有1个答案

赵英资
2023-03-14

...然后,如果任务执行器没有更多可用线程,那么在任务执行器中排队的消息的事务将发生什么。任务执行器上的线程请求是排队的,由于这些消息没有自己的线程,事务会发生什么呢?

它不是这样工作的--直到任务执行(包括来自通道的receive())之后,事务才开始。

轮询器调度任务,任务启动,事务启动,工作进行,事务提交/回滚。

请参见abstractPollingEndpoint-pollingTask.call()是事务启动的地方。

 类似资料:
  • 我正在学习Executor服务,并试图了解如何与线程池中的线程共享数据列表。我的runnable方法需要从列表中读取数据并进行处理。 池中的所有线程应该只处理一次列表中的不同元素

  • 问题内容: 对于需要快速刷新数据的应用程序,使用JavaScript轮询服务器的最佳实践是什么?我将jQuery用于前端,将Java Spring Framework用于后端。 刷新数据的示例可以是很快(每1秒)更新一次的项目列表。 问题答案: 您可能想使用jQuery的Ajax函数每秒钟左右轮询一次服务器。然后,服务器可以近乎实时地向浏览器发出指令响应。 您也可以考虑使用长轮询而不是上面的方法,

  • 问题内容: 我正在尝试学习AngularJS。我第一次尝试每秒钟获取新数据的工作: 当我通过使线程休眠5秒钟来模拟慢速服务器时,它将等待响应,然后再更新UI和设置另一个超时。问题是当我重写以上内容以使用Angular模块和DI进行模块创建时: 这仅在服务器响应速度很快时才有效。如果有任何延迟,它会在不等待响应的情况下每秒发出1个请求,并且似乎清除了UI。我想我需要使用回调函数。我试过了: 但是出现

  • 运行队列处理器 队列处理器的设置 Laravel 包含一个队列处理器,当新任务被推到队列中时它能处理这些任务。你可以通过 queue:work 命令来运行处理器。要注意,一旦 queue:work 命令开始,它将一直运行,直到你手动停止或者你关闭控制台: php artisan queue:work 可以指定队列处理器所使用的连接。 php artisan queue:work redis 可以自

  • 使用AzureJavaSDK列出资源组中存在的Azure数据库for PostgreSQL服务器的最佳和正确方法是什么?目前,我们有使用ARM模板进行的部署,一旦部署了资源,我们希望能够从Azure本身获取有关这些资源的信息。我尝试过以下方法: 但是返回的列表大小是。 但是,对于虚拟机,我可以通过以下方式获取信息: 那么,使用AzureJavaSDK获取有关的信息的正确方法是什么? P. S.一旦

  • 在代码里事务提交后方法结束,此时数据库是否已经执行了事务?因为事务提交返回了成功说明数据库已经处理了这个事务提交,但是此时数据库实际是否已经有执行完成这个事务?还是代码里事务提交完成后数据库实际并没有执行完成,只是先返回了成功的信息?现在碰到一个情况是方法提交结束后另外一个程序立刻调用存储过程查询数据会出现查不到的情况。