当前位置: 首页 > 知识库问答 >
问题:

仅拦截来自消费者的Spring云流消息

郎飞龙
2023-03-14

我目前正在使用带有GlobalChannelInterceptor的Kafka绑定器的Spring Cloud Stream为我的Spring Boot微服务执行消息记录。

我有:

  1. 生产者将消息发布到订阅频道

在消息从生产者发布到流并被消费者收听的整个过程中,可以观察到preSend方法被触发了两次:

  1. 一次在生产者端-消息发布到流时

然而,出于日志记录的目的,我只需要在消费者端截获并记录消息。

是否有任何方法可以仅在一侧(例如消费者侧)截获SCS消息?

我将不胜感激对此事的任何想法。谢谢!

参考:

  1. GlobalChannelInterceptor文档-https://docs.spring.io/spring-integration/api/org/springframework/integration/config/GlobalChannelInterceptor.html

编辑

制作人

public void sendToPushStream(PushStreamMessage message) {
        try {
            boolean results = streamChannel.pushStream().send(MessageBuilder.withPayload(new ObjectMapper().writeValueAsString(message)).build());
        log.info("Push stream message {} sent to {}.", results ? "successfully" : "not", StreamChannel.PUSH_STREAM);
        } catch (JsonProcessingException ex) {
            log.error("Unable to parse push stream message.", ex);
        }
    }

制片人的streamChannel

public interface StreamChannel {

    String PUSH_STREAM = "PushStream";

    @Output(StreamChannel.PUSH_STREAM)
    SubscribableChannel pushStream();

}

消费者

@StreamListener(StreamChannel.PUSH_STREAM)
public void handle(Message<PushStreamMessage> message) {
    log.info("Incoming stream message from {}, {}", streamChannel.pushStream(), message);

}

消费者的streamChannel

public interface StreamChannel {

    String PUSH_STREAM = "PushStream";

    @Input(StreamChannel.PUSH_STREAM)
    SubscribableChannel pushStream();

}

拦截器(公共库)

public class GlobalStreamInterceptor extends ChannelInterceptorAdapter {

    @Override
    public Message<?> preSend(Message<?> msg, MessageChannel mc) {
       log.info("presend " + msg);
        return msg;
    }

    @Override
    public void postSend(Message<?> msg, MessageChannel mc, boolean sent) {
        log.info("postSend " + msg);
    }

}

共有1个答案

郑浩博
2023-03-14

对,为什么不遵循GlobalChannelInterceptor选项并且不应用

通道名称将与之匹配的简单模式数组。

因此,您可能会遇到这样的情况:

@GlobalChannelInterceptor(patterns = Processor.INPUT)

或者为您的SCSt应用程序使用输入频道的自定义名称。

 类似资料:
  • D: \软件\Kafka\Kafka2.10-0.10.0.1\bin\windows 我使用上面的命令来消费消息,有什么我错过的吗?帮助我: 这个 那些是生产者和消费者......

  • 我有一个向rabbitmq发送消息的服务,消费者对消息进行一些操作并重新排队。 我可以成功地将初始消息发送给rabbitmq,但问题是,如果消息需要修改,我无法将任何已使用的消息重新发送给rabbitmq。 我试图用new创建一个新类,但“MyService”始终为空

  • 生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较

  • 我有一个服务,它从不同的Spring云流通道(绑定到EventHub/Kafka主题)生成和使用消息。有几种设置类似的服务。 配置如下所示 生产者/发布者代码如下所示 类似地,我还有多个其他发布者发布到不同的活动中心/主题。请注意,每个已发布的消息都有一个租户id标头。这是我的多租户应用程序特定于跟踪租户上下文的内容。还请注意,在发送消息时,我正在获取要发布到的频道。 我的消费者代码如下所示 同样

  • 消费者使用Spring的JavaConfig类如下: Kafka主题侦听器使用@KafkaListener注释,如下所示: 我的pom包括依赖项: 现在当我打包到war并部署到tomcat时,它不会显示任何错误,即使在调试模式下也不会显示任何错误,只是部署war什么都没有。 请帮助我了解是否缺少触发kafkalistner的某些配置。 谢谢Gary我添加了上下文。xml和web。xml,但我得到了

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?