当前位置: 首页 > 面试题库 >

调试自定义Kafka连接器的简单有效方法是什么?

傅志用
2023-03-14
问题内容

我正在使用几个Kafka连接器,但在控制台输出中看不到它们的创建/部署中的任何错误,但是我没有得到想要的结果(无论是任何结果,无论是期望的还是除此以外)。我基于Kafka的示例FileStream连接器制作了这些连接器,因此我的调试技术基于该示例中使用的SLF4J
Logger的使用。我搜索了我认为会在控制台输出中产生的日志消息,但无济于事。我在这些消息中找错了地方吗?还是有调试这些连接器的更好方法?

我为实现参考的SLF4J Logger的示例用法:

Kafka
FileStreamSinkTask

Kafka
FileStreamSourceTask


问题答案:

我将尝试广泛地回答您的问题。连接器开发的一种简单方法如下:

  • 通过查看公开提供的许多Kafka连接器之一来构建和构建连接器源代码(您可以在此处找到广泛的列表:https : //www.confluent.io/product/connectors/)
  • 从https://www.confluent.io/download/下载最新的Confluent开源版本(> = 3.3.0)
  • 通过以下其中一种方式使连接器包可用于Kafka Connect:

    1. 将所有连接器jar文件(连接器jar以及不包括Connect API jar的依赖关系jar)存储到文件系统中的某个位置,并通过将该位置添加到plugin.pathConnect worker属性中的属性来启用插件隔离 。例如,如果连接器罐子存储在中/opt/connectors/my-first-connector,则将plugin.path=/opt/connectors在工作人员的属性中进行设置(请参见下文)。
    2. 将所有连接器jar文件存储在下的文件夹中${CONFLUENT_HOME}/share/java。例如:${CONFLUENT_HOME}/share/java/kafka-connect-my-first-connector。(需要以kafka-connect-启动脚本选择的前缀开头)。$ CONFLUENT_HOME是您安装Confluent Platform的位置。
    3. (可选)通过更改“连接${CONFLUENT_HOME}/etc/kafka/connect-log4j.properties到” DEBUG或“甚至” 的日志级别来增加日志记录TRACE
  • 使用Confluent CLI启动所有服务,包括Kafka Connect。此处的详细信息:http : //docs.confluent.io/current/connect/quickstart.html

简要地: confluent start

注意:CLI当前加载的Connect worker的属性文件是${CONFLUENT_HOME}/etc/schema- registry/connect-avro-distributed.properties。如果选择启用类加载隔离,但也需要更改Connect
worker的属性,则应编辑该文件。

  • 一旦运行了Connect worker,请运行以下命令启动连接器:

confluent load <connector_name> -d <connector_config.properties>

要么

confluent load <connector_name> -d <connector_config.json>

连接器配置可以采用Java属性或JSON格式。

  • 运行 confluent log connect以打开Connect worker的日志文件,或通过运行直接导航到您的日志和数据的存储位置

cd "$( confluent current )"

注意:在Confluent
CLI的会话期间,通过CONFLUENT_CURRENT适当地设置环境变量来更改日志和数据的存储位置。例如,假设/opt/confluent存在,并且是您要存储数据的位置,请运行:

export CONFLUENT_CURRENT=/opt/confluent
confluent current

  • 最后,以交互方式调试连接器的一种可能方法是在使用Confluent CLI启动Connect之前应用以下方法:

confluent stop connect
export CONNECT_DEBUG=y; export DEBUG_SUSPEND_FLAG=y;
confluent start connect

然后与调试器连接(例如,远程连接到Connect worker(默认端口:5005)。要停止以调试模式运行connect,只需unset CONNECT_DEBUG; unset DEBUG_SUSPEND_FLAG;在完成后运行:。

我希望以上内容将使您的连接器开发更加轻松和……更加有趣!



 类似资料:
  • 我有Kafka主题,有多种类型的消息流入并使用Kafka Connect写入弹性搜索。流看起来不错,直到我不得不将唯一的消息集分离到唯一的索引中。也就是说,我必须根据字段(JSON消息)为新的数据集获取新的索引。 我如何配置/定制Kafka connect以实现同样的功能?每个消息都包含一个表示消息类型和时间戳的字段。 示例 Json 如下所示: Sample1: {“log”:{“data”:“

  • 本文向大家介绍自定义一个简单的JDBC连接池实现方法,包括了自定义一个简单的JDBC连接池实现方法的使用技巧和注意事项,需要的朋友参考一下 一、什么是JDBC连接池? 在传统的JDBC连接中,每次获得一个Connection连接都需要加载通过一些繁杂的代码去获取,例如以下代码: 这样繁杂的操作只为了获取一次连接,当然,我们可以将其封装成一个工具类来访问(上图以封装好Connection的连接),但

  • 我正在尝试使用docker容器中的kafka connect和一个自定义连接器(PROGRESS _ DATADIRECT _ JDBC _ OE _ all . jar)来连接openedge数据库。 我将JAR文件放在插件路径(usr/share/java)中,但它不会作为连接器加载。 我可以通过将另一个(标准)连接器放在插件路径中来加载它。这行得通 有点不知道如何前进,我对Kafka很陌生。

  • 目标是:开发一个自定义Kafka连接器,该连接器以循环方式从websocket读取消息。我试着给你们举一个我所认识到的例子: 我创建了一个接口IWebsocketClientEndpoint 以及实现上述接口的类: WebsocketClientEndpoint类专用于创建websocket并管理连接、断开连接、发送和接收消息。 目标是:如何在Kafka连接结构中调整我的websocket结构?我

  • 所有这些都如预期的那样工作,只是一旦表显示了它的第一行,它就会每秒调用单元格呈现器大约十次...直到用户关闭表。 一旦我在其中得到大约20行,表格变得相当缓慢,需要2-8秒来调整列的大小,向上或向下scoll,或用选定的背景色呈现选定的行。 我在呈现器中插入了一个print语句,这样我就可以看到getTableCellRendererComponent方法被调用了多少次。

  • 问题内容: 在JavaScript中串联N个对象数组的最有效方法是什么? 数组是可变的,结果可以存储在输入数组之一中。 问题答案: 如果要连接两个以上的数组,那么这样做是为了方便和可能的性能。 对于仅连接两个数组,可以使用接受多个包含要添加到数组中的元素的参数的事实来代替将一个数组中的元素添加到另一个数组的末尾而不产生新数组。使用它也可以代替它,但是这样做似乎没有性能优势。 在ECMAScript