我正在尝试使用Spring Cloud Stream处理发送到Azure Event Hub实例的消息。这些消息应该路由到Kafka集群上运行时根据消息内容确定的租户特定主题。出于开发目的,我通过Docker在本地运行Kafka。我对配置时未知的绑定进行了一些研究,发现动态目标解析可能正是我在这个场景中所需要的。
然而,让我的解决方案工作的唯一方法是使用StreamBridge
。我宁愿使用动态目标标头spring.cloud.stream.sendto.destination
,这样处理器就可以被编写为函数
继续看代码,这是处理器函数,我去掉了不相关的部分
private static final String OUTPUT_DESTINATION_TEMPLATE = "%s.gateway-report";
private static final String STREAM_DESTINATION_HEADER = "spring.cloud.stream.sendto.destination";
private static final String TENANT_ID_HEADER = "tenant-id";
@Bean
public Function<Message<String>, Message<String>>
routeMessageToTenantDestination(TenantGatewayDeviceService gatewayDeviceService) {
return msg -> {
final String tenantId = "test";
final String destination = String.format(OUTPUT_DESTINATION_TEMPLATE, tenantId);
return MessageBuilder.withPayload(msg.getPayload())
.setHeader(STREAM_DESTINATION_HEADER, destination)
.setHeader(TENANT_ID_HEADER, tenantId)
.build();
};
}
这是我的application.yml
spring:
cloud:
stream:
bindings:
routeMessageToTenantDestination-in-0:
binder: kafka-evthub
destination: gateway-report
group: report-processor
dynamic-destinations:
binders:
kafka-ioc:
type: kafka
environment:
spring.cloud.stream.kafka.binder:
brokers: localhost:29092
kafka-evthub:
type: kafka
environment:
spring.cloud.stream.kafka.binder:
brokers: xxxxxxxxxxx.servicebus.windows.net:9093
configuration:
sasl:
jaas:
config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://xxxxxxxxxxx.servicebus.windows.net/;SharedAccessKeyName=*******;SharedAccessKey=********";
mechanism: PLAIN
security.protocol: SASL_SSL
default-binder: kafka-ioc
pom.xml中的相关依赖项
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
这是每次函数启动时我都会遇到的异常
2022-01-20 10:56:18.848 ERROR 2258917 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [... stripped away ...]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:385)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:79)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:442)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:416)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:125)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:119)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:42)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2588)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2569)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2483)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2405)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2284)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1958)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1353)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1344)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1236)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
at org.springframework.cloud.stream.function.StreamBridge.resolveDestination(StreamBridge.java:276)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.doSendMessage(FunctionConfiguration.java:604)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:597)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
... 32 more
我尝试过不同的方法,例如手动创建目标主题,配置一个具有相同名称的显式目标绑定(不是一个明确的解决方案,只是为了测试),但我一直遇到这个异常。我还尝试提供了一个<code>NewDestinationBindingCallback
这也适用于将Spring Cloud Stream与Event Hubs集成的另一种方法,即库<code>azure Spring Cloud Stream binder eventhubs</code>。
正如我之前所说,我已经找到了依赖 StreamBridge 的解决方法,但这个解决方案对我来说似乎不太理想,我想了解我错过了什么。
编辑:我向前迈出了一小步,通过将spring boot starter版本从
2.6.2
降级到2.4.4
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
和设置
<properties>
<spring-cloud.version>2020.0.2</spring-cloud.version>
</properties>
而不是pom.xml中的
2021.0.0
,如sobychacko提供的示例中所示。然而,这似乎是一种回归,或者我的配置中缺少一些东西,无法使其与最新版本一起工作?
不确定到底是什么导致了您的问题。我刚刚创建了一个演示sendto.destination
标头的基本示例应用程序,并验证了该应用程序是否按预期工作。它是一个多绑定器应用程序,连接了两个Kafka集群。该函数将从第一个集群消耗,然后使用sendto
标头,生成第二个集群的输出。将此示例中的代码/配置与您的应用程序进行比较,看看缺少了什么。
我在您共享的stacktrace中看到了对< code>StreamBridge的引用。但是,当使用< code>sendto.destination头时,它不应该通过< code>StreamBridge。
我正在开发一个路由器(事件代理)应用程序与Spring云流在Kafka,在功能范式。应用程序从不断输入的主题消耗,映射和过滤消息,然后应该根据一些输入字段将其发送到某个主题(一次只有单个消息,而不是多个结果)。 最好的方法是设置Spring。云流动发送到。输出消息的目标标题?如果是这样,我应该如何为生产者设置绑定?
Spring Cloud Kafka Streams与Spring Cloud Stream、Spring Cloud Function、Spring AMQP和Spring for Apache Kafka有什么区别?
我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,
问题:我试图逐行读取一个大文件,并将消息放入RabbitMQ中。我想在文件末尾提交给rabbitMQ。如果文件中的任何记录是坏的,那么我想撤销发布到队列的消息。 技术:Spring boot、Spring cloud stream、RabbitMQ 你能帮我实现这个过渡的东西吗。我知道如何使用spring cloud Stream读取文件并发布到队列。
我想使用bean向Kafka发送带有在运行时解析的主题名称的消息。我的问题是,本主题的中的默认值为1。 当Kafka中已经存在此主题时,以及当客户端创建此主题时(分区计数等于配置值),此值并不反映真正的分区数。 如何使此属性反映分区的真实数目?
在使用ErrorHandlingDeserializer处理Avro组合的错误时,我无法发布到Dlq主题。以下是发布时的错误。 主题Topic_DLT在60000毫秒后不在元数据中。错误KafkaConsumerDestination{consumerDestination Name='Topic‘,partitions=6,dlqName='TOIC_DLT‘}。container-0-C-1