我错过了什么?
AMQ版本5.13.2 Java 1.8.0\u 74 Windows 10
给定一个简单的测试用例,传输两条Object消息,一条带有数据,另一条是数据结束标记。只有数据结束标记被接收。
队列在作业开始时创建,并在作业完成后销毁。
如果我运行更多的事务,我会看到大约50%的接收率。
日志清楚地显示接收器在第一条消息被放入队列之前就已启动,两条消息都被放入队列,但实际上只有第二条消息被接收。
发送方和接收方都在同一个JVM上。每个都有自己的会话和连接。
连接和队列设置代码:
@Override
public void beforeJob(JobExecution jobExecution) {
// TODO Auto-generated method stub
try {
jobParameters = jobExecution.getJobParameters();
readerConnection = connectionFactory.createConnection();
readerConnection.start();
writerConnection = connectionFactory.createConnection();
writerConnection.start();
jmsQueueManagementSession = writerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queueName = jobParameters.getString("jobName") + "." + jobExecution.getId();
queue = jmsQueueManagementSession.createQueue(getQueueName());
} catch (JMSException ex) {
throw new MaxisRuntimeException(
MaxisCodeHelperImpl.generateCode("MXAR", MXMODULE, JMS_RECEIVER_INITIALIZATION_ERROR), null);
}
}
发件人设置代码:
@Override
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
this.setJobExecution(stepExecution.getJobExecution());
try {
this.connection = jmsJobExecutionListener.getWriterConnection();
this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
this.messageProducer = session.createProducer(jmsJobExecutionListener.getQueue());
} catch (JMSException ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
}
接收器设置代码:
@Override
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
this.setJobExecution(stepExecution.getJobExecution());
try {
this.connection = jmsJobExecutionListener.getReaderConnection();
this.jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
this.messageConsumer = jmsSession.createConsumer(jmsJobExecutionListener.getQueue());
}
catch (JMSException ex)
{
throw new RuntimeException(ex.getMessage(), ex);
}
}
传输代码:
private void doSomeStuffInTransaction (final Object model) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
try {
doSomeStuff ( model );
ObjectMessage message = session.createObjectMessage(
(model.getRoot() == null)
? null
: model.getRoot().getContents().getId());
messageProducer.send(message);
logger.debug("Sent: {}", message.toString());
}catch (Exception e) {
//use this to rollback exception in case of exception
status.setRollbackOnly();
throw new RuntimeException(e.getmessage(), e);
}
}});
}
接收方代码:
public Object read() throws Exception,
UnexpectedInputException, ParseException,
NonTransientResourceException {
Object result = null;
logger.debug("Attempting to receive message on connection: ", connection.toString());
ObjectMessage msg = (ObjectMessage) messageConsumer.receive();
logger.debug("Received: {}", msg.toString());
result = msg.getObject();
return result;
}
日志剪切:
DEBUG com.mylib.SelectedDataJmsReader - Attempting to receive message on connection:
... snip ...
*** First message ***
DEBUG org.apache.activemq.broker.region.Queue - localhost Message ID:zip-56502-1457640572818-4:2:2:1:1 sent to queue://Stuff via SQL.402
DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.402, subscriptions=2, memory=0%, size=1, pending=0 toPageIn: 1, Inflight: 0, pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 1, dequeueCount: 0, memUsage:2214
DEBUG com.maxis.mxmove.core.SelectedDataJmsWriter - Sent: ActiveMQObjectMessage {commandId = 0, responseRequired = false, messageId = ID:zip-56502-1457640572818-4:2:2:1:1, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://Stuff via SQL.402, transactionId = null, expiration = 0, timestamp = 1457640610215, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@78cb27fd, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false}
INFO com.maxis.mxmove.core.SelectedDataJmsWriter - Committed 1 stuff to redo log and JMS queue
*** Second Message ***
INFO com.maxis.mxmove.core.SourceSelectionReaderImpl - Returning empty stuff and end-of-stream placeholder.
DEBUG org.apache.activemq.broker.region.Queue - localhost Message ID:zip-56502-1457640572818-4:2:2:1:2 sent to queue://Stuff via SQL.402
DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.402, subscriptions=2, memory=0%, size=2, pending=0 toPageIn: 1, Inflight: 1, pagedInMessages.size 1, pagedInPendingDispatch.size 0, enqueueCount: 2, dequeueCount: 0, memUsage:3155
DEBUG com.maxis.mxmove.core.SelectedDataJmsWriter - Sent: ActiveMQObjectMessage {commandId = 0, responseRequired = false, messageId = ID:zip-56502-1457640572818-4:2:2:1:2, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://Stuff via SQL.402, transactionId = null, expiration = 0, timestamp = 1457640610375, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false}
INFO com.maxis.mxmove.core.SelectedDataJmsWriter - Committed 1 stuff to redo log and JMS queue
*** We received the last message, not the first. We show two enqueues, and one dequeue.. ***
DEBUG com.maxis.mxmove.core.SelectedDataJmsReader - Received: ActiveMQObjectMessage {commandId = 7, responseRequired = true, messageId = ID:zip-56502-1457640572818-4:2:2:1:2, originalDestination = null, originalTransactionId = null, producerId = ID:zip-56502-1457640572818-4:2:2:1, destination = queue://Stuff via SQL.402, transactionId = null, expiration = 0, timestamp = 1457640610375, arrival = 0, brokerInTime = 1457640610375, brokerOutTime = 1457640610376, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1024, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}
INFO com.maxis.mxmove.core.SelectedDataJmsReader - executed read, found end-of-stream marker, returning null
DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.402, subscriptions=2, memory=0%, size=1, pending=0 toPageIn: 0, Inflight: 1, pagedInMessages.size 1, pagedInPendingDispatch.size 0, enqueueCount: 2, dequeueCount: 1, memUsage:1107
在接收器设置代码中,请注意beforeStep()方法是用@beforeStep注释的。我认为这意味着接收器被设置了两次,并且可能进行了预取优化。这是经过验证的,因为日志显示了两个订阅。我不是一个笨重的JMS用户,我有一个错误的印象,一个是收件人,另一个是发件人。
@Override
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
this.setJobExecution(stepExecution.getJobExecution());
删除@BeforeStep注释后,日志仅显示一个订阅
DEBUG org.apache.activemq.broker.region.Queue - queue://Workorders via SQL.408, subscriptions=1, memory=0%, size=1, pending=0 toPageIn: 0, Inflight: 1, pagedInMessages.size 1, pagedInPendingDispatch.size 0, enqueueCount: 1370, dequeueCount: 1369, memUsage:1024
下次有人告诉我你使用注入时代码有多“干净”,我可能会考虑武功交换的应用
版本: SpringBoot: 2.3.12。发布 SpringCloud:Hoxton。SR12 SpringCloud Starter Sleuth: 3.0.3 骆驼: 3.4.6 我想将Sleuth添加到一个预先存在的项目中,该项目现在使用ActiveMQ,以前它使用JMS。当我这样做时,ActiceMQ消息中的值会被阻止/删除(其中一个是“filename”,它是S2请求的键值)。其他J
我有一个Spring JMS应用程序,它有一个JMS侦听器,在应用程序启动时连接到活动的MQ队列。这个JMS侦听器是an应用程序的一部分,该应用程序接收消息,用内容丰富消息,然后将消息传递给同一个ActiveMQ代理上的主题。 SessionTransact设置为true。我没有执行任何数据库事务,所以我没有@Transactional设置。从我所读到的内容来看,SessionTransact属性
我使用ActiveMQ Artemis 2.16.0作为我的代理和作为我的JMS客户端。感觉我随机丢失了一些信息,原因我不知道。我调查了我的Java代码,没有发现任何异常。 我有个方法 方法如下所示: : 如果。如果发送该消息失败(即发生异常),则会再次发送该消息,以此类推最多10次,然后再发送到DLQ。本质上,这是我大部分时间看到的,但在我的日志中的随机时刻,我只看到一个尝试重新传递消息,而在中
spring XML中的jmsTemplate定义: 有人对问题有什么建议吗/关于如何实现延迟消息传递的其他想法?谢了!
我编写了一个非常简单的Flink流媒体作业,它使用从Kafka获取数据。 这工作得很好,每当我在Kafka上将某些内容放入主题时,它都会被我的Flink作业接收并处理。现在我试图看看如果我的Flink作业由于某种原因不在线会发生什么。所以我关闭了flink作业并继续向Kafka发送消息。然后我再次开始我的Flink作业,并期望它会处理同时发送的消息。 然而,我得到了以下信息: 因此,它基本上忽略了
问题内容: 有没有一种方法可以抑制ActiveMQ服务器上定义的队列上的重复消息? 我尝试手动定义JMSMessageID((message.setJMSMessageID(“ uniqueid”)),但是服务器忽略此修改并使用内置的JMSMessageID传递消息。 根据规范,我没有找到有关如何删除邮件重复数据的参考。 在HornetQ中,要解决此问题,我们需要在消息定义中声明HQ特定的属性or