当前位置: 首页 > 知识库问答 >
问题:

使用AMQP的Azure服务总线事务

公孙高轩
2023-03-14

我尝试将Azure服务总线与ApacheQPID和Spring与事务集成。

但Azure服务总线AMQP实现似乎不支持事务。这是真的吗?我没有找到相关信息。

这是我的JMS配置

<bean id="serviceBusConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory">
    <constructor-arg value="amqps://${serviceBus.host}?amqp.idleTimeout=1200000"/>
    <property name="clientID" value="${serviceBus.clientId}"/>
    <property name="username" value="${serviceBus.sharedAccessPolicyName}"/>
    <property name="password" value="${serviceBus.sharedAccessPolicyKey}"/>
    <property name="receiveLocalOnly" value="true"/>
    <property name="receiveNoWaitLocalOnly" value="true"/>
</bean>
<bean id="jmsCachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="serviceBusConnectionFactory" />
</bean>

这是我的spring集成片段:

<int-jms:inbound-channel-adapter id="resOrdJmsIn"
                                 destination-name="${serviceBus.destination-name}"
                                 channel="resOrdIncoming"
                                 connection-factory="jmsCachingConnectionFactory"
                                 acknowledge="client"
                                 session-transacted="true"   >
    <int:poller fixed-rate="1000"/>
</int-jms:inbound-channel-adapter>

它与session transact=“false”配合使用,但与session transact=“true”配合使用时会产生错误:

2017-07-03 10:06:27.237 ERROR 21575 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler   : org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is javax.jms.JMSException: An AMQP error occurred (condition='amqp:internal-error'). [condition = amqp:internal-error]
    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:487)
    at org.springframework.jms.core.JmsTemplate.receiveSelected(JmsTemplate.java:754)
    at org.springframework.integration.jms.JmsDestinationPollingSource.doReceiveJmsMessage(JmsDestinationPollingSource.java:138)
    at org.springframework.integration.jms.JmsDestinationPollingSource.receive(JmsDestinationPollingSource.java:111)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:224)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:245)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: javax.jms.JMSException: An AMQP error occurred (condition='amqp:internal-error'). [condition = amqp:internal-error]
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:148)
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToException(AmqpSupport.java:103)
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:167)
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:113)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:795)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.access$1900(AmqpProvider.java:92)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider$17.run(AmqpProvider.java:699)
    ... 7 more

QPID跟踪

2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider  : New Proton Event: LINK_FINAL
2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider  : New Proton Event: SESSION_INIT
2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider  : New Proton Event: SESSION_LOCAL_OPEN
2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider  : New Proton Event: SESSION_REMOTE_OPEN
2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.apache.qpid.jms.provider.amqp.FRAMES   : SENT: Attach{name='qpid-jms:coordinator:ID:fe87e12c-1fdc-4e7c-8cf1-11861b90de19:1:5', handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}, target=Coordinator{capabilities=[amqp:local-transactions]}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
2017-07-03 10:37:29.262 TRACE 24726 --- [us.windows.net]] o.a.q.j.t.netty.NettyTcpTransport        : Attempted write of: 127 bytes
2017-07-03 10:37:29.328 TRACE 24726 --- [ntLoopGroup-2-1] o.a.q.j.t.netty.NettyTcpTransport        : New data read: 103 bytes incoming: UnpooledHeapByteBuf(ridx: 0, widx: 103, cap: 165)
2017-07-03 10:37:29.329 DEBUG 24726 --- [us.windows.net]] o.a.qpid.proton.engine.impl.SaslImpl     : SaslImpl [_outcome=PN_SASL_OK, state=PN_SASL_PASS, done=true, role=CLIENT] about to call plain input
2017-07-03 10:37:29.329 TRACE 24726 --- [us.windows.net]] o.apache.qpid.jms.provider.amqp.FRAMES   : RECV: Attach{name='qpid-jms:coordinator:ID:fe87e12c-1fdc-4e7c-8cf1-11861b90de19:1:5', handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=null, target=null, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=18446744073709551615, offeredCapabilities=null, desiredCapabilities=null, properties=null}
2017-07-03 10:37:29.329 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider  : New Proton Event: LINK_INIT
2017-07-03 10:37:29.329 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider  : New Proton Event: LINK_LOCAL_OPEN
2017-07-03 10:37:29.329 TRACE 24726 --- [us.windows.net]] o.a.qpid.jms.provider.amqp.AmqpProvider  : New Proton Event: LINK_REMOTE_OPEN
2017-07-03 10:37:29.432 TRACE 24726 --- [ntLoopGroup-2-1] o.a.q.j.t.netty.NettyTcpTransport        : New data read: 103 bytes incoming: UnpooledHeapByteBuf(ridx: 0, widx: 103, cap: 165)
2017-07-03 10:37:29.433 DEBUG 24726 --- [us.windows.net]] o.a.qpid.proton.engine.impl.SaslImpl     : SaslImpl [_outcome=PN_SASL_OK, state=PN_SASL_PASS, done=true, role=CLIENT] about to call plain input

共有1个答案

段兴为
2023-03-14

Spring集成代码段,通过JMS与Azure Service Bus协同工作,由AMPQ以事务方式支持:

  <int-jms:message-driven-channel-adapter id="jmsIn"
                                            destination-name="${serviceBus.destination-name}"
                                            connection-factory="jmsCachingConnectionFactory"
                                            acknowledge="client"
                                            channel="channel-si"/>
 类似资料:
  • 我使用此url中的示例http://azure.microsoft.com/en-us/documentation/articles/service-bus-java-how-to-use-jms-api-amqp/.我有两个问题: 1、ACS或SAS 连接URL, 用户名和密码来自ACS身份验证,但Azure服务总线已将其身份验证从ACS更改为SAS。它是否也支持SAS身份验证?就像用户名是SA

  • 我有一个名为“状态更改”的Azure服务总线主题,它有一个名为“混响”的订阅。我正在尝试使用设置订阅主题的方法,但出现错误: 我一直在使用这篇博客文章来尝试让一切正常运行:http://ramblingstechnical.blogspot.co.uk/p/using-azure-service-bus-with-spring-jms.html 我可以使用向主题添加消息,并使用Azure文档中概述

  • 我正在开发一个客户端,它可以从Windows服务总线读取消息,该消息是使用发送的。净额。客户端是使用Java开发的,据我所知,它创建会话,但当它创建会话时,使用者抛出一个JMSException,它只告诉我以下消息:amqp:不允许 有线索吗? 顺致敬意,

  • 除了用户名和密码(例如来自ACS的令牌)之外,是否还有其他使用AMQP对Azure服务总线进行授权的方法? 在我的场景中,我希望能够在不公开凭据的情况下为资源级客户端提供对服务总线的访问权限。

  • 在如何将AMQP 1.0与服务总线一起使用后,我正在尝试使用AMQP连接到Azure服务总线队列“queue1”。NET API。 在azure管理门户中,我可以看到队列“queue1”已成功创建,如果我省略连接字符串扩展名“TransportType=Amqp”,则连接确实有效。但是,一旦我想通过将属性“TransportType”添加到连接字符串来使用amqp,我就会收到目标计算机(因此是az

  • 很多关于Service Bus的资料都提到它使用AMQP 1.0。然而,我发现的示例使用了隐藏AMQP层的库,因此不清楚如何使用通用AMQP 1.0客户端并执行此服务总线示例中列出的操作 在几种情况下,能够使用独立的AMQP客户端而不是提供的更高级别客户端可能很重要。例如,提供的客户端可能在某些平台上不可用,或者通用客户端可能为特定问题提供更合适的工作方式。 可能相关问题