当前位置: 首页 > 知识库问答 >
问题:

我需要将消息保存在rabbit mq中,并在作业成功完成后将其删除

汪才
2023-03-14

我需要在rabbit mq中保存m条消息。我在SimpleMessageListenerContainer中将acknowledgeMode用作手册。这有助于我将值存储在unacked in rabbit mq中。但即使在作业完成后,这些消息仍然未经确认。我需要的邮件被删除后,作业成功完成。请帮我找到解决办法

<beans:bean id="PartitionHandler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler" init-method="afterPropertiesSet" scope="job">
        <beans:property name="messagingOperations" ref="messagingTemplate"></beans:property>
        <beans:property name="stepName" value="slave" />
        <beans:property name="gridSize" value="${spring.gridsize}" />
        <beans:property name="pollInterval" value="5000"></beans:property>
        <beans:property name="jobExplorer" ref="jobExplorer"></beans:property>
        <beans:property name="replyChannel" ref="outboundReplies"></beans:property>
    </beans:bean>

    <beans:bean id="PeriodicTrigger" class="org.springframework.scheduling.support.PeriodicTrigger">
        <beans:constructor-arg value="5000"></beans:constructor-arg>
    </beans:bean> 


<beans:bean id="requestQueue" class="org.springframework.amqp.core.Queue">
    <beans:constructor-arg name="name" value="testQueue">
    </beans:constructor-arg>
    <beans:constructor-arg name="durable" value="true">
    </beans:constructor-arg> 
</beans:bean>

<int:poller id="PollerMetadata" default="true" trigger="PeriodicTrigger" task-executor="taskExecutor"></int:poller>  

<beans:bean id="amqptemplate" 
    class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <beans:property name="connectionFactory" ref="rabbitConnFactory" />
    <beans:property name="routingKey" value="testQueue"/>
    <beans:property name="queue" value="testQueue"/>
</beans:bean>

<beans:bean id="amqpOutboundEndpoint" class="org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint">
    <beans:constructor-arg ref="amqptemplate"/>
    <beans:property name="expectReply" value="false"></beans:property>
    <beans:property name="routingKey" value="testQueue"></beans:property>
    <beans:property name="outputChannel" ref="inboundRequests"></beans:property>
</beans:bean>

<int:service-activator ref="amqpOutboundEndpoint" input-channel="outboundRequests"/>

<beans:bean id="SimpleMessageListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <beans:constructor-arg ref="rabbitConnFactory"/>
    <beans:property name="queueNames" value="testQueue"></beans:property>
    <beans:property name="autoStartup" value="false"></beans:property> 
    <beans:property name="acknowledgeMode" value="MANUAL"></beans:property>
    <beans:property name="concurrentConsumers" value="5"></beans:property>
</beans:bean>



<beans:bean id="AmqpInboundChannelAdapter" class="org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter"  init-method="afterPropertiesSet">
    <beans:constructor-arg ref="SimpleMessageListenerContainer"/>
    <beans:property name="outputChannel" ref="inboundRequests"></beans:property>
</beans:bean>

<beans:bean id="StepExecutionRequestHandler" class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
    <beans:property name="jobExplorer" ref="jobExplorer"/>
    <beans:property name="stepLocator" ref="stepLocator"/>
</beans:bean>

<int:service-activator ref="StepExecutionRequestHandler" input-channel="inboundRequests" output-channel="outboundStaging"/>


<bean id="rabbitConnFactory" 
    class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg><value>localhost</value></constructor-arg>
    <property name="username" value="guest" />
    <property name="password" value="guest" />
    <property name="virtualHost" value="/" />
    <property name="port" value="5672" />
</bean>


<bean id="admin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
    <constructor-arg ref="rabbitConnFactory" />
</bean>

<bean id="messagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate">
 <constructor-arg ref="outboundRequests" />
  <property name="receiveTimeout" value="60000000"/>
</bean>

<bean id="outboundRequests" class="org.springframework.integration.channel.DirectChannel" >
<property name="maxSubscribers" value="5"></property>
</bean>


<int:channel id="outboundReplies" scope="job"><int:queue/></int:channel>

<bean id="outboundStaging" class="org.springframework.integration.channel.NullChannel"></bean>

<bean id="inboundRequests" class="org.springframework.integration.channel.QueueChannel"></bean>

<bean id="stepLocator" class="org.springframework.batch.integration.partition.BeanFactoryStepLocator"/>

共有1个答案

邵耀
2023-03-14

使用手动确认时,您负责确认。

看我对这个问题的回答。

 类似资料:
  • 我有一项服务,每天在Kubernetes上部署数千个短期工作。我试图让Kubernetes在完成后使用这里描述的功能删除这些作业: https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/#clean-up-finished-jobs-automatically 作业完成,但在表示的时间限制之

  • 我是RabbitMQ新手,我使用教程尝试了RabbitMQ示例https://www.rabbitmq.com/getstarted.html. 有人能解释一下如何按设置发布消息,而不是一次发布一条消息吗?此外,如何订阅消息从队列作为一个集合,而不是单一的消息一次。 也就是说,我们如何在RabbitMQ中实现发布和使用队列消息作为作业集? 提前谢谢。

  • 我在Thread上运行flink作业,我们使用命令行中的“fink run”将作业提交给Thread,有一天我们在flink作业上出现异常,因为我们没有启用flink重启策略,所以它只是失败了,但最终我们从Thread应用程序列表中发现作业状态为“成功”,我们预期为“失败”。 Flink CLI日志: Flink作业管理器日志: 有谁能帮我理解为什么塞恩说我的Flink工作是“成功的”?

  • 问题内容: 我使用Ant通过Jenkins启动/关闭JBoss 5服务器。Ant java的spawn和fork设置为“ true”,因此命令在后台执行。 Jenkins成功启动服务器,等待两分钟(Jenkins中的“ sleep”命令),然后在睡眠后由于某种奇怪的原因关闭服务器。sleep命令是构建作业的最后一步。关闭显示: 我用谷歌搜索并尝试了建议的-Xrs命令,但没有帮助。这是怎么回事 问题

  • 在主页上,我有一个按钮,用activeform调用引导模式。在提交此表单时,在我的数据库中添加新行。但是我看不出一个成功的消息,无论是在这个模态还是在主页上。取而代之的是,我进入mysite.ru/category/create页,上面有白色屏幕和对话框消息的文本(“对话框内容在这里......”)。如何不重定向用户在其他页面?只需在主页上关闭模态和/或重定向,并在此处显示关于成功添加行的闪存信息