当前位置: 首页 > 知识库问答 >
问题:

如何从kafka主题检索消息,并在融合超文本传输协议接收器连接器中使用相同的值?

齐献
2023-03-14

我正在使用融合的 http 接收器连接器从 kafka 主题读取消息并将其发送到endpoint。下面是 http 接收器连接器,

    {
      "name": "HttpSink",
      "config": {
        "topics": "http-messages",
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.http.HttpSinkConnector",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "topics":"topic-name",
        "request.method":"POST",
        "behavior.on.null.values":"ignore",
        "behavior.on.error":"log",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",
        "reporter.bootstrap.servers": "localhost:9092",
        "reporter.result.topic.name": "success-responses",
        "reporter.result.topic.replication.factor": "1",
        "reporter.error.topic.name":"error-responses",
        "reporter.error.topic.replication.factor":"1",
        "http.api.url": "",
        "auth.type":"BASIC",
        "connection.user":"",
        "connection.password":"" 
      }
    }

主题中的消息具有以下 json 格式,

    {
       "endpoint url": "http://localhost:8080/api/messages",
       "rest method":"",
       "credentials": {"username":"username", "password":"password"},
       "payload": {}
    }

因此,使用超文本传输协议接收器连接器,我需要从主题消息中检索“endpointurl”和“凭据”的值,并在同一连接器中使用与“http.api.url”、“connection.user”和“connection.password”键的值相同的值。

所以最终的格式应该是这样的,

    {
      "name": "HttpSink",
      "config": {
        "topics": "http-messages",
        "tasks.max": "1",
        "connector.class": "io.confluent.connect.http.HttpSinkConnector",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "topics":"topic",
        "request.method":"POST",
        "behavior.on.null.values":"ignore",
        "behavior.on.error":"log",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic.replication.factor": "1",
        "reporter.bootstrap.servers": "localhost:9092",
        "reporter.result.topic.name": "success-responses",
        "reporter.result.topic.replication.factor": "1",
        "reporter.error.topic.name":"error-responses",
        "reporter.error.topic.replication.factor":"1",
        "http.api.url": "http://localhost:8080/api/messages",
        "auth.type":"BASIC",
        "connection.user":"username",
        "connection.password":"password"
      }
    }

如何从主题消息中检索数据并在同一连接器中使用它?

共有1个答案

宋飞文
2023-03-14

我找到了答案。这可以通过创建自定义连接器来实现。下面是http接收器自定义连接器的参考链接。

使用专家:https://github.com/university-of-auckland/kafka-http-sink-connector

使用gradle:https://github.com/aiven/aiven-kafka-connect-http

执行步骤,

Step 1 : Clone the project and build
Step 2 : Copy the jar to the desired location. For example, you can create a directory named <path-to-kafka>/share/kafka/plugins then copy the connector plugin jar.
Step 3 : Add this to the plugin path in your Connect properties file. For example, plugin.path=/usr/local/share/kafka/plugins. Kafka Connect finds the plugins using its plugin path. A plugin path is a comma-separated list of directories defined in the Kafka Connect's worker configuration.
Step 4 : Start the Connect workers with that configuration. Connect will discover all connectors defined within those plugins.

执行插件的命令,

$CONFLUENT_HOME/bin/connect-standalone $CONFLUENT_HOME/etc/kafka/connect-standalone.properties $CONFLUENT_HOME/etc/kafka/connect-http-sink.properties

connect-http-sink.properties文件的内容如下所示:

name=HttpSink
http.api.url=http://localhost:8080/api/messages
request.methods=POST
topics=http-test-topic
connector.class=com.connector.HttpSinkConnector
 类似资料:
  • 我需要检查与vbscript的http连接 我想打给主机看看主机是否有反应 我需要测试到特定端口的连接,为什么不使用url呢 你有解决办法吗?

  • 我是新的Kafka,我试图通过超文本传输协议从外部应用程序发布数据,但我找不到这样做的方法。 我已经在kafka中创建了一个主题,并对其进行了测试,以生成和使用消息,但我不知道如何通过http插入/发布消息,我尝试调用以下url来检索主题,但它没有检索任何数据http://servername:2181/topics/ 我使用的是cloudera 5.12.1。

  • 我是Kafka的新手,我有一个使用Java Apache Camel库实现的Kafka消费者。我发现的问题是-消费者花了很长的时间(>15分钟)来处理很少的消息-这对于我们的用例来说是很好的。 需要一些配置帮助,因为相同的消息会在15分钟后重新发送,如果在15分钟内没有处理(我相信线程控制不会返回)。我想这可能是默认间隔,不确定这是哪一个属性。 那么,我必须在哪里修复配置 生产者级别,以便它不重新

  • 我正在使用Kafka连接JDBC源连接器从数据库中的视图中读取并将其发布在kafka上,它工作正常。 我的用例是用户可以创建多个对象,并且对象的顺序在我的应用程序中很重要。我想使用用户 ID 作为我发布到主题中的所有消息的消息密钥,以保持它们的顺序。 我的问题是,如何在Kafka connect source连接器中定义消息键?

  • 我只是Kafka的新手,我有个问题: 我在Kafka中有一个主题“A”,我启动Spring boot应用程序并使用MessageChannel向主题“A”发送一些消息,然后我停止应用程序。 当我再次启动应用程序时,是否可以获取我发送到主题“A”的最新消息(并非所有消息)?我搜索了所有的解决方案,但它们对我帮助不大,如果我只发送新消息,它总是会立即收到消息。如果你有可运行的代码,请分享,我非常感谢:

  • 对于托管在 Confluent Cloud 中的 Kafka 集群,会创建一个审核日志集群。似乎可以将接收器连接器挂接到此群集,并从“汇合审核日志事件”主题中排出事件。 但是,当我运行连接器执行相同操作时,我遇到了以下错误。 在我的connect-distributed.properties文件中,我的设置如下: 需要授予哪些额外的权限,以便连接器可以在集群中创建所需的主题?connect-dis