当前位置: 首页 > 知识库问答 >
问题:

使用Java程序接收Azure服务总线队列中的挂起消息?

廉展鹏
2023-03-14

我想在我的服务总线队列中接收挂起的消息。我浏览了一些关于接收消息的链接(https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-java-how-to-use-queues),但其中不包含有关如何接收和查看这些消息的信息。

从服务总线队列接收消息的Java代码如下[我是Java新手]:-

public class Test2 {

public static void main(String[] args) throws ServiceException {

    String namespace        = "SampleNamespace";
    String sharedKeyName    = "RootManageSharedAccessKey";
    String sharedSecretKey  = "t+U5ERMAnIyxgEUDUouGOKn6ADM/CuLWzEJZtauwVsc=";
    String queueName        = "QueueName";      

    // Azure Service Bus Service
    com.microsoft.windowsazure.Configuration config = ServiceBusConfiguration.configureWithSASAuthentication(namespace, sharedKeyName, sharedSecretKey, ".servicebus.windows.net");
    ServiceBusContract service = ServiceBusService.create(config);

    // Receive and Delete Messages
    ReceiveMessageOptions opts = ReceiveMessageOptions.DEFAULT;
    opts.setReceiveMode(ReceiveMode.RECEIVE_AND_DELETE);

    while (true) {

        ReceiveQueueMessageResult resultQM = service.receiveQueueMessage(queueName , opts);
        BrokeredMessage message = resultQM.getValue();
        if (message != null && message.getMessageId() != null) {
             System.out.println("Body: " + message.toString());
            System.out.println("MessageID: " + message.getMessageId());
        } else {
            System.out.println("No more messages.");
            break;
        }
    }
}
}

但当我运行此代码时,我得到以下错误:-

    Exception in thread "main" java.lang.NoClassDefFoundError: javax/ws/rs/WebApplicationException
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
at java.lang.Class.getConstructors(Class.java:1651)
at com.microsoft.windowsazure.core.DefaultBuilder.findInjectConstructor(DefaultBuilder.java:67)
at com.microsoft.windowsazure.core.DefaultBuilder.add(DefaultBuilder.java:94)
at com.microsoft.windowsazure.services.servicebus.Exports.register(Exports.java:34)
at com.microsoft.windowsazure.core.DefaultBuilder.create(DefaultBuilder.java:46)
at com.microsoft.windowsazure.Configuration.<init>(Configuration.java:80)
at com.microsoft.windowsazure.Configuration.load(Configuration.java:100)
at com.microsoft.windowsazure.Configuration.getInstance(Configuration.java:90)
at com.microsoft.windowsazure.services.servicebus.ServiceBusConfiguration.configureWithSASAuthentication(ServiceBusConfiguration.java:252)
at com.rocky.servicebus.queue.Test2.main(Test2.java:24)
Caused by: java.lang.ClassNotFoundException: javax.ws.rs.WebApplicationException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

有谁能帮我纠正我的错误吗?将是伟大的任何帮助。

谢谢,鲁德拉

共有2个答案

郭盛
2023-03-14

对于所有漫游者,下面是从Azure SB队列获取挂起消息的工作Java代码:-

import java.io.StringWriter;
import java.nio.charset.StandardCharsets;


import java.util.Scanner;

import com.microsoft.windowsazure.services.servicebus.ServiceBusConfiguration;
import com.microsoft.windowsazure.services.servicebus.ServiceBusContract;
import com.microsoft.windowsazure.services.servicebus.ServiceBusService;
import com.microsoft.windowsazure.services.servicebus.models.BrokeredMessage;
import com.microsoft.windowsazure.services.servicebus.models.ReceiveMessageOptions;
import com.microsoft.windowsazure.services.servicebus.models.ReceiveMode;
import com.microsoft.windowsazure.services.servicebus.models.ReceiveQueueMessageResult;


public class Test1 {

    //static StringWriter writer = new StringWriter();

    public static void main(String...s) throws Exception{

        com.microsoft.windowsazure.Configuration config = ServiceBusConfiguration.configureWithSASAuthentication("Your_NameSpace", "RootManageSharedAccessKey", "Mkf1H3g9qg0LrNEP1QbZ/EJKSARmJZQdOI6ek6obalI=", ".servicebus.windows.net");
        ServiceBusContract service = ServiceBusService.create(config);

        ReceiveMessageOptions opts = ReceiveMessageOptions.DEFAULT;
        opts.setReceiveMode(ReceiveMode.PEEK_LOCK);
        while(true)
        { 
             ReceiveQueueMessageResult resultQM = service.receiveQueueMessage("Queue_Name", opts);
             BrokeredMessage message = resultQM.getValue(); 
             if (message != null && message.getMessageId() != null)
             {
                try 
                {
                //  IOUtils.copy(message.getBody(), writer, encoding);

                    Scanner s1 = new Scanner(message.getBody()).useDelimiter("\\A");
                    String result = s1.hasNext() ? s1.next() : "";

                    //above will convert InputStream in String

                   System.out.println("Body: " + message.toString());
                   System.out.println("MainBody : " + result );
                   System.out.println("MessageID: " + message.getMessageId());
                   System.out.println("Custom Property: " + 
                        message.getProperty("TestProperty"));
                   // Remove message from queue
                   System.out.println("Deleting this message.");
                   service.deleteMessage(message);
                }
                catch (Exception ex)
                {
                   // Indicate a problem, unlock message in queue
                   System.out.println("Inner exception encountered!");
                   service.unlockMessage(message);
                }
             }
             else
             {
                System.out.println("Finishing up - no more messages.");
                break; 
                // Added to handle no more messages in the queue.
                // Could instead wait for more messages to be added.
             }
        }

    }


}

确保为"BrokeredMessage"获取所需的Maven依赖项。

谢谢,鲁德拉

扈韬
2023-03-14

根据接收消息的教程,您需要创建一个队列客户机,并为其注册一个消息处理程序。

A) 获取连接字符串。

b)用于发送和接收消息的代码示例

public static void registerReceiver(QueueClient queueClient, ExecutorService executorService) throws Exception {
    queueClient.registerMessageHandler(
        new IMessageHandler() {
            public CompletableFuture<Void> onMessageAsync(IMessage message) {
                if (message.getLabel() != null &&
                       message.getContentType() != null &&
                       message.getLabel().contentEquals("TestMessage") &&
                       message.getContentType().contentEquals("text/plain")) {

                    System.out.printf(
                        "\nMessage received: \n -->MessageId = %s\n -->ContentType = %s\n -->Content = %s\n",
                        message.getMessageId(),
                        message.getContentType(),
                        new String(message.getBody())
                    );
                    return queueClient.completeAsync(message.getLockToken());
                }
                return queueClient.abandonAsync(message.getLockToken());
            }

            public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
                System.out.printf(exceptionPhase + "-" + throwable.getMessage());
            }
        },
        new MessageHandlerOptions(1, false, Duration.ofSeconds(10)),
        executorService
    );
}

public static void sendMessages(QueueClient client) throws ServiceBusException, InterruptedException {
    for (int i = 0; i < 100; i++) {
        String messageId = Integer.toString(i);
        Message message = new Message("This is message " + i);
        message.setContentType("text/plain");
        message.setLabel("TestMessage");
        message.setMessageId(messageId);
        message.setTimeToLive(Duration.ofMinutes(10));
        client.send(message);
        System.out.printf("Message sent: Id = %s \n", message.getMessageId());
    }
}

public static void main(String[] args) throws Exception {
    String connectionString = "your_connection_string, Endpoint=sb://j*9.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=V*=";
    String queueName = "your_queue_name, testQueue";

    QueueClient client = new QueueClient(new ConnectionStringBuilder(connectionString, queueName), ReceiveMode.PEEKLOCK);
    sendMessages(client);
    client.close();

    QueueClient receiveClient = new QueueClient(new ConnectionStringBuilder(connectionString, queueName), ReceiveMode.PEEKLOCK);
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    registerReceiver(receiveClient, executorService);

    Thread.sleep(60 * 1000); // Wait for 60 seconds to receive all the messages.
    receiveClient.close();
    executorService.shutdown();
}

结果:

将发送100条消息。

Message sent: Id = 0 
Message sent: Id = 1 
Message sent: Id = 2 
Message sent: Id = 3 
*
*
*
Message sent: Id = 99 

然后将开始接收消息。

Message received: 
 -->MessageId = 0
 -->ContentType = text/plain
 -->Content = This is message 0

Message received: 
 -->MessageId = 1
 -->ContentType = text/plain
 -->Content = This is message 1

Message received: 
 -->MessageId = 2
 -->ContentType = text/plain
 -->Content = This is message 2
*
*
*
Message received: 
 -->MessageId = 99
 -->ContentType = text/plain
 -->Content = This is message 99
 类似资料:
  • 我已经创建了一个简单的窗口服务来使用来自Azure服务总线队列的消息。我使用TopShelch创建windows服务。下面的代码从这里剪切如下示例:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues 高频。Run(); ServiceBusHe

  • 我有一个windows服务,它侦听Azure服务总线队列消息,以便从我的WebApi应用程序分发处理。此外,我还需要处理重复性任务(每晚/每周),我认为最好使用相同的系统来处理这些任务。 例如,假设我有一个“CleanupDb”队列,每天午夜删除过时的DB节点: 理论上这应该行得通,但我觉得我错过了一个更明显的处理方法。有没有更好的办法?

  • 参考https://github.com/Azure/azure-service-bus/tree/master/samples/dotnet/gettingstart/microsoft.Azure.servicebus/basicsendreceiveusingtopicsubscriptionclient,我了解Azure服务总线主题的一般工作方式,我的问题更多地是关于它实际上是如何工作的。

  • 根据MS文档,从订阅接收消息并不困难。但是,如果我希望我的应用程序在每次发布新消息时都接收一条消息--一个恒定的轮询。因此,使用了SubscriptionClient类的OnMessage()方法。 MS文档说:“...当调用OnMessage时,客户端启动一个内部消息泵,该消息泵不断轮询队列或订阅。该消息泵由发出Receive()调用的无限循环组成。如果调用超时,它发出下一个Receive()调

  • 我们有内部服务总线安装。我可以使用QPID AMQP 1.0 0.24客户端发布和订阅/读取消息。然而,队列浏览不起作用,当队列中没有更多消息时,对hasMoreElements()的调用将无限期挂起。堆栈跟踪是: 代码: ConnectionFactory ConnectionFactory=(ConnectionFactory)上下文。查找(“MS\U SERVICE\U BUS”);连接=连

  • 我正在用java编写代码(使用Azure SDK for java),我有一个包含会话消息的服务总线队列。我想接收这些消息并将它们处理到另一个地方。 我使用QueueClient连接到队列,然后使用registerSessionHandler处理消息(下面的代码)。 问题是,每当收到消息时,我都可以打印有关它的所有详细信息,包括内容,但它会打印10次,每次之后都会打印一个异常。(打印10次:我理解