8.2 User Guide

The quickstart provides a brief example of how to run a standalone version of Kafka Connect. This section describes how to configure, run, and manage Kafka Connect in more detail.

Running Kafka Connect

Kafka Connect currently supports two modes of execution: standalone (single process) and distributed.

In standalone mode all work is performed in a single process. This configuration is simpler to set up and get started with, and may be useful in situations where only one worker makes sense (e.g. collecting log files into Kafka), but it does not benefit from some of the features of Kafka Connect such as fault tolerance. You can start a standalone process with the following command:

> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]

The first parameter is the configuration for the worker. This includes settings such as the Kafka connection parameters, serialization format, and how frequently to commit offsets. The provided example should work well with a local cluster running with the default configuration provided by config/server.properties. It will require tweaking for a different configuration or a production deployment. All workers (both standalone and distributed) require a few configs:

  • bootstrap.servers - List of Kafka servers used to bootstrap connections to Kafka
  • key.converter - Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.
  • value.converter - Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.

The important configuration options specific to standalone mode are listed below, followed by a minimal example worker configuration:

  • offset.storage.file.filename - 存储 offset 数据的文件
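
Putting the options above together, a minimal connect-standalone.properties might look roughly like the following sketch (illustrative values only; adjust the converter classes and file path for your environment):

    # Kafka brokers used to bootstrap the connection
    bootstrap.servers=localhost:9092
    # Converters between Kafka Connect format and the bytes written to Kafka
    # (the built-in JSON converter is used here as an example)
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    # Local file where standalone mode stores offsets
    offset.storage.file.filename=/tmp/connect.offsets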

The parameters configured here are intended for the producers and consumers used by Kafka Connect itself to access the configuration, offset and status topics. For the configuration of Kafka source tasks and Kafka sink tasks, the same parameters can be used, but they must be prefixed with consumer. and producer. respectively. The only parameter that is inherited from the worker configuration is bootstrap.servers. In most cases this is sufficient, since the same cluster is often used for all purposes. A notable exception is a secured cluster, which requires extra parameters to allow connections. These parameters will need to be set up to three times in the worker configuration: once for management access, once for Kafka sinks, and once for Kafka sources.
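
As a sketch of the "three times" point above, the snippet below assumes a hypothetical SASL_SSL-secured cluster and shows the same security settings configured once for management access and once each, with the consumer. and producer. prefixes, for sink and source tasks:

    # Management access (Connect's own internal topics)
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    # Consumers used by Kafka sink tasks
    consumer.security.protocol=SASL_SSL
    consumer.sasl.mechanism=PLAIN
    # Producers used by Kafka source tasks
    producer.security.protocol=SASL_SSL
    producer.sasl.mechanism=PLAIN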

The remaining parameters on the command line are connector configuration files. You may include as many as you want, but all connectors will execute within the same process (on different threads).

Distributed mode handles automatic balancing of work, allows you to scale up (or down) dynamically, and offers fault tolerance both for the active tasks and for configuration and offset commit data. Execution is very similar to standalone mode:

> bin/connect-distributed.sh config/connect-distributed.properties

The difference is in the class which is started and in the configuration parameters which change how the Kafka Connect process decides where to store configurations, how to assign work, and where to store offsets and task statuses. In distributed mode, Kafka Connect stores the offsets, configs and task statuses in Kafka topics. It is recommended to manually create the topics for offsets, configs and statuses in order to achieve the desired number of partitions and replication factor (see the example commands after the parameter list below). If the topics have not been created when Kafka Connect is started, they will be auto-created with the default number of partitions and replication factor, which may not be best suited for their usage.

In particular, the following configuration parameters, in addition to the common settings mentioned above, are critical to set before starting your cluster:

  • group.id (default connect-cluster) - unique name for the cluster, used in forming the Connect cluster group; note that this must not conflict with consumer group IDs
  • config.storage.topic (default connect-configs) - topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated, compacted topic. You may need to manually create the topic to ensure the correct configuration as auto created topics may have multiple partitions or be automatically configured for deletion rather than compaction
  • offset.storage.topic (default connect-offsets) - topic to use for storing offsets; this topic should have many partitions, be replicated, and be configured for compaction
  • status.storage.topic (default connect-status) - topic to use for storing statuses; this topic can have multiple partitions, and should be replicated and configured for compaction
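
To create these topics manually as recommended above, commands along the following lines can be used (a sketch only: it assumes a kafka-topics.sh version that supports --bootstrap-server, and the partition counts and replication factor should be adjusted to your cluster):

    # Connector and task configurations: single partition, compacted, highly replicated
    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic connect-configs \
      --partitions 1 --replication-factor 3 --config cleanup.policy=compact
    # Offsets: many partitions, compacted, replicated
    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic connect-offsets \
      --partitions 50 --replication-factor 3 --config cleanup.policy=compact
    # Statuses: multiple partitions, compacted, replicated
    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic connect-status \
      --partitions 10 --replication-factor 3 --config cleanup.policy=compact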

Note that in distributed mode the connector configurations are not passed on the command line. Instead, use the REST API described below to create, modify, and destroy connectors.

Configuring Connectors

Connector configurations are simple key-value mappings. For standalone mode these are defined in a properties file and passed to the Connect process on the command line. In distributed mode, they are included in the JSON payload of the request that creates (or modifies) the connector.

Most configurations are connector dependent, so they can't be outlined here. However, there are a few common options:

  • name - Unique name for the connector. Attempting to register again with the same name will fail.
  • connector.class - The Java class for the connector
  • tasks.max - The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism.
  • key.converter - (optional) Override the default key converter set by the worker.
  • value.converter - (optional) Override the default value converter set by the worker.

The connector.class config supports several formats: the full name or alias of the class for this connector. If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name or use FileStreamSink or FileStreamSinkConnector to make the configuration a bit shorter.

Sink connectors also have one additional option to control their input (see the example configuration sketched below):

  • topics - A list of topics to use as input for this connector

For any other options, you should consult the documentation for the connector.
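
For example, the file sink connector shipped with Kafka can be configured roughly as follows (file name and topic are illustrative values, as in the quickstart):

    name=local-file-sink
    connector.class=FileStreamSink
    tasks.max=1
    file=test.sink.txt
    topics=connect-test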

Transformations

Connectors can be configured with transformations to make lightweight message-at-a-time modifications. They can be convenient for data massaging and event routing.

A transformation chain can be specified in the connector configuration:

  • transforms - List of aliases for the transformation, specifying the order in which the transformations will be applied.
  • transforms.$alias.type - Fully qualified class name for the transformation.
  • transforms.$alias.$transformationSpecificConfig - Configuration properties for the transformation

As an example, let's take the built-in file source connector and use a transformation to add a static field.

Throughout the example we'll use the schemaless JSON data format. To use the schemaless format, we change the following two lines in connect-standalone.properties from true to false:

    key.converter.schemas.enable=false
    value.converter.schemas.enable=false

The file source connector reads each line as a String. We will wrap each line in a Map and then add a second field to identify the origin of the event. To do this, we use two transformations:

  • HoistField to place the input line inside a Map
  • InsertField to add the static field. In this example we'll indicate that the record came from a file connector

After adding the transformations, the connect-file-source.properties file looks like this:

    name=local-file-source
    connector.class=FileStreamSource
    tasks.max=1
    file=test.txt
    topic=connect-test
    transforms=MakeMap, InsertSource
    transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
    transforms.MakeMap.field=line
    transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
    transforms.InsertSource.static.field=data_source
    transforms.InsertSource.static.value=test-file-source

All the lines starting with transforms were added for the transformations. You can see the two transformations we created: "MakeMap" and "InsertSource" are aliases that we chose to give the transformations. The transformation types are based on the list of built-in transformations given below. Each transformation type has additional configuration: HoistField requires a configuration called "field", which is the name of the field in the map that will hold the original String from the file. The InsertField transformation lets us specify the field name and the value that we are adding.

When we ran the file source connector on a sample file without the transformations, and then read the data using kafka-console-consumer.sh, the results were:

    "foo"    "bar"    "hello world"

We then create a new file connector, this time after adding the transformations to the configuration file. This time, the results are:

    {"line":"foo","data_source":"test-file-source"}    {"line":"bar","data_source":"test-file-source"}    {"line":"hello world","data_source":"test-file-source"}

You can see that the lines we've read are now part of a JSON map, and there is an extra field with the static value we specified. This is just one simple example of what you can do with transformations.

Several widely-applicable data and routing transformations are included with Kafka Connect:

  • InsertField - Add a field using either static data or record metadata
  • ReplaceField - Filter or rename fields
  • MaskField - Replace field with valid null value for the type (0, empty string, etc)
  • ValueToKey - Replace the record key with a new key formed from a subset of fields in the record value
  • HoistField - Wrap the entire event as a single field inside a Struct or a Map
  • ExtractField - Extract a specific field from Struct and Map and include only this field in results
  • SetSchemaMetadata - modify the schema name or version
  • TimestampRouter - Modify the topic of a record based on original topic and timestamp. Useful when using a sink that needs to write to different tables or indexes based on timestamps
  • RegexRouter - modify the topic of a record based on original topic, replacement string and a regular expression

Details on how to configure each transformation are listed below:

org.apache.kafka.connect.transforms.InsertField

Insert field(s) using attributes from the record metadata or a configured static value.

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.InsertField$Key) or value (org.apache.kafka.connect.transforms.InsertField$Value).

  • offset.field - Field name for Kafka offset - only applicable to sink connectors. Suffix with ! to make this a required field, or ? to keep it optional (the default). (type: string, default: null, importance: medium)
  • partition.field - Field name for Kafka partition. Suffix with ! to make this a required field, or ? to keep it optional (the default). (type: string, default: null, importance: medium)
  • static.field - Field name for static data field. Suffix with ! to make this a required field, or ? to keep it optional (the default). (type: string, default: null, importance: medium)
  • static.value - Static field value, if field name configured. (type: string, default: null, importance: medium)
  • timestamp.field - Field name for record timestamp. Suffix with ! to make this a required field, or ? to keep it optional (the default). (type: string, default: null, importance: medium)
  • topic.field - Field name for Kafka topic. Suffix with ! to make this a required field, or ? to keep it optional (the default). (type: string, default: null, importance: medium)

org.apache.kafka.connect.transforms.ReplaceField

Filter or rename fields.

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.ReplaceField$Key) or value (org.apache.kafka.connect.transforms.ReplaceField$Value).

  • blacklist - Fields to exclude. This takes precedence over the whitelist. (type: list, default: "", importance: medium)
  • renames - Field rename mappings. (type: list, default: "", valid values: list of colon-delimited pairs, e.g. foo:bar,abc:xyz, importance: medium)
  • whitelist - Fields to include. If specified, only these fields will be used. (type: list, default: "", importance: medium)
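
For example, a hypothetical snippet (field names are illustrative) that uses ReplaceField in a connector configuration to drop one field and rename another might look like:

    transforms=Cleanup
    transforms.Cleanup.type=org.apache.kafka.connect.transforms.ReplaceField$Value
    # drop a field we do not want downstream
    transforms.Cleanup.blacklist=internal_id
    # rename field "src" to "data_source"
    transforms.Cleanup.renames=src:data_source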

org.apache.kafka.connect.transforms.MaskField

Mask specified fields with a valid null value for the field type (i.e. 0, false, empty string, and so on).

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.MaskField$Key) or value (org.apache.kafka.connect.transforms.MaskField$Value).

  • fields - Names of fields to mask. (type: list, valid values: non-empty list, importance: high)

org.apache.kafka.connect.transforms.ValueToKey

Replace the record key with a new key formed from a subset of fields in the record value.

  • fields - Field names on the record value to extract as the record key. (type: list, valid values: non-empty list, importance: high)

org.apache.kafka.connect.transforms.HoistField

Wrap data using the specified field name in a Struct when schema present, or a Map in the case of schemaless data.

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.HoistField$Key) or value (org.apache.kafka.connect.transforms.HoistField$Value).

  • field - Field name for the single field that will be created in the resulting Struct or Map. (type: string, importance: medium)

org.apache.kafka.connect.transforms.ExtractField

Extract the specified field from a Struct when schema present, or a Map in the case of schemaless data. Any null values are passed through unmodified.

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.ExtractField$Key) or value (org.apache.kafka.connect.transforms.ExtractField$Value).

  • field - Field name to extract. (type: string, importance: medium)

org.apache.kafka.connect.transforms.SetSchemaMetadata

Set the schema name, version or both on the record's key (org.apache.kafka.connect.transforms.SetSchemaMetadata$Key) or value (org.apache.kafka.connect.transforms.SetSchemaMetadata$Value) schema.

  • schema.name - Schema name to set. (type: string, default: null, importance: high)
  • schema.version - Schema version to set. (type: int, default: null, importance: high)

org.apache.kafka.connect.transforms.TimestampRouter

Update the record's topic field as a function of the original topic value and the record timestamp.

This is mainly useful for sink connectors, since the topic field is often used to determine the equivalent entity name in the destination system (e.g. database table or search index name).

  • timestamp.format - Format string for the timestamp that is compatible with java.text.SimpleDateFormat. (type: string, default: yyyyMMdd, importance: high)
  • topic.format - Format string which can contain ${topic} and ${timestamp} as placeholders for the topic and timestamp, respectively. (type: string, default: ${topic}-${timestamp}, importance: high)
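
For example, a hypothetical sink connector snippet (alias and formats are illustrative) that routes records to a per-day destination name might look like:

    transforms=RouteByDay
    transforms.RouteByDay.type=org.apache.kafka.connect.transforms.TimestampRouter
    # produces topic names such as connect-test-20231201
    transforms.RouteByDay.topic.format=${topic}-${timestamp}
    transforms.RouteByDay.timestamp.format=yyyyMMdd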

org.apache.kafka.connect.transforms.RegexRouter

Update the record topic using the configured regular expression and replacement string.

Under the hood, the regex is compiled to a java.util.regex.Pattern. If the pattern matches the input topic, java.util.regex.Matcher#replaceFirst() is used with the replacement string to obtain the new topic.

  • regex - Regular expression to use for matching. (type: string, valid values: valid regex, importance: high)
  • replacement - Replacement string. (type: string, importance: high)

org.apache.kafka.connect.transforms.Flatten

Flatten a nested data structure, generating names for each field by concatenating the field names at each level with a configurable delimiter character. Applies to Struct when schema present, or a Map in the case of schemaless data. The default delimiter is '.'.

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.Flatten$Key) or value (org.apache.kafka.connect.transforms.Flatten$Value).

  • delimiter - Delimiter to insert between field names from the input record when generating field names for the output record. (type: string, default: ., importance: medium)

org.apache.kafka.connect.transforms.Cast

Cast fields or the entire key or value to a specific type, e.g. to force an integer field to a smaller width. Only simple primitive types are supported -- integers, floats, boolean, and string.

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.Cast$Key) or value (org.apache.kafka.connect.transforms.Cast$Value).

  • spec - List of fields and the type to cast them to of the form field1:type,field2:type to cast fields of Maps or Structs. A single type to cast the entire value. Valid types are int8, int16, int32, int64, float32, float64, boolean, and string. (type: list, valid values: list of colon-delimited pairs, e.g. foo:bar,abc:xyz, importance: high)
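
For example, a hypothetical snippet (field names are illustrative) that casts two fields of the record value might look like:

    transforms=CastTypes
    transforms.CastTypes.type=org.apache.kafka.connect.transforms.Cast$Value
    # cast "count" to a 32-bit integer and "price" to a 64-bit float
    transforms.CastTypes.spec=count:int32,price:float64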

org.apache.kafka.connect.transforms.TimestampConverter

Convert timestamps between different formats such as Unix epoch, strings, and Connect Date/Timestamp types. Applies to individual fields or to the entire value.

Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.TimestampConverter$Key) or value (org.apache.kafka.connect.transforms.TimestampConverter$Value).

  • target.type - The desired timestamp representation: string, unix, Date, Time, or Timestamp. (type: string, importance: high)
  • field - The field containing the timestamp, or empty if the entire value is a timestamp. (type: string, default: "", importance: high)
  • format - A SimpleDateFormat-compatible format for the timestamp. Used to generate the output when type=string or used to parse the input if the input is a string. (type: string, default: "", importance: medium)

REST API

Since Kafka Connect is intended to be run as a service, it also provides a REST API for managing connectors. By default, this service runs on port 8083. The following are the currently supported endpoints (example curl invocations are sketched after the list):

  • GET /connectors - return a list of active connectors
  • POST /connectors - create a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
  • GET /connectors/{name} - get information about a specific connector
  • GET /connectors/{name}/config - get the configuration parameters for a specific connector
  • PUT /connectors/{name}/config - update the configuration parameters for a specific connector
  • GET /connectors/{name}/status - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks
  • GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
  • GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
  • PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed
  • PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused)
  • POST /connectors/{name}/restart - restart a connector (typically because it has failed)
  • POST /connectors/{name}/tasks/{taskId}/restart - restart an individual task (typically because it has failed)
  • DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration
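
For example, these endpoints can be exercised with curl as sketched below (host, connector name, and configuration are illustrative values):

    # Create a connector; the body is a JSON object with "name" and "config" fields
    curl -X POST -H "Content-Type: application/json" \
      --data '{"name":"local-file-source","config":{"connector.class":"FileStreamSource","tasks.max":"1","file":"test.txt","topic":"connect-test"}}' \
      http://localhost:8083/connectors
    # Check the status of that connector
    curl http://localhost:8083/connectors/local-file-source/status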

Kafka Connect also provides a REST API for getting information about connector plugins:

  • GET /connector-plugins - return a list of connector plugins installed in the Kafka Connect cluster. Note that the API only checks for connectors on the worker that handles the request, which means you may see inconsistent results, especially during a rolling upgrade if you add new connector jars
  • PUT /connector-plugins/{connector-type}/config/validate - validate the provided configuration values against the configuration definition. This API performs per config validation, returns suggested values and error messages during validation.