8.3 Connector 开发者指南

优质
小牛编辑
143浏览
2023-12-01

本指南介绍开发人员如何为 Kafka connector 编写新的 connectors,用于Kafka和其他系统之间移动数据。

Core Concepts 和 APIs

Connectors 和 Tasks

HDFSSinkConnector

要在Kafka和另一个系统之间复制数据,用户会为想要 pull 数据或者 push 数据的系统创建一个connector。 connector 有两类:SourceConnectors 从其他系统导入数据(e.g.JDBCSourceConnector 会将关系型数据库导入到Kafka中)和SinkConnectors导出数据(e.g. HDFSSinkConnector会将Kafka topic 的内容导出到 HDFS 文件)

Connectors 自身不执行任何数据复制:Connector的配置描述要复制的数据,并且Connector 负责负责将 job 分解为可分发给 worker 的一组 Tasks。这些Tasks也分为两类: SourceTaskSinkTask

通过分配,每个Task 必须将数据的一部分子集复制到Kafka或者从Kafka复制。在 Kafka Connect中,应该始终可以将这些分配的数据框架化为一组输入和输出流,这些流由具有一致结构的记录组成。有的时候这些映射是显而易见的:一组日志文件中的每个文件都认为是一个流,每条解析的行数据使用相同的模式和偏移量作为字节偏移量存储在文件中。在其他情况下,可能需要花费更多功夫来映射模型: 一个 JDBC connector可以将表映射成stream,但是offset不能确定。可以使用时间戳字段进行增量查询返回新的数据,最后查询的时间戳可以用作偏移量。

Streams 和 Records

每个stream 都应该是一串 key-value的就。keys和values可以有复杂的数据结构-提供了很多基本类型,arrays,objects和嵌套的数据结构。每个stream 都应该是一串 key-value的就。keys和values可以有复杂的数据结构-提供了很多基本类型,也可以用来表示arrays,objects和嵌套的数据结构。运行时的数据格式不承担任何特定的序列化格式:此转换由框架内部处理。

除了 key 和 value, records(sources生成的记录和发送到sinks的记录) 关联 stream IDs和offsets。框架会定期的提交已经处理数据的offsets,以便在发生故障时,可以从最后一次提交的offsets恢复处理,避免不必要的重新处理和重复事件。.

Dynamic Connectors

不是所有的jobs都是静态,所以Connector 的实现还要负责监控外部系统是否需要重新更改配置。例如,在JDBCSourceConnector的例子中,这个Connector可能分配一组 tables 给 Task。当一个新的 table 创建了,必须发现这个时间,以便通过更新配置来将新表分配给其中一个Tasks。当发现需要重新配置的变更(或者Tasks 数量)的时候,他会通知框架,并更新相应的Tasks

开发一个简单的 Connector

开发一个 connector 只需要实现两个接口, ConnectorTask接口. 一个简单的例子的源码在Kafkafile package中。 connector 用于单机模式,并拥有 SourceConnectorSourceTask实现来读取一个文件的每行记录,并将其作为记录发发送,SinkConnectorSinkTask将记录写入到文件。

本节的其余部分将通过一些代码演示创建 connector 的关键步骤,但开发人员还应参考完整的示例源代码,因为为简洁起见,省略了许多细节。

Connector Example

首先我们用一个简单的例子介绍一下 SourceConnectorSinkConnector 的实现和它非常相似。首先创建一个继承自 SourceConnector 的类,并添加一些字段来存储解析出的配置信息(要读取的文件名以及要发送数据的topic):

public class FileStreamSourceConnector extends SourceConnector {    private String filename;    private String topic;

最简单的方法是taskClass(),它定义了应该在 worker 进程中实例化以实际读取数据的类:

@Overridepublic Class<? extends Task> taskClass() {    return FileStreamSourceTask.class;}

我们会在后面定义 FileStreamSourceTask 类。接下来,我们要往FileStreamSourceConnector类中添加一些标准的生命周期方法。

:
@Overridepublic void start(Map<String, String> props) {    // The complete version includes error handling as well.    filename = props.get(FILE_CONFIG);    topic = props.get(TOPIC_CONFIG);}
@Overridepublic void stop() {    // Nothing to do since no background monitoring is required.}

最后,实现的真正核心是 taskConfigs()。因此即使允许我们按照maxTasks参数创建更多的 task,我们也只返回包含一个 entry 的 list:

@Overridepublic List<Map<String, String>> taskConfigs(int maxTasks) {    ArrayList<Map<String, String>> configs = new ArrayList<>();    // Only one input stream makes sense.    Map<String, String> config = new HashMap<>();    if (filename != null)        config.put(FILE_CONFIG, filename);    config.put(TOPIC_CONFIG, topic);    configs.add(config);    return configs;}

尽管这个示例中没有用到,但 SourceTask 还提供了两个用于提交源系统中的 offset 的 API:commitcommitRecord。这些 API 是为具有消息确认机制的源系统提供的。覆盖这些方法允许 source connector 在源系统中的消息被写入 Kafka 后,批量或单独确认消息。 commit API 将偏移量存储在源系统中,poll方法每次返回的偏移量都会被存储起来。这个API 的实现应该一直处于阻塞状态直到提交完成。在每个SourceRecord被写入 Kafka 之后,commitRecord API 会将其偏移量保存在源系统中。由于 Kafka Connect 会自动记录偏移量,因此不需要SourceTask来实现它们。在 connector 确实需要确认源系统中的消息的情况下,通常只需要其中一个API。

即使是多任务, 这个方法实现通常也很简单。它只需要确定 input task 的数量,这可能需要联系它从中拉取数据的远程服务,然后由这些 task 来分摊工作。由于这些多个 task 分摊工作的的模式非常普遍,因为在 ConnectorUtils 工具类中提供了一些实用程序来简化这些模式。

注意,这个简单的例子没有包括动态输入。有关如何触发 task 配置的更新,请参阅下一节的讨论。

Task Example - Source Task

接下来我们将描述相应的 SourceTask 的实现。 这里的实现很简短, 但如果要想完全涵盖本指南的内容就太长了。我们将用伪代码描述大多数实现,你也可以参考完整示例的源代码。

就像 Connector 一样,我们需要创建一个类并继承对应的基类Task。 它也有一些标准的生命周期方法:

public class FileStreamSourceTask extends SourceTask {    String filename;    InputStream stream;    String topic;
 @Override    public void start(Map<String, String> props) {        filename = props.get(FileStreamSourceConnector.FILE_CONFIG);        stream = openOrThrowError(filename);        topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);    }
 @Override    public synchronized void stop() {        stream.close();    }

这是一个轻量级的简化版本,但表明这些方法应该相对简单,并且它们唯一要做的就是分配和释放资源。 关于这个实现有两点要注意: 第一, start() 方法没有处理从之前的 offset 恢复的情形,这一点将在后面的章节中讨论。第二, stop() 方法是同步的。这是必须的,因为 SourceTasks 有一个专用的线程,可以无限期的阻塞下去,所以它们需要通过 Worker 中另一个线程的调用来停止。

接下来,我们要实现 task 的主要功能,poll()方法负责从输入系统获取事件并返回一个List<SourceRecord>

@Overridepublic List<SourceRecord> poll() throws InterruptedException {    try {        ArrayList<SourceRecord> records = new ArrayList<>();        while (streamValid(stream) && records.isEmpty()) {            LineAndOffset line = readToNextLine(stream);            if (line != null) {                Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);                Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset);                records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));            } else {                Thread.sleep(1);            }        }        return records;    } catch (IOException e) {        // Underlying stream was killed, probably as a result of calling stop. Allow to return        // null, and driving thread will handle any shutdown if necessary.    }    return null;}

同样,我们也省略了一些细节,但是我们可以看到重要的步骤:poll()方法会被反复调用,并且对于每次调用,它会循环尝试从文件中读取记录。对于它读取的每一行,它也会跟踪文件偏移量。它使用这些信息创建一个输出 SourceRecord,带有四部分信息:source partition(本示例中只有一个,即正在读取的单个文件),source offset(文件中的字节偏移量),output topic name 和 output value(读取的行,并且我们使用一个模式指定该值始终是一个字符串)。 SourceRecord构造函数的其他变体还可以包含特定的输出分区和密钥。

.注意,这个实现使用了普通的Java InputStream接口,没有可用数据时会休眠。这是可以接受的,因为 Kafka 为每个 task 提供了一个专用线程。虽然 task 的实现必须覆盖基本的 poll() 接口,但实现起来有很大的灵活性。在这种情况下,基于 NIO 的实现将更加高效,但这种简单的方法很有效,实现起来很快,并且与旧版本的 Java 兼容。

Sink Tasks

之前的部分描述了如何实现一个简单的SourceTask。 与 SourceConnectorSinkConnector不同的是,SourceTaskSinkTask 有着完全不同的接口:SourceTask 使用 pull 接口, SinkTask 使用 push 接口。两者共享相同的生命周期方法,但 SinkTask 接口有点特殊:

public abstract class SinkTask implements Task {    public void initialize(SinkTaskContext context) {        this.context = context;    }
 public abstract void put(Collection<SinkRecord> records);
 public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {    }

SinkTask 的文档包含了所有的细节,但这个接口几乎和 SourceTask一样简单。put()方法应包含了大部分实现,包括接受 SinkRecords 集合,以及执行任何必需的转换,并将它们存储在目标系统中。此方法无需确保数据在返回之前已完全写入目标系统。实际上,在许多情况下 internal buffering 会很有用,使得我们可以一次发送整批记录,从而减少将事件插入下游数据存储的开销。SinkRecords 而本质上和 SourceRecords 有着相同的信息: Kafka topic, partition, offset and the event key and value。

flush() 方法用于提交 offset 数据,这使得 task 可以从故障中恢复并且从安全点恢复,因此不会有事件丢失。该方法应该将任何未完成的数据推送到目标系统,然后阻塞,直到写入被确认。 参数 offsets 通常会被忽略,但在某些情况下很有用,例如实现需要在目标存储区中存储 offset 信息以提供精确的 exactly-once 交付。例如,HDFS connector 可以做到这一点,它使用原子性移动操作来确保 flush() 操作以原子方式将数据和 offset 提交到 HDFS 中的最终位置。

Resuming from Previous Offsets

SourceTask 的实现包含每条记录的 stream ID (本示例中是输入的文件名) 和 offset (记录在文件中的位置)。该框架使用它定期提交 offset 数据,以便在发生故障的情况下,task 可以恢复并且最小化 reprocess 和 可能重复的事件的数量(或者从最近 Kafka Connect 正常停止过的 offset 恢复,例如在独立模式下或作业重新配置导致的停止)。这个提交过程由框架完全自动化,但只有 connector 知道如何退回到 input stream 中的正确位置并从该位置恢复。

为了在启动时正确恢复,task 可以使用它的 initialize() 方法中传入的 SourceContext 来访问 offset 数据。在 initialize() 中,我们会添加更多的代码来读取 offset 数据(如果存在)并寻找对应的位置:

    stream = new FileInputStream(filename);    Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));    if (offset != null) {        Long lastRecordedOffset = (Long) offset.get("position");        if (lastRecordedOffset != null)            seekToOffset(stream, lastRecordedOffset);    }

当然,您可能需要为每个 input stream 读取多个 key。OffsetStorageReader 接口允许您发出批量读取以有效加载所有 offset,然后通过查找每个输入流来把它们放到对应的位置。

动态输入/输出流

Kafka Connect 旨在定义批量数据复制作业,例如复制整个数据库,而不是创建许多作业以分别复制每张表。这种设计的一个结果是,connector 的输入或输出流的集合可以随时间变化。

Source connector 需要监视源系统的变化,例如数据库中表的增加与删除。当数据库中的表发生改变时,他们应通过ConnectorContext对象通知框架需要重新配置。例如,在 SourceConnector 中:

    if (inputsChanged())        this.context.requestTaskReconfiguration();

该框架将及时请求新的配置信息并更新 task,使它们能够在再次配置之前正常地提交进度。请注意,在 SourceConnector 中,该监视功能当前由 connector 实现决定。如果需要额外的线程来执行监视,则连接器必须自行分配线程。

理想情况下,用于监视变动的的代码将独立于 Connector,不会对 task 有任何影响。然而,变动也可以影响 task,最常见的是其输入流之一在输入系统中被破坏,例如,如果一个表从数据库中删除。如果TaskConnector之前遇到问题,这在Connector轮询数据源变动的时候很常见,需要由 Task 处理后续的 error。谢天谢地,这通常可以通过捕捉和处理适当的 exception 来处理。

SinkConnectors 通常只需要处理 stream 中增加的内容,这可能会转化为其输出中的新条目(例如新的数据库表)。该框架负责管理 Kafka 输入发生的任何更改,例如,由于批量订阅而导致输入主题集发生更改时。 SinkTasks 应该期望新的输入流,这可能需要在下游系统中创建新资源,例如数据库中的新表。在这些情况下最棘手的情况可能是多个 SinkTasks 第一次看到新的输入流并同时尝试创建新的资源。另一方面,SinkConnectors通常不需要特殊的代码来处理动态的流集。

连接配置验证

Kafka Connect 允许在提交要执行的 connector 之前验证 connector 配置,并且可以提供有关错误和建议值的反馈。为了利用这一点,连接器开发人员需要提供一个config()的实现来将配置定义公开给框架。

下面 FileStreamSourceConnector 中的代码定义了配置并将其公开给框架。

    private static final ConfigDef CONFIG_DEF = new ConfigDef()        .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")        .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
 public ConfigDef config() {        return CONFIG_DEF;    }

ConfigDef 类用于指定一组预期的配置。对于每个配置,您可以指定名称,类型,默认值,文档,组信息,组中的顺序,配置值的宽度以及适合在UI中显示的名称。另外,您可以通过覆盖 Validator 类来提供用于单个配置验证的特殊验证逻辑。此外,由于配置之间可能存在依赖关系,例如,配置的有效值和可见性可能会根据配置中的其它值而改变。为了解决这个问题,ConfigDef 允许你指定配置的依赖关系,并提供了一个 Recommender 的实现来获取有效的值,并给定当前配置值的配置可见性。

另外,Connector 中的 validate() 方法提供了一个默认验证的实现,该实现返回允许配置的列表以及每个配置的配置错误和推荐值。但是,它不使用建议的值进行配置验证。您可以覆盖默认实现以提供自定义的配置验证,该自定义配置验证可能使用推荐值。

处理模式

FileStream connector 是个很好的例子,因为它们很简单,但它们也有着简单的结构化数据——每行只是一个字符串。几乎所有实用的 connector 都需要具有更复杂数据格式的模式。

要创建更复杂的数据,您需要使用 Kafka Connect 的data API。大多数结构化记录除了原始类型外还需要与两个类进行交互:SchemaStruct

其 API 文档提供了一个完整的参考,但下面是一个创建 SchemaStruct 的简单示例::

Schema schema = SchemaBuilder.struct().name(NAME)    .field("name", Schema.STRING_SCHEMA)    .field("age", Schema.INT_SCHEMA)    .field("admin", new SchemaBuilder.boolean().defaultValue(false).build())    .build();
Struct struct = new Struct(schema)    .put("name", "Barbara Liskov")    .put("age", 75);

如果您正在实现一个 source connector,则需要确定何时以及如何创建模式。在可能的情况下,应尽可能避免重新计算它们。例如,如果您的 connector 确定有固定模式,请静态创建并重用单个实例。

但是,许多 connector 都有动态模式。一个简单的例子就是 database connector。考虑到即使数据库只有一个表,也不会为整个 connector 预定义模式(因为它随表格而变化)。但是在 connector 的整个生命周期中,它也可能不会因为只有一张表而使用固定模式,因为用户可能会执行 ALTER TABLE 命令,connector必须能够检测到这些变化并做出适当的反应。

Sink connectors 通常更简单,因为它们在消耗数据,因此不需要创建模式。但是,他们应该同样仔细地验证他们收到的模式是否具有预期的格式。当模式不匹配时,通常表明上游 produce 正在生成无法正确转换到目标系统的无效数据,sink connector 应该抛出异常来向系统指示此错误。

Kafka Connect Administration

Kafka Connect 的 REST layer 层提供了一组 API 对群集进行管理。这包括查看 connector 配置和任务状态的 API,以及更改其当前行为的 API(例如更改配置和重新启动任务)。

当 connector 首次提交到群集时,worker 将重新平衡群集中的全部 connector 及其 task,以便每个 worker的工作量大致相同。当 connector 增加或减少它们需要的任务数量或连接器的配置发生变化时,也使用相同的重新平衡过程。您可以使用 REST API 来查看 connector 及其 task 的当前状态,包括每个 connector 分配的 worker 的 ID。例如,查询文件源的状态(使用 GET/connectors/file-source/status)可能会产生如下输出:

{"name": "file-source","connector": {    "state": "RUNNING",    "worker_id": "192.168.1.208:8083"},"tasks": [    {    "id": 0,    "state": "RUNNING",    "worker_id": "192.168.1.209:8083"    }]}

Connector及其 task 将状态更新发布到群集中所有 worker 监视的共享主题(使用status.storage.topic配置)。由于 worker 异步使用此主题,因此状态更改通过状态 API 可见之前通常会有(短)延迟。以下状态可用于 connector 或其 task 之一:

  • UNASSIGNED: Connector/Task 还没有分配给一个worker。
  • RUNNING: Connector/Task 正在运行。
  • PAUSED: Connector/Task 已经被管理暂停。
  • FAILED: Connector/Task 失败了(通常是通过引发一个异常,在状态输出中报告)。

在大多数情况下,connector 和 task 状态都会匹配,但在发生变动或 task 失败时,它们在短时间内可能会有所不同。例如,首次启动 connector 时,connector 及其 task 全部转换为 RUNNING 状态之前可能会有明显的延迟。由于 Connect 不会自动重启失败的 task,因此 task 失败时状态也会发生变化。要手动重新启动 connector/task,可以使用上面列出的重启 API。请注意,如果尝试在重新平衡过程中重启 task,Connect 将返回409(冲突)状态码。您可以在重新平衡完成后重试,但可能没有必要,因为重新平衡实际上会重启群集中的所有 connector 和 task。

有时候暂时停止 connector 的消息处理很有用。例如,如果远程系统正在进行维护,则 source connector 最好停止从该系统中轮询获取新数据,而不是使用异常垃圾消息填充日志。对于这个用例,Connect 提供了一个 pause/resume API。当 source connector 暂停时,Connect 将停止轮询它以获取其他记录。当 sink connector 暂停时,Connect 将停止向其发送新消息。暂停状态是持久化的,因此即使重新启动群集,connector 也不会再次开始消息处理,直到 task 恢复。请注意,在所有 connector 的 task 都转换到 PAUSED 状态之前可能会有一段延迟,因为它们可能需要一些时间才能完成暂停过程中的任何处理。另外,失败的任务在重启之前不会转换到 PAUSED 状态。