8.2 User Guide
The quickstart provides a brief example of how to run a standalone instance of Kafka Connect. This section describes how to configure, run, and manage Kafka Connect in more detail.
Running Kafka Connect
Kafka Connect currently supports two modes of execution: standalone (single process) and distributed.
In standalone mode all work is performed in a single process. This configuration is simpler to set up and get started with, and may be sensible in situations where only a single worker makes sense (e.g. collecting log files into Kafka), but it does not benefit from Kafka Connect's fault tolerance. You can start a standalone process with the following command:
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
The first parameter is the configuration for the worker. This includes settings such as the Kafka connection parameters, the serialization format, and how frequently to commit offsets. The provided example should work well with a local cluster running with the default configuration provided by config/server.properties. It will require tweaking for a different configuration or a production deployment. All workers (both standalone and distributed) require a few configs:
- bootstrap.servers - List of Kafka servers used to bootstrap connections to Kafka
- key.converter - Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.
- value.converter - Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.
The important configuration options specific to standalone mode are:
- offset.storage.file.filename - File to store offset data in
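Putting these together, a minimal standalone worker configuration might look like the following sketch (the broker address and the choice of JsonConverter are assumptions for illustration; adjust them to your environment):
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets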
The parameters configured here are intended to be used by the producers and consumers that Kafka Connect itself uses to access the config, offset and status topics. For the configuration of Kafka source and sink tasks, the same parameters can be used but must be prefixed with consumer. and producer. respectively. The only parameter that is inherited from the worker configuration is bootstrap.servers. In most cases this is sufficient, since the same cluster is often used for all purposes. A notable exception is a secured cluster, which requires extra parameters to allow connections. These parameters will need to be set up to three times in the worker configuration: once for management access, once for Kafka sinks, and once for Kafka sources.
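For example, on a cluster secured with SASL_SSL (the mechanism here is an assumption; use whatever your cluster requires), the worker configuration would repeat the relevant settings under each prefix:
# Used by the worker itself for the config, offset and status topics:
security.protocol=SASL_SSL
# Used by source tasks when producing to Kafka:
producer.security.protocol=SASL_SSL
# Used by sink tasks when consuming from Kafka:
consumer.security.protocol=SASL_SSL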
The remaining parameters are connector configuration files. You may include as many as you want, but all of them will execute within the same process (on different threads).
Distributed mode handles automatic balancing of work, allows dynamic scaling up (or down), and offers fault tolerance both for the active tasks and for their configuration and offset commit data. Execution is very similar to standalone mode:
> bin/connect-distributed.sh config/connect-distributed.properties
The difference is in the class that is started and in the configuration parameters, which determine how the Kafka Connect process stores configurations, how it assigns work, and where it keeps offsets and task statuses. In distributed mode, Kafka Connect stores the offsets, configs, and task statuses in Kafka topics. It is recommended to manually create the topics for offsets, configs, and statuses in order to achieve the desired number of partitions and replication factor (see the creation sketch after the list below). If the topics have not been created when Kafka Connect is started, they will be auto-created with the default number of partitions and replication factor, which may not be best suited for their usage.
In particular, the following configuration parameters, in addition to the common settings mentioned above, are critical to set before starting your cluster:
- group.id (default connect-cluster) - unique name for the cluster, used in forming the Connect cluster group; note that this must not conflict with consumer group IDs
- config.storage.topic (default connect-configs) - topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated, compacted topic. You may need to manually create the topic to ensure the correct configuration as auto created topics may have multiple partitions or be automatically configured for deletion rather than compaction
- offset.storage.topic (default connect-offsets) - topic to use for storing offsets; this topic should have many partitions, be replicated, and be configured for compaction
- status.storage.topic (default connect-status) - topic to use for storing statuses; this topic can have multiple partitions, and should be replicated and configured for compaction
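As a sketch, the three internal topics might be created manually along these lines (the ZooKeeper address, the replication factor of 3, and the partition counts for the offsets and status topics are assumptions; pick values that match your cluster):
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-configs --partitions 1 --replication-factor 3 --config cleanup.policy=compact
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-offsets --partitions 50 --replication-factor 3 --config cleanup.policy=compact
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic connect-status --partitions 10 --replication-factor 3 --config cleanup.policy=compact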
Note that in distributed mode the connector configurations are not passed on the command line. Instead, use the REST API described below to create, modify, and destroy connectors.
Configuring Connectors
Connector configurations are simple key-value mappings. In standalone mode these are defined in a properties file and passed to the Connect process on the command line. In distributed mode they are included in the JSON payload of the request that creates (or modifies) the connector (an example request is shown at the end of this subsection).
Most configurations are connector dependent, so they can't be outlined here. However, there are a few common options:
- name - Unique name for the connector. Attempting to register again with the same name will fail.
- connector.class - The Java class for the connector
- tasks.max - The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism.
- key.converter - (optional) Override the default key converter set by the worker.
- value.converter - (optional) Override the default value converter set by the worker.
The connector.class config supports several formats: the full name of the class for this connector, or an alias. If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name or use FileStreamSink or FileStreamSinkConnector to make the configuration a bit shorter.
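That is, the following three lines are equivalent ways to refer to the same connector:
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
connector.class=FileStreamSinkConnector
connector.class=FileStreamSink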
Sink connectors also have one additional option to control their input:
- topics - A list of topics to use as input for this connector
For any other options, you should consult the documentation for the connector.
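For instance, in distributed mode the same options are submitted as JSON to the REST API. A sketch of such a creation request, assuming a worker listening on localhost:8083 (the connector name, file, and topic values are illustrative):
> curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors --data '{"name": "local-file-sink", "config": {"connector.class": "FileStreamSink", "tasks.max": "1", "topics": "connect-test", "file": "test.sink.txt"}}'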
Transformations
Connectors can be configured with transformations to make lightweight, single-message modifications. They can be convenient for data massaging and event routing. A transformation chain can be specified in the connector configuration:
- transforms - List of aliases for the transformations, specifying the order in which the transformations will be applied.
- transforms.$alias.type - Fully qualified class name for the transformation.
- transforms.$alias.$transformationSpecificConfig - Configuration properties for the transformation
For example, let's take the built-in file source connector and use a transformation to add a static field.
Throughout the example we'll use the schemaless JSON data format. To use the schemaless format, we change the following two lines in connect-standalone.properties from true to false:
key.converter.schemas.enable=false
value.converter.schemas.enable=false
The file source connector reads each line as a String. We will wrap each line in a Map and then add a second field to identify the origin of the event. To do this, we use two transformations:
- HoistField to place the input line inside a Map
- InsertField to add the static field. In this example we'll indicate that the record came from a file connector
After adding the transformations, the connect-file-source.properties file looks like this:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
transforms=MakeMap, InsertSource
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=line
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source
All the lines starting with transforms were added for the transformations. You can see the two transformations we created: "MakeMap" and "InsertSource" are aliases that we chose to give the transformations. The transformation types are based on the list of built-in transformations given below. Each transformation type has additional configuration: HoistField requires a configuration called "field", which is the name of the field in the map that will contain the original String from the file. The InsertField transformation lets us specify the field name and the value that we are adding.
When we run the file source connector on a sample file without the transformations and then read the data with kafka-console-consumer.sh, the results are:
"foo"
"bar"
"hello world"
We then create a new file connector, this time with the transformations added to the configuration file. This time, the results are:
{"line":"foo","data_source":"test-file-source"}
{"line":"bar","data_source":"test-file-source"}
{"line":"hello world","data_source":"test-file-source"}
You can see that the lines we've read are now part of a JSON map, and there is an extra field with the static value we specified. This is just one example of what you can do with transformations.
Several widely-applicable data and routing transformations are included with Kafka Connect:
- InsertField - Add a field using either static data or record metadata
- ReplaceField - Filter or rename fields
- MaskField - Replace field with valid null value for the type (0, empty string, etc)
- ValueToKey - Replace the record key with a new key formed from a subset of fields in the record value
- HoistField - Wrap the entire event as a single field inside a Struct or a Map
- ExtractField - Extract a specific field from Struct and Map and include only this field in results
- SetSchemaMetadata - modify the schema name or version
- TimestampRouter - Modify the topic of a record based on original topic and timestamp. Useful when using a sink that needs to write to different tables or indexes based on timestamps
- RegexRouter - modify the topic of a record based on original topic, replacement string and a regular expression
Details on how to configure each transformation are listed below:
org.apache.kafka.connect.transforms.InsertField
Insert field(s) using attributes from the record metadata or a configured static value. Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.InsertField$Key) or value (org.apache.kafka.connect.transforms.InsertField$Value).
Name | Description | Type | Default | Valid Values | Importance
---|---|---|---|---|---
offset.field | Field name for Kafka offset - only applicable to sink connectors. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium
partition.field | Field name for Kafka partition. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium
static.field | Field name for static data field. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium
static.value | Static field value, if field name configured. | string | null | | medium
timestamp.field | Field name for record timestamp. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium
topic.field | Field name for Kafka topic. Suffix with ! to make this a required field, or ? to keep it optional (the default). | string | null | | medium
org.apache.kafka.connect.transforms.ReplaceField
Filter or rename fields. Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.ReplaceField$Key) or value (org.apache.kafka.connect.transforms.ReplaceField$Value).
Name | Description | Type | Default | Valid Values | Importance
---|---|---|---|---|---
blacklist | Fields to exclude. This takes precedence over the whitelist. | list | "" | | medium
renames | Field rename mappings. | list | "" | list of colon-delimited pairs, e.g. foo:bar,abc:xyz | medium
whitelist | Fields to include. If specified, only these fields will be used. | list | "" | | medium
org.apache.kafka.connect.transforms.MaskField
Mask specified fields with a valid null value for the field type (i.e. 0, false, empty string, and so on). Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.MaskField$Key) or value (org.apache.kafka.connect.transforms.MaskField$Value).
Name | Description | Type | Default | Valid Values | Importance
---|---|---|---|---|---
fields | Names of fields to mask. | list | | non-empty list | high
org.apache.kafka.connect.transforms.ValueToKey
Replace the record key with a new key formed from a subset of fields in the record value.
Name | Description | Type | Default | Valid Values | Importance
---|---|---|---|---|---
fields | Field names on the record value to extract as the record key. | list | | non-empty list | high
org.apache.kafka.connect.transforms.HoistField
Wrap data using the specified field name in a Struct when schema present, or a Map in the case of schemaless data. Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.HoistField$Key) or value (org.apache.kafka.connect.transforms.HoistField$Value).
Name | Description | Type | Default | Valid Values | Importance
---|---|---|---|---|---
field | Field name for the single field that will be created in the resulting Struct or Map. | string | | | medium
org.apache.kafka.connect.transforms.ExtractField
Extract the specified field from a Struct when schema present, or a Map in the case of schemaless data. Any null values are passed through unmodified. Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.ExtractField$Key) or value (org.apache.kafka.connect.transforms.ExtractField$Value).
Name | Description | Type | Default | Valid Values | Importance
---|---|---|---|---|---
field | Field name to extract. | string | | | medium
org.apache.kafka.connect.transforms.SetSchemaMetadata
Set the schema name, version or both on the record's key (org.apache.kafka.connect.transforms.SetSchemaMetadata$Key) or value (org.apache.kafka.connect.transforms.SetSchemaMetadata$Value) schema.
Name | Description | Type | Default | Valid Values | Importance
---|---|---|---|---|---
schema.name | Schema name to set. | string | null | | high
schema.version | Schema version to set. | int | null | | high
org.apache.kafka.connect.transforms.TimestampRouter
Update the record's topic field as a function of the original topic value and the record timestamp. This is mainly useful for sink connectors, since the topic field is often used to determine the equivalent entity name in the destination system (e.g. database table or search index name).
Name | Description | Type | Default | Valid Values | Importance
---|---|---|---|---|---
timestamp.format | Format string for the timestamp that is compatible with java.text.SimpleDateFormat. | string | yyyyMMdd | | high
topic.format | Format string which can contain ${topic} and ${timestamp} as placeholders for the topic and timestamp, respectively. | string | ${topic}-${timestamp} | | high
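As a sketch, a sink connector could route records into daily topics with a configuration like the following (the alias DailyRoute is illustrative):
transforms=DailyRoute
transforms.DailyRoute.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.DailyRoute.timestamp.format=yyyyMMdd
transforms.DailyRoute.topic.format=${topic}-${timestamp}
With these settings, a record on topic orders with a timestamp falling on June 1, 2017 would be routed to orders-20170601.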
org.apache.kafka.connect.transforms.RegexRouter
Update the record topic using the configured regular expression and replacement string. Under the hood, the regex is compiled to a java.util.regex.Pattern. If the pattern matches the input topic, java.util.regex.Matcher#replaceFirst() is used with the replacement string to obtain the new topic.
Name | Description | Type | Default | Valid Values | Importance
---|---|---|---|---|---
regex | Regular expression to use for matching. | string | | valid regex | high
replacement | Replacement string. | string | | | high
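For example, the following sketch strips an assumed environment prefix from topic names (the alias and pattern are illustrative):
transforms=DropPrefix
transforms.DropPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.DropPrefix.regex=prod-(.*)
transforms.DropPrefix.replacement=$1
A record on topic prod-metrics would be redirected to metrics, while topics that do not match the pattern keep their original name.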
org.apache.kafka.connect.transforms.Flatten
Flatten a nested data structure, generating names for each field by concatenating the field names at each level with a configurable delimiter character. Applies to Struct when schema present, or a Map in the case of schemaless data. The default delimiter is '.'. Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.Flatten$Key) or value (org.apache.kafka.connect.transforms.Flatten$Value).
Name | Description | Type | Default | Valid Values | Importance
---|---|---|---|---|---
delimiter | Delimiter to insert between field names from the input record when generating field names for the output record | string | . | | medium
org.apache.kafka.connect.transforms.Cast
Cast fields or the entire key or value to a specific type, e.g. to force an integer field to a smaller width. Only simple primitive types are supported -- integers, floats, boolean, and string. Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.Cast$Key) or value (org.apache.kafka.connect.transforms.Cast$Value).
Name | Description | Type | Default | Valid Values | Importance
---|---|---|---|---|---
spec | List of fields and the type to cast them to of the form field1:type,field2:type to cast fields of Maps or Structs. A single type to cast the entire value. Valid types are int8, int16, int32, int64, float32, float64, boolean, and string. | list | | list of colon-delimited pairs, e.g. foo:bar,abc:xyz | high
org.apache.kafka.connect.transforms.TimestampConverter
Convert timestamps between different formats such as Unix epoch, strings, and Connect Date/Timestamp types. Applies to individual fields or to the entire value. Use the concrete transformation type designed for the record key (org.apache.kafka.connect.transforms.TimestampConverter$Key) or value (org.apache.kafka.connect.transforms.TimestampConverter$Value).
Name | Description | Type | Default | Valid Values | Importance
---|---|---|---|---|---
target.type | The desired timestamp representation: string, unix, Date, Time, or Timestamp | string | | | high
field | The field containing the timestamp, or empty if the entire value is a timestamp | string | "" | | high
format | A SimpleDateFormat-compatible format for the timestamp. Used to generate the output when type=string or used to parse the input if the input is a string. | string | "" | | medium
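A sketch of converting a timestamp field in the record value to a formatted string (the alias and the field name event_time are assumptions):
transforms=FormatTs
transforms.FormatTs.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.FormatTs.field=event_time
transforms.FormatTs.target.type=string
transforms.FormatTs.format=yyyy-MM-dd HH:mm:ss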
REST API
Since Kafka Connect is intended to be run as a service, it also provides a REST API for managing connectors. By default, this service runs on port 8083. The following are the currently supported endpoints:
- GET /connectors - return a list of active connectors
- POST /connectors - create a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
- GET /connectors/{name} - get information about a specific connector
- GET /connectors/{name}/config - get the configuration parameters for a specific connector
- PUT /connectors/{name}/config - update the configuration parameters for a specific connector
- GET /connectors/{name}/status - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks
- GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
- GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
- PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed
- PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused)
- POST /connectors/{name}/restart - restart a connector (typically because it has failed)
- POST /connectors/{name}/tasks/{taskId}/restart - restart an individual task (typically because it has failed)
- DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration
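As a sketch, a few of these endpoints exercised with curl against a local worker (the connector name local-file-sink is the illustrative one used earlier; adjust host and port to your deployment):
> curl http://localhost:8083/connectors
> curl http://localhost:8083/connectors/local-file-sink/status
> curl -X PUT http://localhost:8083/connectors/local-file-sink/pause
> curl -X PUT http://localhost:8083/connectors/local-file-sink/resume
> curl -X DELETE http://localhost:8083/connectors/local-file-sink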
Kafka Connect also provides a REST API for getting information about connector plugins:
- GET /connector-plugins - return a list of connector plugins installed in the Kafka Connect cluster. Note that the API only checks for connectors on the worker that handles the request, which means you may see inconsistent results, especially during a rolling upgrade if you add new connector jars
- PUT /connector-plugins/{connector-type}/config/validate - validate the provided configuration values against the configuration definition. This API performs per config validation, returns suggested values and error messages during validation.
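For example, a validation request for the file sink connector might look like this sketch (the config values are illustrative):
> curl -X PUT -H "Content-Type: application/json" http://localhost:8083/connector-plugins/FileStreamSinkConnector/config/validate --data '{"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max": "1", "topics": "connect-test"}'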