当前位置: 首页 > 知识库问答 >
问题:

如何在Amazon MSK上运行Kafka骆驼连接器

计弘
2023-03-14

背景:我按照这个链接设置了AWS MSK,并测试了生产者和消费者,它的设置和工作正常。我能够通过两个单独的EC2实例发送和接收消息,这两个实例都使用同一个Kafka集群(我的MSK集群)。现在,我想建立一条从Eventhubs到AWS Firehose的数据管道,其形式如下:

Azure Eventhub-

我能够成功地做到这一点,没有使用MSK(通过常规的老Kafka),但由于未说明的原因,需要使用MSK现在,我不能让它工作。

问题:当尝试启动AWS MSK和我正在使用的两个Camel连接器之间的连接器时,我会收到以下错误:

这是两个有问题的连接器:

  1. AWS Kinesis消防软管至Kafka接头(Kafka-

目标:让这些连接器与MSK一起工作,就像他们直接与Kafka一起工作时没有MSK一样。

这是Firehose的问题:

Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
com.amazonaws.services.kinesisfirehose.model.AmazonKinesisFirehoseException: The security token included in the request is invalid

以下是Azure的示例:

[2021-05-04 14:09:56,848] WARN Load balancing for event processor failed - If you are using a StorageSharedKeyCredential, and the server returned an error message that says 'Signature did not match', you can compare the string to sign with the one generated by the SDK. To log the string to sign, pass in the context key value pair 'Azure-Storage-Log-String-To-Sign': true to the appropriate method call.
If you are using a SAS token, and the server returned an error message that says 'Signature did not match', you can compare the string to sign with the one generated by the SDK. To log the string to sign, pass in the context key value pair 'Azure-Storage-Log-String-To-Sign': true to the appropriate generateSas method call.
Please remember to disable 'Azure-Storage-Log-String-To-Sign' before going to production as this string can potentially contain PII.
Status code 403, "<?xml version="1.0" encoding="utf-8"?><Error><Code>AuthorizationFailure</Code><Message>This request is not authorized to perform this operation.
Time:2021-05-04T14:09:56.7148317Z</Message></Error>" (com.azure.messaging.eventhubs.PartitionBasedLoadBalancer:344)
[2021-05-04 14:09:56,858] Error was received while reading the incoming data. The connection will be closed. (reactor.netty.channel.ChannelOperationsHandler:319)
java.lang.NoSuchMethodError: org.apache.camel.component.azure.eventhubs.EventHubsConsumer.createExchange(Z)Lorg/apache/camel/Exchange;
        at org.apache.camel.component.azure.eventhubs.EventHubsConsumer.createAzureEventHubExchange(EventHubsConsumer.java:93)

共有2个答案

郜琦
2023-03-14

Kafka Connect是一个与Kafka(MSK、开源或任何其他Kafka发行版)一起使用的框架。但是,它没有任何连接器。Kafka Connect与开源Kafka捆绑在一起。

作为最佳实践,永远不要在与代理节点相同的服务器上运行kafka连接。因为它们共享二进制文件。调整代理可能会在kafka代理上导致意外问题。此外,Kafka Connect应用程序是应用程序,您不会在相同的节点上运行kafka消费者或生产者应用程序。因此,请创建一个EC2实例并在那里部署kafka连接。

来到TLS-如果您启用客户端TLS身份验证-您需要寻找boostrap_broker_tls。

汪臻
2023-03-14

MSK不提供Kafka连接服务。您需要在自己的计算机或其他AWS计算资源上安装此软件。从那里,您需要安装Camel连接器插件

 类似资料:
  • 我有哪些选项可以将Kafka与Spring靴骆驼连接? 我正在运行ActiveMQ Artemis和Camel,以建立进出客户端的JMS/MQTT和REST路由。我想把Kafka添加到这个二重唱中,以流式传输/交换数据(视频音频、文件/文本)。 到目前为止,我下载了Kafka汇合平台(免费试用),我正在测试他们提供什么。在融合平台中,我看到有可能将连接器作为“插件”添加。我假设我可以添加Camel

  • 我正试图在GAE上开始一条骆驼路线,但却撞上了一堵又一堵砖墙。首先,我尝试了如下路线: 但那什么都起不到作用。由于对Camel和GAE都是新手,我怀疑这是因为组件在路由开始时不充当使用者。然后我试着启动计时器: 却从GAE中得到了一个严重的错误: 错误:拒绝访问(java.lang.RuntimePermission modifyThreadGroup) 事实证明,您不能在GAE上创建新的实例,而

  • 我有一个Spring Boot2.25.1应用程序,它使用Camel 2.25.1与camel-kafka,一切都正常工作…在我的Kafka消费者中,我需要添加该功能以按需暂停消费,因此我升级到camel 3.18.1,以便我可以使用可暂停功能。升级到3.18.1后,我收到错误FileNotes与类文件TimeoutAwareAggregationStategy.class. 当我打开camel-

  • 我最近注意到Camel现在有自己的Kafka组件,所以我决定给它一个旋转。 我决定尝试一个很好的简单文件->kafka主题如下...

  • 我已经尝试了几个小时来获取Spring Boot应用程序的Camel路由(通过Camel组件camel-google-pubsub连接到Google Pubsub模拟器的本地实例),但没有成功。 null null 如果有人已经成功地使用pubsub模拟器与他们的骆驼路线,我对您的解决方案感兴趣。

  • 我有一个驼峰endpoint,基本上是Kafka消费者从一个主题中读取信息并将其发送到数据库。它工作得很好,但是,我很难对它进行单元测试,因为我无法模拟Kafkaendpoint。有谁能帮我在骆驼路线上嘲笑Kafka的消费者吗?