当前位置: 首页 > 知识库问答 >
问题:

抽象Spring云流生产者和消费者代码

钱欣悦
2023-03-14

我有一个服务,它从不同的Spring云流通道(绑定到EventHub/Kafka主题)生成和使用消息。有几种设置类似的服务。

配置如下所示

 public interface MessageStreams {
      String WORKSPACE = "workspace";
      String UPLOADNOTIFICATION = "uploadnotification";
      String BLOBNOTIFICATION = "blobnotification";
      String INGESTIONSTATUS = "ingestionstatusproducer";

      @Input(WORKSPACE)
      SubscribableChannel workspaceChannel();

      @Output(UPLOADNOTIFICATION)
      MessageChannel uploadNotificationChannel();

      @Input(BLOBNOTIFICATION)
      SubscribableChannel blobNotificationChannel();

      @Output(INGESTIONSTATUS)
      MessageChannel ingestionStatusChannel();
    }


    @EnableBinding(MessageStreams.class)
    public class EventHubStreamsConfiguration {
    }

生产者/发布者代码如下所示

    @Service
    @Slf4j
    public class IngestionStatusEventPublisher {
      private final MessageStreams messageStreams;

      public IngestionStatusEventPublisher(MessageStreams messageStreams) {
        this.messageStreams = messageStreams;
      }

      public void sendIngestionStatusEvent() {
        log.info("Sending ingestion status event");
        System.out.println("Sending ingestion status event");
        MessageChannel messageChannel = messageStreams.ingestionStatusChannel();
        boolean messageSent = messageChannel.send(MessageBuilder
            .withPayload(IngestionStatusMessage.builder()
                .correlationId("some-correlation-id")
                .status("done")
                .source("some-source")
                .eventTime(OffsetDateTime.now())
                .build())
            .setHeader("tenant-id", "some-tenant")
            .build());
        log.info("Ingestion status event sent successfully {}", messageSent);
      }
    }

类似地,我还有多个其他发布者发布到不同的活动中心/主题。请注意,每个已发布的消息都有一个租户id标头。这是我的多租户应用程序特定于跟踪租户上下文的内容。还请注意,在发送消息时,我正在获取要发布到的频道。

我的消费者代码如下所示

java prettyprint-override">    @Component
    @Slf4j
    public class IngestionStatusEventHandler {
      private AtomicInteger eventCount = new AtomicInteger();

      @StreamListener(TestMessageStreams.INGESTIONSTATUS)
      public void handleEvent(@Payload IngestionStatusMessage message, @Header(name = "tenant-id") String tenantId) throws Exception {
        log.info("New ingestion status event received: {} in Consumer: {}", message, Thread.currentThread().getName());

        // set the tenant context as thread local from the header.

      }

同样,我有几个这样的使用者,并且每个使用者中都有一个租户上下文,该上下文是基于发布者发送的传入租户id标头设置的。

我的问题是

我如何摆脱在Publisher中设置租户id头和在Consumer中设置租户上下文的锅炉板代码,将其抽象到一个库中,该库可以包含在我拥有的所有不同服务中。

此外,是否有一种基于发布的消息类型动态标识通道的方法。对于ex IngestionStatusMessage。在给定场景中初始化

共有1个答案

柳墨一
2023-03-14

要在公共代码中设置和租户id标题,并避免在每个微服务中复制/粘贴,您可以使用通道侦听器,并使用@GlobalChannelInterceptor及其模式选项将其设置为全局。

在Spring集成中查看更多信息:https://docs.spring.io/spring-integration/docs/5.3.0.BUILD-SNAPSHOT/reference/html/core.html#channel-interceptors

https://docs.spring.io/spring-integration/docs/5.3.0.BUILD-SNAPSHOT/reference/html/overview.html#configuration-enable-integration

您不能按有效负载类型进行频道选择,因为有效负载类型实际上是由@StreamListener方法签名确定的。

您可以尝试使用带有消息的通用路由器

看见https://docs.spring.io/spring-integration/docs/5.3.0.BUILD-SNAPSHOT/reference/html/message-routing.html#messaging-路由章节

 类似资料:
  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要

  • 我想使用一个camel组件,它提供了使用和生成RESTful资源的能力。 对于这个例子,我想使用camel restlet组件。restlet组件一切正常,我已经使用REST DSL成功地实现了restlet consumer。然而,我有几个问题: 问题 1) 将restlet启用为异步是否安全?我读过restlet async可能会导致一些问题。这仍然正确吗?如何提高服务绩效?我应该改用码头吗?

  • 生产者线程与消费者线程使用信号量同步 生产者线程与消费者线程使用信号量同步 源码/* * Copyright (c) 2006-2018, RT-Thread Development Team * * SPDX-License-Identifier: Apache-2.0 * * Change Logs: * Date Author Notes * 2018-08-24 yangjie the f

  • 所谓的生产者消费者模型就是 某个模块(函数)负责生产数据,这些数据由另一个模块来负责处理 一般生产者消费者模型包含三个部分 生产者、缓冲区、消费者 为什么生产者消费者模型要含三个部分?直接生产和消费不行么? 一个案例说明一切 生产者好比现实生活中的某个人 缓冲区好比现实生活中的邮箱 消费者好比现实生活中的邮递员 如果只有生产者和消费者, 那么相当于只有写信的人和邮递员,那么如果将来过去的邮递员离职

  • 我的应用程序由一个带有POST方法的REST控制器组成,用于提交我必须使用生产者发送到主题的数据。 这是控制器 使用Spring-Cloud-Stream版本 从3.1版开始,和注释被弃用,所以我尝试切换到新的方式来设置生产者,我就是这样工作的 最后在应用程序中。yaml我有这个 现在的问题是,当我启动应用程序时,方法被无限调用(我在主题中看到消息)。然后使用供应商似乎我被迫在供应商内部定义消息数

  • 问题内容: 我对于如何使用特定的生产者-消费者模式感到困惑,在该模式中,生产者和消费者都可以同时并独立地进行操作。 首先,考虑以下示例,该示例紧随docs中的示例: 关于此脚本,有一个更详细的细节:通过常规的for循环将项目同步放入队列。 我的目标是创建一个使用(或)和的脚本。两者都应安排为同时运行。没有一个消费者协程明确地与生产者绑定或链接。 我如何修改上面的程序,以便生产者是可以与消费者/工人