当前位置: 首页 > 知识库问答 >
问题:

使用Spring Boot和AMQP订阅Azure服务总线主题

哈雅珺
2023-03-14

我有一个名为“状态更改”的Azure服务总线主题,它有一个名为“混响”的订阅。我正在尝试使用@JmsListener设置订阅主题的方法,但出现错误:

2017-03-22 18:34:41.049  WARN 23356 --- [enerContainer-6] o.s.j.l.DefaultMessageListenerContainer  : Setup of JMS message listener invoker failed for destination 'state-changed' - trying to recover. Cause: The messaging entity 'sb://[MySERVICEBUS].servicebus.windows.net/state-changed' could not be found. TrackingId:d2b442f79e0f44bdb449861ea57155ce_G44, SystemTracker:gateway6, Timestamp:3/22/2017 6:34:37 PM

javax.jms.JMSException: The messaging entity 'sb://[MySERVICEBUS].servicebus.windows.net/state-changed' could not be found. TrackingId:d2b442f79e0f44bdb449861ea57155ce_G44, SystemTracker:gateway6, Timestamp:3/22/2017 6:34:37 PM
    at org.apache.qpid.amqp_1_0.jms.impl.TopicSubscriberImpl.createClientReceiver(TopicSubscriberImpl.java:111) ~[qpid-amqp-1-0-client-jms-0.32.jar:0.32]
    at org.apache.qpid.amqp_1_0.jms.impl.MessageConsumerImpl.<init>(MessageConsumerImpl.java:129) ~[qpid-amqp-1-0-client-jms-0.32.jar:0.32]
    at org.apache.qpid.amqp_1_0.jms.impl.TopicSubscriberImpl.<init>(TopicSubscriberImpl.java:46) ~[qpid-amqp-1-0-client-jms-0.32.jar:0.32]
    at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl.createDurableSubscriber(SessionImpl.java:544) ~[qpid-amqp-1-0-client-jms-0.32.jar:0.32]
    at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl.createDurableSubscriber(SessionImpl.java:59) ~[qpid-amqp-1-0-client-jms-0.32.jar:0.32]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.createConsumer(AbstractMessageListenerContainer.java:870) ~[spring-jms-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:215) ~[spring-jms-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1189) ~[spring-jms-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1165) ~[spring-jms-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1158) ~[spring-jms-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1055) ~[spring-jms-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_77]

我一直在使用这篇博客文章来尝试让一切正常运行:http://ramblingstechnical.blogspot.co.uk/p/using-azure-service-bus-with-spring-jms.html

我可以使用JmsTemplate向主题添加消息,并使用Azure文档中概述的普通Java JMS库从主题中读取消息:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-java-how-to-use-jms-api-amqp所以我知道我的主题是可行的,并且是可访问的,当我用Spring配置它时,似乎我做错了什么。

我的配置看起来像:

@Configuration
public class JmsConfiguration
{

    @Bean
    public JmsListenerContainerFactory topicJmsListenerContainerFactory() throws NamingException
    {
        DefaultJmsListenerContainerFactory returnValue = new DefaultJmsListenerContainerFactory();

        Context context = context();
        ConnectionFactory cf = connectionFactory(context);

        returnValue.setConnectionFactory(cf);
        returnValue.setSubscriptionDurable(Boolean.TRUE);
        return returnValue;
    }

    private Context context() throws NamingException
    {
        Hashtable<String, String> env = new Hashtable<String, String>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory");
        env.put(Context.PROVIDER_URL, "src/main/resources/servicebus.properties");
        Context context = new InitialContext(env);
        return context;
    }



    /**
     * @param context
     * @return
     * @throws NamingException
     */
    private ConnectionFactory connectionFactory(Context context) throws NamingException
    {
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
        return cf;
    }

}

服务总线。属性(已编辑用户名和密码等):

# servicebus.properties - sample JNDI configuration

# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF=amqps://[USER]:[PASSWORD]@[MYSERVICEBUS]

# Register some queues in JNDI using the form
# queue.[jndi_name] = [physical_name]
# topic.[jndi_name] = [physical_name]
queue.workflow = workflow
topic.state-changed = stage-changed

最后是我的听众课:

@Component
public class TestListener
{
    Logger logger = LoggerFactory.getLogger(LoggingWorkflowEventHandler.class);

    @JmsListener(destination = "state-changed", containerFactory = "topicJmsListenerContainerFactory", subscription = "reverb")
    public void onMessage(String message)
    {
        logger.info("Received message from topic: {}", message);
    }
}

如果有人设法让这个工作,我会感谢一些建议。

共有3个答案

薛修能
2023-03-14

创建或更改委托人:

当我们想要建立到服务总线的安全amqps连接时,我们需要将所有必需的SSL证书存储在信任库中。由于现有证书中似乎没有一个包含所需的证书,我(为了透明度)创建了一个新的证书,如下所示:

通过访问https获取所需的证书://

保存当前证书:

当要求导出格式时,选择“DER二进制”,并将其保存为“。ER”文件,例如“1.cer”

最有可能的是,您可以看到您的证书是基于证书链的,这意味着它依赖于其他证书。对于其中的每一个,请单击“显示证书”:

并以与之前相同的方式保存该文件。重复此操作,直到到达根证书。在本例中,您将得到三个*。cer文件。为了进一步的参考,我将它们称为1。cer,2。cer和3。cer

您现在应该为这些证书创建一个新的信任库文件

/opt/webMethods9/jvm/jvm/bin/keytool -import -file
/opt/webMethods9/IntegrationServer/instances/default/packages/DEV_fse/resources/1.cer -keystore azureTruststore.jks -alias "D-TRUST Root Class 3 CA 2 2009"

/opt/webMethods9/jvm/jvm/bin/keytool -import -file 
/opt/webMethods9/IntegrationServer/instances/default/packages/DEV_fse/resources/2.cer -keystore azureTruststore.jks -alias "D-TRUST SSL Class 3 CA 1 2009"

/opt/webMethods9/jvm/jvm/bin/keytool -import -file 
/opt/webMethods9/IntegrationServer/instances/default/packages/DEV_fse/resources/3.cer -keystore azureTruststore.jks -alias "servicebus.cloudapi.de"

第一次将要求您为此新创建的信任库设置密码。现在将信任库移动到/opt/webMethods9/IntegrationServer/config/certs/trusted(供以后参考)。您可以将其作为信任库添加到IS中(通过使用管理UI“安全性”)

为JNDI创建属性文件您需要创建一个servicebus.properties文件作为伪JNDI服务器的数据源。从技术上讲,您可以将该文件放在任何您想要的地方,但我建议将其放在“XXXXXXConnection”包的“资源”文件夹中。这应该是该文件的内容:

# servicebus.properties - sample JNDI configuration
# Register a ConnectionFactory in JNDI using the form:
# connectionfactory.[jndi_name] = [ConnectionURL]
connectionfactory.SBCF = amqps://XXXXXX.servicebus.cloudapi.de?jms.username=xxxxx&jms.password=xxxxxxx&amqp.idleTimeout=120000&jms.receiveLocalOnly=true&transport.trustStoreLocation=/opt/webMethods9/IntegrationServer/config/certs/trusted/azureTruststore.jks
# Register some queues in JNDI using the form 
# queue.[jndi_name] = [physical_name] 
# topic.[jndi_name] = [physical_name]
queue.QUEUE = myqueue
​

一些解释:

  1. SBCF将是连接工厂的JNDI-Lookup名称。JMS-Connection
  2. 中需要此名称
  3. xxxxxx.servicebus.cloudapi.de是服务总线的URL
  4. jms.username将由友好的Azure管理员提供
  5. jms.password将由友好的Azure管理员提供。但是请注意,在您可以在此URL中使用之前,您需要对您将从管理员获得的内容进行URL编码。例如,可以通过在Designer中手动调用IS Servicepub.string: URLEncode来完成。
  6. amqp.idleTimeout需要设置为120000(或更高),否则无法连接到SB
  7. jms.receiveLocalOnly需要设置为true,否则无法连接到SB
  8. transport.trustStorePlace需要指向包含创建到SB的安全(AMQPS)连接所需的所有SSL证书的信任库
  9. 队列。QUEUE:QUEUE是JNDI-Lookup名称,您稍后将在JMS-Client中使用它来发送消息,或者在JMS-Trigger中使用它来接收消息。你应该设定一个更有意义的东西。this的值(示例中的my队列)是SB上队列的名称,必须由Azure管理员提供。

仅有的两个重要价值是:

  1. "初始上下文工厂":org.apache.qpid.jms.jndi.Jms初始上下文工厂
  2. "提供者URL":必须指向您创建的servicebus.properties,例如file:/0025/webMEODS9/集成服务器/实例/默认/包/XXXXConnection/资源/servicebus.properties
苏淇
2023-03-14

如果您使用SpringBoot,您可以使用现成的Azure ServiceBus JMS SpringBoot启动器。

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-servicebus-jms-spring-boot-starter</artifactId>
    <version>2.3.5</version>
</dependency>

请参阅教程页面https://docs.microsoft.com/en-us/azure/developer/java/spring-framework/configure-spring-boot-starter-java-app-with-azure-service-bus

曾瀚昂
2023-03-14

您的错误消息表示未找到目的地的名称(未找到消息实体)。请注意,需要以如下特定方式告诉Azure订阅名称:

<TopicName>/Subscriptions/<SubscriptionName>

就你而言:

state-changed/Subscriptions/reverb

希望有帮助

干杯Seb

 类似资料:
  • 我想将一个小的JSON消息放入中。消息将具有附加到它的“ProviderID”属性,并且根据筛选规则,该消息将被筛选到特定于提供程序的上 但是,我似乎无法在上指定共享访问策略,以限制第三方提供商仅连接到他们自己的 我假设应该在订阅上设置以便将这些消息发送到另一个并在那里应用特定于提供程序的安全性,这样做是否正确。 或者有其他/更好的/推荐的方法来做这件事。

  • 来自第三次订阅的消息会发生什么情况,是否会在TTL之后发送到死信队列 有没有办法找出消息未被使用的订阅

  • 这似乎是最简单的解决办法。让我们看看流程: 第三方向RESTful API发送请求,以获取Windows Azure服务总线连接字符串-凭据-。 一旦拥有连接字符串,第三方就会连接到Windows服务总线,并开始从某个主题订阅接收消息。注意:连接字符串是在服务器端加密的,只能由接受的客户端解密。 优点 null null 第三方请求一个类似于RESTful的TCP API,以便订阅一些Window

  • 我们使用服务总线主题作为pub/sub系统的引擎。我们的逻辑涉及我们的NodeJS服务用多个订阅连接到一个主题。对于每个订阅,我们删除$default(TrueFilter),并在消息头的Label属性上创建一个CorrelationFilter,并且不在订阅中应用AutoDeleteOnIdle设置,因为我们希望确保订阅服务器功能在服务启动之前一直运行。 这个问题可以归结为这样:某件事能导致规则

  • 我尝试将Azure服务总线与ApacheQPID和Spring与事务集成。 但Azure服务总线AMQP实现似乎不支持事务。这是真的吗?我没有找到相关信息。 这是我的JMS配置 这是我的spring集成片段: 它与session transact=“false”配合使用,但与session transact=“true”配合使用时会产生错误: QPID跟踪