当前位置: 首页 > 知识库问答 >
问题:

是什么导致了“DLQ支持对匿名订阅不可用”?

柴正祥
2023-03-14
@Slf4j
@Component
@EnableBinding(KafkaBinding.class)
public class AListener {

      @StreamListener
      public void sink(@Input(KafkaBinding.ABINDING) KStream<String, AnOrder> events) {
        log.info("HERE_BEFORE");
        events.foreach((k, v) -> {
          log.info("HERE_AFTER value: {}", v.toString());
          throw new RuntimeException("Failed, should land in dlq topic");
        });
      }
    }
public interface KafkaBinding {
  String ABINDING = "some.events";

  @Input(ABINDING)
  public KStream<String, AnOrder> incomingOrder();
}
spring:
  application:
    name: aprocessor
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers: localhost:9092
            serdeError: sendToDlq
            configuration:
              commit.interval.ms: 1000
              default:
                key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          bindings:
            input:
              consumer:
                enableDlq: true
                dlqName: a-dlq
                autoCommitOnError: true
                autoCommitOffset: true
      bindings:
        input:
          group: a-group
          destination: some.events
        pos:
          destination: some.events
          consumer.header-mode: raw
@Slf4j
@DirtiesContext
@SpringBootTest
@EmbeddedKafka(
    partitions = 1,
    topics = {"some.events"},
    controlledShutdown = true,
    brokerProperties = {
      "listeners=PLAINTEXT://localhost:9092",
      "port=9092",
      "auto.create.topics.enable=${topics.autoCreate:false}",
      "delete.topic.enable=${topic.delete:true}"
    })
public class AListenerTest {
  private KafkaTemplate<String, String> kafkaTemplate;

  @Autowired private EmbeddedKafkaBroker embeddedKafka;
  @SpyBean private AListener listener;

  private static final String INPUT_TOPIC = "some.events";

  @BeforeEach
  public void setUp() {

    Map<String, Object> senderProperties =
        KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());

    ProducerFactory<String, String> producerFactory =
        new DefaultKafkaProducerFactory<>(senderProperties);

    kafkaTemplate = new KafkaTemplate<>(producerFactory);

    kafkaTemplate.setDefaultTopic(INPUT_TOPIC);
  }

  @Test
  public void whenExceptionInConsumer_thenLogToDLQ(){
    String logme = "{\"body\":\"thor\"}";
    kafkaTemplate.sendDefault(logme);
    log.info("<<<<DATA>>>> {}", logme);
  }
}
Caused by: org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is java.lang.IllegalArgumentException: DLQ support is not available for anonymous subscriptions
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:893)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
    at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:127)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:117)
    ... 54 more
Caused by: java.lang.IllegalArgumentException: DLQ support is not available for anonymous subscriptions
    at org.springframework.util.Assert.isTrue(Assert.java:118)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.doProvisionConsumerDestination(KafkaTopicProvisioner.java:186)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:161)
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderUtils.prepareConsumerBinding(KafkaStreamsBinderUtils.java:53)
    at org.springframework.cloud.stream.binder.kafka.streams.KStreamBinder.doBindConsumer(KStreamBinder.java:93)
    at org.springframework.cloud.stream.binder.kafka.streams.KStreamBinder.doBindConsumer(KStreamBinder.java:51)
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:142)
    at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:144)
    at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:112)
    at org.springframework.cloud.stream.binding.BindableProxyFactory.createAndBindInputs(BindableProxyFactory.java:254)
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58)
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:48)
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
    ... 66 more

我希望测试成功,控制台日志显示创建了dlq/topic,并查询dlq以打印消息。是什么原因导致KafkaTopicProvisioner抛出“IllegalArgumentException:DLQ支持不适用于匿名订阅”?

我已经尝试了这里的帖子中提到的步骤--“在Spring Cloud Stream Kafka中正确管理DLQ”。

共有1个答案

宓诚
2023-03-14

匿名消费者不允许使用DLQ;您需要一个持久的订阅。

匿名使用者是那些没有指定使用者组的使用者。

从你引用的答案中。

  bindings:
    input:
      group: so51247113
    boolean anonymous = !StringUtils.hasText(group);
    Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
            "DLQ support is not available for anonymous subscriptions");
 类似资料:
  • STOMP规范规定订阅必须有id头。 https://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE_id_Header 订阅id标头 由于单个连接可以与服务器有多个打开的订阅,因此必须在框架中包含id标头以唯一标识订阅。id标头允许客户端和服务器将后续消息或取消订阅帧与原始订阅关联。在同一连接中,不同的订阅必须使用不同的订阅标识符。

  • 问题内容: 哪些类型的对象属于“可订阅”范围? 问题答案: 它基本上意味着对象实现了该方法。换句话说,它描述的是“容器”对象,这意味着它们包含其他对象。这包括字符串,列表,元组和字典。

  • 1.使用者角度 为企业、组织或个人提供一种信息传播方式,用对口的内容达成企业、组织与成员之间的沟通和知识传播。体现在为用户提供内容服务,传达各类资讯,用户订阅后,可在轻推客户端定期接收到内容资讯的推送 2.开发者角度 主要通过会话的形式为用户提供服务,用户在协同界面点击订阅号图标后,可直接进入与订阅号的聊天界面,开发难度低,支持在后台定制菜单,通过菜单引导用户到不同的去处。通知消息会被折叠在订阅消

  • 我试图理解可观察对象是如何执行的,但似乎无法让这个简单的代码正常工作。 不应该是你好。订阅()执行?

  • 为什么我会。。。 未捕获的类型错误:string.split 不是一个函数 ...当我跑步时...

  • 我目前正在学习Spring Boot和GraphQL。对于graphql,我正在使用graphql spqr库,遇到了一个graphql错误。我可以使用localhost:8080/gui对我的graphql api进行查询和修改,但当我使用Postman或从前端调用时,我会收到下面的错误消息。在gui中,endpoint似乎仍然是/graphql,但当我将同一个endpoint与Postman或