当前位置: 首页 > 知识库问答 >
问题:

传入消息在来自Azure服务总线主题订阅的侦听器之间进行分配

雍兴修
2023-03-14

我正在使用以下来自Microsoft portal的代码从Azure service bus主题/订阅发送和接收消息。代码运行良好。当我运行两个接收者代码实例时,消息会在两个接收者之间分配。10个发送和接收各5个。我只想知道如何在所有接收器中获取所有消息。

public class ServiceBusTopicSender 
{

static final Gson GSON = new Gson();

    public static void main(String[] args) throws Exception, ServiceBusException {
        // TODO Auto-generated method stub

        TopicClient sendClient;
        String connectionString = "Endpoint=sb://basicbus.servicebus.windows.net/;"
                                + "SharedAccessKeyName=RootManageSharedAccessKey;"
                                + "SharedAccessKey=xxxxxpxxxxxxxxxxxxxxxxxx/xxxxxxxxxx=";

        sendClient = new TopicClient(new ConnectionStringBuilder(connectionString, "basictopic"));       
        sendMessagesAsync(sendClient).thenRunAsync(() -> sendClient.closeAsync());
    }

    static CompletableFuture<Void> sendMessagesAsync(TopicClient sendClient) {
        List<HashMap<String, String>> data =
                GSON.fromJson(
                        "[" +
                                "{'name' = 'Einstein', 'firstName' = 'Albert'}," +
                                "{'name' = 'Heisenberg', 'firstName' = 'Werner'}," +
                                "{'name' = 'Curie', 'firstName' = 'Marie'}," +
                                "{'name' = 'Hawking', 'firstName' = 'Steven'}," +
                                "{'name' = 'Newton', 'firstName' = 'Isaac'}," +
                                "{'name' = 'Bohr', 'firstName' = 'Niels'}," +
                                "{'name' = 'Faraday', 'firstName' = 'Michael'}," +
                                "{'name' = 'Galilei', 'firstName' = 'Galileo'}," +
                                "{'name' = 'Kepler', 'firstName' = 'Johannes'}," +
                                "{'name' = 'Kopernikus', 'firstName' = 'Nikolaus'}" +
                                "]",
                        new TypeToken<List<HashMap<String, String>>>() {
                        }.getType());

        List<CompletableFuture> tasks = new ArrayList<>();
        for (int i = 0; i < data.size(); i++) {
            final String messageId = Integer.toString(i);
            Message message = new Message(GSON.toJson(data.get(i), Map.class).getBytes(UTF_8));
            message.setContentType("application/json");
            message.setLabel("Scientist");
            message.setMessageId(messageId);
            message.setTimeToLive(Duration.ofMinutes(2));           
            System.out.printf("Message sending: Id = %s\n", message.getMessageId());
            tasks.add(
                    sendClient.sendAsync(message).thenRunAsync(() -> {
                        System.out.printf("\tMessage acknowledged: Id = %s\n", message.getMessageId());
                    }));
        }
        return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[tasks.size()]));
    }
}
public class ServiceBusTopicReceiver 
{
static final Gson GSON = new Gson();

    public static void main(String[] args) throws Exception, ServiceBusException {

        String connectionString = "Endpoint=sb://basicbus.servicebus.windows.net/;"
                                + "SharedAccessKeyName=RootManageSharedAccessKey;"
                                + "SharedAccessKey=xxxxxpxxxxxxxxxxxxxxxxxx/xxxxxxxxxx=";

        SubscriptionClient subscription1Client = new SubscriptionClient(new ConnectionStringBuilder(connectionString, 
                "basictopic/subscriptions/basicsubscription"), ReceiveMode.PEEKLOCK);


        registerMessageHandlerOnClient(subscription1Client);

    }

    static void registerMessageHandlerOnClient(SubscriptionClient receiveClient) throws Exception {

        // register the RegisterMessageHandler callback
        IMessageHandler messageHandler = new IMessageHandler() {
            // callback invoked when the message handler loop has obtained a message
            public CompletableFuture<Void> onMessageAsync(IMessage message) {
                // receives message is passed to callback
                if (message.getLabel() != null &&
                        message.getContentType() != null &&
                        message.getLabel().contentEquals("Scientist") &&
                        message.getContentType().contentEquals("application/json")) {

                    byte[] body = message.getBody();
                    Map scientist = GSON.fromJson(new String(body, UTF_8), Map.class);

                System.out.printf(
                        "\n\t\t\t\t%s Message received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," +
                                "\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\",  \n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]\n",
                        receiveClient.getEntityPath(),
                        message.getMessageId(),
                        message.getSequenceNumber(),
                        message.getEnqueuedTimeUtc(),
                        message.getExpiresAtUtc(),
                        message.getContentType(),
                        scientist != null ? scientist.get("firstName") : "",
                        scientist != null ? scientist.get("name") : "");
                System.out.println("Partition Key is ::::: " + message.getPartitionKey());
            }
            return receiveClient.completeAsync(message.getLockToken());
        }

        public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
            System.out.printf(exceptionPhase + "-" + throwable.getMessage());
        }
    };
    receiveClient.registerMessageHandler(
                messageHandler,
                // callback invoked when the message handler has an exception to report
            // 1 concurrent call, messages are auto-completed, auto-renew duration
            new MessageHandlerOptions(1, false, Duration.ofMinutes(1)));

}

}

共有1个答案

澹台成龙
2023-03-14

Azure Service Bus是一个经纪人。当您接收消息时,您使用的是竞争消费者模式。这意味着,如果您有一个订阅,并且您的流程的多个实例侦听同一个订阅,它们将不会得到相同的消息。它们只会得到一个子集,这是其他实例没有接收到的。那是故意的。如果希望多个处理器实例接收相同的消息,则需要为每个处理器实例创建一个订阅。这样,发送到主题的每个消息都将复制到每个订阅,并且每个订阅者(接收者)都将获得自己的副本。

 类似资料:
  • 这似乎是最简单的解决办法。让我们看看流程: 第三方向RESTful API发送请求,以获取Windows Azure服务总线连接字符串-凭据-。 一旦拥有连接字符串,第三方就会连接到Windows服务总线,并开始从某个主题订阅接收消息。注意:连接字符串是在服务器端加密的,只能由接受的客户端解密。 优点 null null 第三方请求一个类似于RESTful的TCP API,以便订阅一些Window

  • 有可能做到这一点吗?

  • 我有一个服务总线主题与50个订阅者有他们自己的过滤器。如果有人猜到订户的名字,我如何从49个订户中获得消息?我可以验证订阅者凭据吗? 现在我的B计划是创建一个50个队列,以便每个队列都有自己的安全连接字符串。有谁能提出正确的方法吗?

  • 我之所以要这样做,是因为我们的服务在部署时配置订阅规则,并且具有消息代理的服务可能会在具有订阅客户端的服务更改规则集和新的业务逻辑之前部署该服务,该服务会发送一组新的消息。我们不希望丢失在部署期间发送的消息,并在新服务退出时处理它们。 干杯。

  • 我想将一个小的JSON消息放入中。消息将具有附加到它的“ProviderID”属性,并且根据筛选规则,该消息将被筛选到特定于提供程序的上 但是,我似乎无法在上指定共享访问策略,以限制第三方提供商仅连接到他们自己的 我假设应该在订阅上设置以便将这些消息发送到另一个并在那里应用特定于提供程序的安全性,这样做是否正确。 或者有其他/更好的/推荐的方法来做这件事。