当前位置: 首页 > 知识库问答 >
问题:

Kafka监听器中的钩子

顾嘉纳
2023-03-14

kafka收听消息之前/之后是否有任何类型的钩子可用?

用例:必须为设置MDC关联id以执行日志跟踪

我在找什么?前/后回调方法,以便可以在进入时设置MDC关联id,并最终在退出时清除MDC。

编辑的场景:我正在获取作为Kafka Headers一部分的相关ID,我想在Kafka Listener中收到消息后立即在MDC中设置相同的ID

感谢您的帮助

共有1个答案

况鸿雪
2023-03-14

你可以在你的侦听器bean中添加一个around建议。。。

@SpringBootApplication
public class So59854374Application {

    public static void main(String[] args) {
        SpringApplication.run(So59854374Application.class, args);
    }

    @Bean
    public static BeanPostProcessor bpp() { // static is important
        return new BeanPostProcessor() {

            @Override
            public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
                if (bean instanceof MyListener) {
                    ProxyFactoryBean pfb = new ProxyFactoryBean();
                    pfb.setTarget(bean);
                    pfb.addAdvice(new MethodInterceptor() {

                        @Override
                        public Object invoke(MethodInvocation invocation) throws Throwable {
                            try {
                                System.out.println("Before");
                                return invocation.proceed();
                            }
                            finally {
                                System.out.println("After");
                            }
                        }

                    });
                    return pfb.getObject();
                }
                return bean;
            }

        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so59854374").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("so59854374", "foo");
    }

}

@Component
class MyListener {

    @KafkaListener(id = "so59854374", topics = "so59854374")
    public void listen(String in) {
        System.out.println(in);
    }

}

Before
foo
After

编辑

如果将字节[]mdc作为附加参数添加到kafka侦听器方法中,则可以在调用中使用getArguments()[1]

另一种解决方案是向侦听器容器工厂添加一个RecordInterceptor,它允许您在原始消费记录传递到侦听器适配器之前访问它。

/**
 * An interceptor for {@link ConsumerRecord} invoked by the listener
 * container before invoking the listener.
 *
 * @param <K> the key type.
 * @param <V> the value type.
 *
 * @author Gary Russell
 * @since 2.2.7
 *
 */
@FunctionalInterface
public interface RecordInterceptor<K, V> {

    /**
     * Perform some action on the record or return a different one.
     * If null is returned the record will be skipped.
     * @param record the record.
     * @return the record or null.
     */
    @Nullable
    ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record);

}
/**
 * Set an interceptor to be called before calling the listener.
 * Does not apply to batch listeners.
 * @param recordInterceptor the interceptor.
 * @since 2.2.7
 */
public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
    this.recordInterceptor = recordInterceptor;
}

如果您使用的是批处理侦听器,Kafka提供了一个消费者接收器。

 类似资料:
  • 假设我有50个Kafka Topics,每个有3个分区,总共有150个分区。如果我为这150个分区中的每一个配置了一个KafkaListener/消费者(由于每个分区的容量很大),这意味着我有150个侦听器在运行。据我所知,每个侦听器都有自己的线程。那么这是否意味着在这种情况下会有150个活动线程?这似乎很多。有什么方法可以将其限制为一次最多线程数(比如20个)?

  • 我不知道如何才能得到消息的密钥,这是发送在监听器。

  • 我明白,要使一个方法成为Kafka消息侦听器的目标,我必须用@KafkaListener注释标记这个方法。此注释允许通过containerFactory元素指定KafkaListenerContainerFactory。 下面是巴尔东SpringKafka教程的一些片段。 Kafka消费者配置。JAVA MessageListener。JAVA 我不明白的是为什么我们需要一个监听器容器工厂。什么是

  • 问题内容: 我在玩React钩子,遇到了一个问题。当我尝试使用由事件侦听器处理的按钮来控制台记录日志时,它显示错误的状态。 CodeSandbox: 点击“添加卡”按钮两次 在第一张卡中,单击Button1并在控制台中查看有2张卡处于状态(正确行为) 在第一张卡中,单击Button2(由事件侦听器处理),然后在控制台中看到只有1张卡处于状态(行为错误) 为什么显示错误状态?在第一张卡片中,Butt

  • 问题内容: 我在玩React钩子,遇到了一个问题。当我尝试使用事件侦听器处理的按钮来控制台记录日志时,它显示错误的状态。 CodeSandbox: https ://codesandbox.io/s/lrxw1wr97m 点击“添加卡”按钮两次 在第一张卡中,单击“ Button1”,然后在控制台中看到有2张卡处于状态(正确行为) 在第一张卡中,单击Button2(由事件侦听器处理),然后在控制台

  • 主要内容:监听器的分类,监听对象创建和销毁的监听器,监听属性变更的监听器,监听 Session 中对象状态改变的监听器,注册监听器监听器 Listener 是一个实现特定接口的 Java 程序,这个程序专门用于监听另一个 Java 对象的方法调用或属性改变,当被监听对象发生上述事件后,监听器某个方法将立即自动执行。 监听器的相关概念: 事件:方法调用、属性改变、状态改变等。 事件源:被监听的对象( 例如:request、session、servletContext)。 监听器:用于监听事件源对象