当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream pollable consumer dlq和errorChannel在使用不同线程时不起作用

曾航
2023-03-14
  private final ExecutorService executor = Executors.newFixedThreadPool(1);

  private volatile boolean paused = false;

  @Around(value = "@annotation(pollableConsumer) && args(dataCapsule,..)")
  public void handleMessage(ProceedingJoinPoint joinPoint,
      PollableConsumer pollableConsumer, Object dataCapsule) {
    if (dataCapsule instanceof Message) {
      Message<?> message = (Message<?>) dataCapsule;
      AcknowledgmentCallback callback = StaticMessageHeaderAccessor
          .getAcknowledgmentCallback(message);
      callback.noAutoAck();

      if (!paused) {
        // The separate thread is not busy with a previous message, so process this message:
        Runnable runnable = () -> {
          try {
            paused = true;

            // Call method to process this Kafka message
            joinPoint.proceed();

            callback.acknowledge(Status.ACCEPT);
          } catch (Throwable e) {
            callback.acknowledge(Status.REJECT);
            throw new PollableConsumerException(e);
          } finally {
            paused = false;
          }
        };

        executor.submit(runnable);
      } else {  

        // The separate thread is busy with a previous message, so re-queue this message for later:
        callback.acknowledge(Status.REQUEUE);
      }
    }
  }

我们可以创建一个不同的输出通道来在发生异常时发布消息,但它感觉我们试图实现一些可能不必要的东西。

更新1

我们添加了这些豆子:

  @Bean
  public KafkaTemplate<String, byte[]> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }
  @Bean
  public ProducerFactory<String, byte[]> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(
        org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        "http://localhost:9092");
    configProps.put(
        org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        StringSerializer.class);
    configProps.put(
        org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        KafkaAvroSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
  }
  @Bean
  public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
    return new KafkaAdmin(configs);
  }
  @Bean
  public NewTopic topicErr() {
    return TopicBuilder.name("ERR").partitions(1).replicas(1).build();
  }
  @Bean
  public SeekToCurrentErrorHandler eh(KafkaOperations<String, byte[]> template) {
    return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(
        template,
        (cr, e) -> new TopicPartition("ERR", 1)),
        new FixedBackOff(0L, 1L));
  }

加里的例子似乎普遍有效。尽管我们需要在使用不推荐的StreamListner方法而不是函数时进行一些修改,但仍有一些问题无法解决。

  • 主题名称似乎应该总是channel_name+.dlt,因为我们不知道如何使用dlq这样的不同名称。我们为所有使用者使用一个dlq主题,这似乎不是Spring-kafka默认DLT所期望的。
  • 似乎我们需要在DLT上至少有与使用者主题相同数量的分区。否则,此解决方案不起作用。但不确定如何管理这一问题,因为对我们来说,这似乎不是一个实际的假设。
  • 有没有一种方法,我们可以利用Spring的重试,就像Spring的云流在幕后所做的那样?或者这需要单独实施?即,根据max.attemps重试工作,然后输入DLQ部分。
  • 我可以看到,在示例中,Sring actuator通过this.endpoint.changestate(“polled”,state.paused)this.endpoint.changestate(“polled”,state.recomed)更新通道状态。为什么我们需要与pause、requeue等一起这样做?不这样做的副作用是什么?

共有1个答案

沈飞舟
2023-03-14

你的观察是正确的;错误处理绑定到线程。

您可以直接在代码中使用deadletterpublishingrecoverer,以使发布DLQ变得更容易(而不是输出通道)。这样,您将获得带有异常信息等的增强头。

https://docs.spring.io/spring-kafka/docs/current/reference/html/#死信

@SpringBootApplication
@EnableScheduling
public class So67296258Application {

    public static void main(String[] args) {
        SpringApplication.run(So67296258Application.class, args);
    }

    @Bean
    TaskExecutor exec() {
        return new ThreadPoolTaskExecutor();
    }

    @Bean
    DeadLetterPublishingRecoverer recoverer(KafkaOperations<Object, Object> template) {
        return new DeadLetterPublishingRecoverer(template);
    }

    @Bean
    NewTopic topic() {
        return TopicBuilder.name("polled.DLT").partitions(1).replicas(1).build();
    }

    @Bean
    MessageSourceCustomizer<KafkaMessageSource<?, ?>> customizer() {
        return (source, dest, group) -> source.setRawMessageHeader(true);
    }

}

@Component
class Handler {

    private static final Logger LOG = LoggerFactory.getLogger(Handler.class);

    private final PollableMessageSource source;

    private final TaskExecutor exec;

    private final BindingsEndpoint endpoint;

    private final DeadLetterPublishingRecoverer recoverer;

    Handler(PollableMessageSource source, TaskExecutor exec, BindingsEndpoint endpoint,
            DeadLetterPublishingRecoverer recoverer) {

        this.source = source;
        this.exec = exec;
        this.endpoint = endpoint;
        this.recoverer = recoverer;
    }

    @Scheduled(fixedDelay = 5_000)
    public void process() {
        LOG.info("Polling");
        boolean polled = this.source.poll(msg -> {
            LOG.info("Pausing Binding");
            this.endpoint.changeState("polled", State.PAUSED);
            AcknowledgmentCallback callback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(msg);
            callback.noAutoAck();
//          LOG.info(msg.toString());
            this.exec.execute(() -> {
                try {
                    runJob(msg);
                }
                catch (Exception e) {
                    this.recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), e);
                }
                finally {
                    callback.acknowledge();
                    this.endpoint.changeState("polled", State.RESUMED);
                    LOG.info("Resumed Binding");
                }
            });
        });
        LOG.info("" + polled);
    }

    private void runJob(Message<?> msg) throws InterruptedException {
        LOG.info("Running job");
        Thread.sleep(30_000);
        throw new RuntimeException("fail");
    }

}
spring.cloud.stream.pollable-source=polled
spring.cloud.stream.bindings.polled-in-0.destination=polled
spring.cloud.stream.bindings.polled-in-0.group=polled

对补充问题的答复:

1,2:参见Spring for Apache Kafka文档:https://docs.Spring.io/Spring-Kafka/docs/current/reference/html/#dead-letters

DLPR有一个备用构造函数,使您能够指定目标解析器。默认值只是追加.dlt并使用相同的分区。javadocs指定如何指定目标分区:

    /**
     * Create an instance with the provided template and destination resolving function,
     * that receives the failed consumer record and the exception and returns a
     * {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
     * 0, no partition is set when publishing to the topic.
     * @param template the {@link KafkaOperations} to use for publishing.
     * @param destinationResolver the resolving function.
     */
retryTemplate.execute(context -> { ... },
    context -> {...});
 类似资料:
  • 问题内容: 我发现了一个代码创建一个超时功能在这里,这似乎并没有工作。完整的测试代码如下: 预期的行为:代码在3秒内结束。问题出在哪儿? 问题答案: 一个线程不能优雅地杀死另一个线程,因此对于您当前的代码,它永远不会终止。(使用Python程序时,仅保留守护程序线程将退出,但这不允许您在不终止主线程的情况下终止。) 有些人]试图使用信号来停止执行,但这在某些情况下可能不安全。 如果可以修改,则有许

  • 运行之后(在jdk1.8中),答案不是1000。请告诉我原因。

  • 问题内容: 我正在尝试同步三个线程以打印012012012012…。但是它不能正常工作。每个线程都分配有一个编号,当它从主线程接收到信号时将打印该编号。以下程序有问题,我无法捕获。 问题答案: 您需要更多的协调。该notify调用不会立即唤醒线程并强制其继续执行。相反,您可以考虑notify将电子邮件发送给线程以使其可以继续进行。想象一下,如果您想让3个朋友按顺序给您打电话。您向朋友1发送了一封电

  • 问题内容: 当我运行TaskJob时,我得到了空指针异常,因为Spring不会自动装配serviceJob服务。是新线程导致此问题,因为Spring自动连接mysqlService没有任何问题? 我的applicationContext.xml; 我的课是; 编辑: TaskJob实例化; 问题答案: Spring仅自动装配其创建的组件。您正在调用新的TaskJob(),Spring不知道该对象,

  • 问题内容: 为什么计数为0? 我先启动线程1,然后再启动线程2。计数应为2000。但是它显示计数为0。请有人用简单的术语进行解释。 问题答案: 在打印线程计数时,线程尚未完成执行。 为了演示,在打印出线程数之前添加一条指令: 还要注意,对基元的操作不是线程安全的,并且该操作也不是原子的。您应该同步访问变量,或使用或代替。就目前而言,最终的计数可能在零到20,000之间。

  • 我有一个基于java的web应用程序的kafka生产者,可以将消息推送到kafka。根据文档,我可以看到kafka生产者是线程安全的。这是否意味着我可以拥有Kafka生产者的单个实例,并由不同的线程(web请求)使用,在我的情况下,每个线程都将打开和关闭生产者。这会产生任何问题吗?还是根据请求启动生产者更好?