当前位置: 首页 > 知识库问答 >
问题:

从Azure Event Hub到Kafka的春云流中的动态目的地

商辰钊
2023-03-14

我正在尝试使用Spring Cloud Stream处理发送到Azure Event Hub实例的消息。这些消息应该路由到Kafka集群上运行时根据消息内容确定的租户特定主题。出于开发目的,我通过Docker在本地运行Kafka。我对配置时未知的绑定进行了一些研究,发现动态目标解析可能正是我在这个场景中所需要的。

然而,让我的解决方案工作的唯一方法是使用StreamBridge。我宁愿使用动态目标标头spring.cloud.stream.sendto.destination,这样处理器就可以被编写为函数

继续看代码,这是处理器函数,我去掉了不相关的部分

    private static final String OUTPUT_DESTINATION_TEMPLATE = "%s.gateway-report";
    private static final String STREAM_DESTINATION_HEADER = "spring.cloud.stream.sendto.destination";
    private static final String TENANT_ID_HEADER = "tenant-id";

    @Bean
    public Function<Message<String>, Message<String>>
    routeMessageToTenantDestination(TenantGatewayDeviceService gatewayDeviceService) {
        return msg -> {
            final String tenantId = "test";
            final String destination = String.format(OUTPUT_DESTINATION_TEMPLATE, tenantId);
            return MessageBuilder.withPayload(msg.getPayload())
                    .setHeader(STREAM_DESTINATION_HEADER, destination)
                    .setHeader(TENANT_ID_HEADER, tenantId)
                    .build();
        };
    }

这是我的application.yml

spring:
  cloud:
    stream:
      bindings:
        routeMessageToTenantDestination-in-0:
          binder: kafka-evthub
          destination: gateway-report
          group: report-processor
      dynamic-destinations:
      binders:
        kafka-ioc:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder:
              brokers: localhost:29092
        kafka-evthub:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder:
              brokers: xxxxxxxxxxx.servicebus.windows.net:9093
              configuration:
                sasl:
                  jaas:
                    config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://xxxxxxxxxxx.servicebus.windows.net/;SharedAccessKeyName=*******;SharedAccessKey=********";
                  mechanism: PLAIN
                security.protocol: SASL_SSL
      default-binder: kafka-ioc

pom.xml中的相关依赖项

<dependency>
<groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

这是每次函数启动时我都会遇到的异常

2022-01-20 10:56:18.848 ERROR 2258917 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [... stripped away ...]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:385)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:79)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:442)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:416)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:125)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:119)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:42)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2588)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2569)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2483)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2405)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2284)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1958)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1353)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1344)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1236)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
    at org.springframework.cloud.stream.function.StreamBridge.resolveDestination(StreamBridge.java:276)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.doSendMessage(FunctionConfiguration.java:604)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:597)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    ... 32 more

我尝试过不同的方法,例如手动创建目标主题,配置一个具有相同名称的显式目标绑定(不是一个明确的解决方案,只是为了测试),但我一直遇到这个异常。我还尝试提供了一个<code>NewDestinationBindingCallback

这也适用于将Spring Cloud Stream与Event Hubs集成的另一种方法,即库<code>azure Spring Cloud Stream binder eventhubs</code>。

正如我之前所说,我已经找到了依赖 StreamBridge 的解决方法,但这个解决方案对我来说似乎不太理想,我想了解我错过了什么。

编辑:我向前迈出了一小步,通过将spring boot starter版本从2.6.2降级到2.4.4

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

和设置

    <properties>
        <spring-cloud.version>2020.0.2</spring-cloud.version>
    </properties>

而不是pom.xml中的2021.0.0,如sobychacko提供的示例中所示。然而,这似乎是一种回归,或者我的配置中缺少一些东西,无法使其与最新版本一起工作?


共有1个答案

高正初
2023-03-14
匿名用户

不确定到底是什么导致了您的问题。我刚刚创建了一个演示sendto.destination标头的基本示例应用程序,并验证了该应用程序是否按预期工作。它是一个多绑定器应用程序,连接了两个Kafka集群。该函数将从第一个集群消耗,然后使用sendto标头,生成第二个集群的输出。将此示例中的代码/配置与您的应用程序进行比较,看看缺少了什么。

我在您共享的stacktrace中看到了对< code>StreamBridge的引用。但是,当使用< code>sendto.destination头时,它不应该通过< code>StreamBridge。

 类似资料:
  • 我正在开发一个路由器(事件代理)应用程序与Spring云流在Kafka,在功能范式。应用程序从不断输入的主题消耗,映射和过滤消息,然后应该根据一些输入字段将其发送到某个主题(一次只有单个消息,而不是多个结果)。 最好的方法是设置Spring。云流动发送到。输出消息的目标标题?如果是这样,我应该如何为生产者设置绑定?

  • Spring Cloud Kafka Streams与Spring Cloud Stream、Spring Cloud Function、Spring AMQP和Spring for Apache Kafka有什么区别?

  • 我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,

  • 问题:我试图逐行读取一个大文件,并将消息放入RabbitMQ中。我想在文件末尾提交给rabbitMQ。如果文件中的任何记录是坏的,那么我想撤销发布到队列的消息。 技术:Spring boot、Spring cloud stream、RabbitMQ 你能帮我实现这个过渡的东西吗。我知道如何使用spring cloud Stream读取文件并发布到队列。

  • 我想使用bean向Kafka发送带有在运行时解析的主题名称的消息。我的问题是,本主题的中的默认值为1。 当Kafka中已经存在此主题时,以及当客户端创建此主题时(分区计数等于配置值),此值并不反映真正的分区数。 如何使此属性反映分区的真实数目?

  • 在使用ErrorHandlingDeserializer处理Avro组合的错误时,我无法发布到Dlq主题。以下是发布时的错误。 主题Topic_DLT在60000毫秒后不在元数据中。错误KafkaConsumerDestination{consumerDestination Name='Topic‘,partitions=6,dlqName='TOIC_DLT‘}。container-0-C-1