@Slf4j
@Component
@EnableBinding(KafkaBinding.class)
public class AListener {
@StreamListener
public void sink(@Input(KafkaBinding.ABINDING) KStream<String, AnOrder> events) {
log.info("HERE_BEFORE");
events.foreach((k, v) -> {
log.info("HERE_AFTER value: {}", v.toString());
throw new RuntimeException("Failed, should land in dlq topic");
});
}
}
public interface KafkaBinding {
String ABINDING = "some.events";
@Input(ABINDING)
public KStream<String, AnOrder> incomingOrder();
}
spring:
application:
name: aprocessor
cloud:
stream:
kafka:
streams:
binder:
brokers: localhost:9092
serdeError: sendToDlq
configuration:
commit.interval.ms: 1000
default:
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings:
input:
consumer:
enableDlq: true
dlqName: a-dlq
autoCommitOnError: true
autoCommitOffset: true
bindings:
input:
group: a-group
destination: some.events
pos:
destination: some.events
consumer.header-mode: raw
@Slf4j
@DirtiesContext
@SpringBootTest
@EmbeddedKafka(
partitions = 1,
topics = {"some.events"},
controlledShutdown = true,
brokerProperties = {
"listeners=PLAINTEXT://localhost:9092",
"port=9092",
"auto.create.topics.enable=${topics.autoCreate:false}",
"delete.topic.enable=${topic.delete:true}"
})
public class AListenerTest {
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired private EmbeddedKafkaBroker embeddedKafka;
@SpyBean private AListener listener;
private static final String INPUT_TOPIC = "some.events";
@BeforeEach
public void setUp() {
Map<String, Object> senderProperties =
KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
ProducerFactory<String, String> producerFactory =
new DefaultKafkaProducerFactory<>(senderProperties);
kafkaTemplate = new KafkaTemplate<>(producerFactory);
kafkaTemplate.setDefaultTopic(INPUT_TOPIC);
}
@Test
public void whenExceptionInConsumer_thenLogToDLQ(){
String logme = "{\"body\":\"thor\"}";
kafkaTemplate.sendDefault(logme);
log.info("<<<<DATA>>>> {}", logme);
}
}
Caused by: org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is java.lang.IllegalArgumentException: DLQ support is not available for anonymous subscriptions
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:893)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:127)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:117)
... 54 more
Caused by: java.lang.IllegalArgumentException: DLQ support is not available for anonymous subscriptions
at org.springframework.util.Assert.isTrue(Assert.java:118)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.doProvisionConsumerDestination(KafkaTopicProvisioner.java:186)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:161)
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderUtils.prepareConsumerBinding(KafkaStreamsBinderUtils.java:53)
at org.springframework.cloud.stream.binder.kafka.streams.KStreamBinder.doBindConsumer(KStreamBinder.java:93)
at org.springframework.cloud.stream.binder.kafka.streams.KStreamBinder.doBindConsumer(KStreamBinder.java:51)
at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:142)
at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:144)
at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:112)
at org.springframework.cloud.stream.binding.BindableProxyFactory.createAndBindInputs(BindableProxyFactory.java:254)
at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58)
at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:48)
at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
... 66 more
我希望测试成功,控制台日志显示创建了dlq/topic,并查询dlq以打印消息。是什么原因导致KafkaTopicProvisioner抛出“IllegalArgumentException:DLQ支持不适用于匿名订阅”?
我已经尝试了这里的帖子中提到的步骤--“在Spring Cloud Stream Kafka中正确管理DLQ”。
匿名消费者不允许使用DLQ;您需要一个持久的订阅。
匿名使用者是那些没有指定使用者组的使用者。
从你引用的答案中。
bindings:
input:
group: so51247113
boolean anonymous = !StringUtils.hasText(group);
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
"DLQ support is not available for anonymous subscriptions");
STOMP规范规定订阅必须有id头。 https://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE_id_Header 订阅id标头 由于单个连接可以与服务器有多个打开的订阅,因此必须在框架中包含id标头以唯一标识订阅。id标头允许客户端和服务器将后续消息或取消订阅帧与原始订阅关联。在同一连接中,不同的订阅必须使用不同的订阅标识符。
问题内容: 哪些类型的对象属于“可订阅”范围? 问题答案: 它基本上意味着对象实现了该方法。换句话说,它描述的是“容器”对象,这意味着它们包含其他对象。这包括字符串,列表,元组和字典。
1.使用者角度 为企业、组织或个人提供一种信息传播方式,用对口的内容达成企业、组织与成员之间的沟通和知识传播。体现在为用户提供内容服务,传达各类资讯,用户订阅后,可在轻推客户端定期接收到内容资讯的推送 2.开发者角度 主要通过会话的形式为用户提供服务,用户在协同界面点击订阅号图标后,可直接进入与订阅号的聊天界面,开发难度低,支持在后台定制菜单,通过菜单引导用户到不同的去处。通知消息会被折叠在订阅消
我试图理解可观察对象是如何执行的,但似乎无法让这个简单的代码正常工作。 不应该是你好。订阅()执行?
为什么我会。。。 未捕获的类型错误:string.split 不是一个函数 ...当我跑步时...
我目前正在学习Spring Boot和GraphQL。对于graphql,我正在使用graphql spqr库,遇到了一个graphql错误。我可以使用localhost:8080/gui对我的graphql api进行查询和修改,但当我使用Postman或从前端调用时,我会收到下面的错误消息。在gui中,endpoint似乎仍然是/graphql,但当我将同一个endpoint与Postman或