Apache SeaTunnel: a next-generation, high-performance, distributed framework for massive-scale data integration.
First, install Java (Java 8 or 11; other versions above Java 8 should also work in theory) and set JAVA_HOME.
Then prepare a MySQL environment and a Kafka environment.
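The Java prerequisite can be checked before going further. The following is a minimal sketch, not part of the original walkthrough: the helper name `java_major_version` is hypothetical, and it assumes the usual `java -version` banner format (for example `version "1.8.0_362"` or `version "11.0.19"`).

```shell
#!/usr/bin/env bash
# Hypothetical helper: extract the major version from a `java -version` banner.
# Handles the legacy "1.8.0_362" scheme as well as the modern "11.0.19" scheme.
java_major_version() {
  local banner="$1" version
  version=$(printf '%s\n' "$banner" | sed -n 's/.*version "\([^"]*\)".*/\1/p' | head -n 1)
  case "$version" in
    1.*) printf '%s\n' "$version" | cut -d. -f2 ;;  # 1.8.x -> 8
    *)   printf '%s\n' "$version" | cut -d. -f1 ;;  # 11.0.x -> 11
  esac
}

# Report what is installed; `java -version` writes its banner to stderr.
if command -v java >/dev/null 2>&1; then
  major=$(java_major_version "$(java -version 2>&1)")
  echo "Detected Java major version: ${major:-unknown}"
else
  echo "java not found on PATH"
fi
echo "JAVA_HOME=${JAVA_HOME:-<not set>}"
```

If the reported major version is 8 or 11 and JAVA_HOME is set, the Java prerequisite described above is met.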
https://seatunnel.apache.org/docs/2.3.1/about/
For details, see the official website:
https://seatunnel.apache.org/zh-CN/docs/2.3.1/start-v2/locally/deployment
https://dlcdn.apache.org/incubator/seatunnel/2.3.1/apache-seatunnel-incubating-2.3.1-bin.tar.gz
https://repo.maven.apache.org/maven2/org/apache/seatunnel/connector-cdc-mysql/2.3.1/connector-cdc-mysql-2.3.1.jar
https://repo.maven.apache.org/maven2/org/apache/seatunnel/connector-jdbc/2.3.1/connector-jdbc-2.3.1.jar
https://repo.maven.apache.org/maven2/org/apache/seatunnel/connector-kafka/2.3.1/connector-kafka-2.3.1.jar
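Extract the SeaTunnel archive, then place the three connector jars above (connector-cdc-mysql, connector-jdbc, connector-kafka) in apache-seatunnel-incubating-2.3.1/connectors/seatunnel/.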
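The connector download step can be scripted. This is a sketch under stated assumptions: `curl` is available, the SeaTunnel archive has already been extracted in the current directory, and the names `SEATUNNEL_HOME`, `RUN_DOWNLOAD`, `connector_url`, and `fetch_connectors` are hypothetical names introduced here for illustration. By default it only prints the URLs; set `RUN_DOWNLOAD=1` to actually download.

```shell
#!/usr/bin/env bash
SEATUNNEL_VERSION="2.3.1"
# Assumed location of the extracted distribution; override if yours differs.
SEATUNNEL_HOME="${SEATUNNEL_HOME:-apache-seatunnel-incubating-${SEATUNNEL_VERSION}}"
MAVEN_BASE="https://repo.maven.apache.org/maven2/org/apache/seatunnel"

# Build the Maven Central URL for one connector artifact.
connector_url() {
  printf '%s/%s/%s/%s-%s.jar\n' \
    "$MAVEN_BASE" "$1" "$SEATUNNEL_VERSION" "$1" "$SEATUNNEL_VERSION"
}

# Download the three connectors used in this walkthrough into connectors/seatunnel/.
fetch_connectors() {
  local dest="$SEATUNNEL_HOME/connectors/seatunnel" name
  mkdir -p "$dest" || return 1
  for name in connector-cdc-mysql connector-jdbc connector-kafka; do
    curl -fL -o "$dest/$name-$SEATUNNEL_VERSION.jar" "$(connector_url "$name")" || return 1
  done
}

if [ "${RUN_DOWNLOAD:-0}" = "1" ]; then
  fetch_connectors
else
  # Dry run: show what would be downloaded.
  for name in connector-cdc-mysql connector-jdbc connector-kafka; do
    connector_url "$name"
  done
fi
```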
https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-j-8.0.33.tar.gz
After downloading, extract the archive and copy the mysql-connector-j-8.0.33.jar it contains to apache-seatunnel-incubating-2.3.1/plugins/jdbc/lib.
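Installing the MySQL driver might look like the sketch below. The function name `install_mysql_driver` is hypothetical, and the script assumes only that the jar sits somewhere inside the downloaded tarball, so it searches for it by name rather than relying on a fixed internal path.

```shell
#!/usr/bin/env bash
# Hypothetical helper: unpack the Connector/J tarball and copy the driver jar
# into <seatunnel_home>/plugins/jdbc/lib.
# Usage: install_mysql_driver <tarball> <seatunnel_home>
install_mysql_driver() {
  local tarball="$1" home="$2" jar_name="mysql-connector-j-8.0.33.jar"
  local workdir jar
  workdir=$(mktemp -d) || return 1
  tar -xzf "$tarball" -C "$workdir" || { rm -rf "$workdir"; return 1; }
  # Locate the jar wherever it lives inside the extracted tree.
  jar=$(find "$workdir" -type f -name "$jar_name" | head -n 1)
  if [ -z "$jar" ]; then
    echo "$jar_name not found in $tarball" >&2
    rm -rf "$workdir"
    return 1
  fi
  mkdir -p "$home/plugins/jdbc/lib" && cp "$jar" "$home/plugins/jdbc/lib/"
  local status=$?
  rm -rf "$workdir"
  return $status
}

# Example call (paths are assumptions; adjust to your layout):
# install_mysql_driver mysql-connector-j-8.0.33.tar.gz apache-seatunnel-incubating-2.3.1
```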
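The job configuration file determines how SeaTunnel reads, processes, and writes data once it starts. Below is an example configuration for syncing MySQL-CDC to Kafka.
Configuration files and their paths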
apache-seatunnel-incubating-2.3.1/config/mysql2kafka.config.template
env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 15000
}
source {
  MySQL-CDC {
    username = "username" # replace with your own
    password = "password" # replace with your own
    hostname = 192.168.xxx.xxx # replace with your own
    base-url = "jdbc:mysql://192.168.xxx.xxx:3306/schema" # replace with your own
    # On startup, sync the historical data first, then the incremental changes.
    startup.mode = INITIAL
    catalog {
      factory = MySQL
    }
    table-names = [
      "schema.tablename" # replace with your own schema.tablename
    ]
    # compatible_debezium_json options
    format = compatible_debezium_json
    debezium = {
      # include schema into kafka message
      key.converter.schemas.enable = false
      value.converter.schemas.enable = false
      # include ddl
      include.schema.changes = true
      # topic prefix
      database.server.name = "mysql_cdc_1"
    }
    # compatible_debezium_json fixed schema
    schema = {
      fields = {
        topic = string
        key = string
        value = string
      }
    }
  }
}
sink {
  Kafka {
    # topic = "mysql_cdc_1" is left commented out; the topic mysql_cdc_1.schema.tablename is created automatically
    bootstrap.servers = "192.168.x.xxx:9092"
    # compatible_debezium_json options
    format = compatible_debezium_json
  }
}
apache-seatunnel-incubating-2.3.1/config/seatunnel.yaml
seatunnel:
  engine:
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 10000
      timeout: 60000
      max-concurrent: 5
      tolerable-failure: 2
      storage:
        type: localfile
        max-retained: 3
        plugin-config:
          storage.type: localfile
          fs.defaultFS: file:///tmp/ # Ensure that the directory has write permission
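The checkpoint storage above points at file:///tmp/, and the comment asks that the directory be writable. A quick pre-flight check could look like this sketch; the function name `check_writable_dir` is hypothetical, and it assumes the job runs as the same user who runs the check.

```shell
#!/usr/bin/env bash
# Hypothetical helper: confirm a directory exists and is writable by the current user.
check_writable_dir() {
  if [ -d "$1" ] && [ -w "$1" ]; then
    echo "OK: $1 is writable"
  else
    echo "FAIL: $1 is missing or not writable" >&2
    return 1
  fi
}

# file:///tmp/ in seatunnel.yaml corresponds to the local path /tmp/.
check_writable_dir /tmp/ || true
```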
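Start the job in local mode, running the command from the apache-seatunnel-incubating-2.3.1 directory:
sh bin/seatunnel.sh --config config/mysql2kafka.config.template -e local &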
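Once the job is running, the output can be checked from the command line. The sketch below derives the topic name as the configuration comment describes it (the `database.server.name` prefix followed by `schema.tablename`) and builds a consumer command. It assumes Kafka's `kafka-console-consumer.sh` is on the PATH; `cdc_topic_name`, `BOOTSTRAP`, and `RUN_CONSUMER` are hypothetical names introduced here. By default it only prints the command; set `RUN_CONSUMER=1` to run it.

```shell
#!/usr/bin/env bash
# Hypothetical helper: join the Debezium server name and the table identifier,
# following the naming noted in the sink configuration comment.
cdc_topic_name() {
  printf '%s.%s\n' "$1" "$2"
}

TOPIC=$(cdc_topic_name "mysql_cdc_1" "schema.tablename")
# Placeholder broker address from the sink config; replace with your own.
BOOTSTRAP="${BOOTSTRAP:-192.168.x.xxx:9092}"

if [ "${RUN_CONSUMER:-0}" = "1" ]; then
  # Read the first few change events from the beginning of the topic.
  kafka-console-consumer.sh --bootstrap-server "$BOOTSTRAP" \
    --topic "$TOPIC" --from-beginning --max-messages 5
else
  echo "kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP --topic $TOPIC --from-beginning --max-messages 5"
fi
```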
To make the Kafka data easier to inspect, install the visualization tool kafka-map:
https://github.com/dushixiang/kafka-map/releases/download/v1.3.1/kafka-map.tar.gz
That completes the basic setup.