Synchronizing MySQL Data to Apache Kafka with Apache SeaTunnel (SeaTunnel Engine)

洪博涛
2023-12-01

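Introduction to Apache SeaTunnel

A next-generation, high-performance, distributed framework for integrating massive volumes of data.

Core features

  • Rich components: a large set of built-in plugins supports many data products, making it quick and convenient to transfer and integrate data
  • High extensibility: the modular, plugin-based design supports hot swapping of plugins, which makes the framework easier to extend
  • Easy to use: the architecture keeps job development and configuration simple, with almost no code required and little cost to get started
  • Mature and stable: proven in large-scale production environments at multiple companies and on massive data volumes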

1. Preparation

First install Java (Java 8 or 11; other versions above Java 8 should also work in theory) and set JAVA_HOME.
Then prepare the MySQL and Kafka environments.
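
As a quick sanity check of the Java requirement, the sketch below extracts the major version from a version string and, if java is on the PATH, prints what it finds. The helper name java_major is hypothetical, and the parsing assumes the usual `java -version` output, in which the version appears in double quotes on the first line.

```shell
# Hypothetical helper: map a Java version string to its major version.
# Assumes the two common schemes: "1.8.0_381" (legacy) and "11.0.20" (modern).
java_major() {
  case "$1" in
    1.*) echo "$1" | cut -d. -f2 ;;  # 1.8.0_381 -> 8
    *)   echo "$1" | cut -d. -f1 ;;  # 11.0.20   -> 11
  esac
}

# Only inspect the local installation when java is actually on the PATH.
if command -v java >/dev/null 2>&1; then
  # `java -version` writes to stderr; the version is the first quoted token.
  version="$(java -version 2>&1 | awk -F'"' 'NR == 1 { print $2 }')"
  echo "Java major version: $(java_major "$version")"
  echo "JAVA_HOME: ${JAVA_HOME:-<not set>}"
fi
```

A result of 8 or 11 matches the versions named above; anything else falls under the "should work in theory" case.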

Official website

https://seatunnel.apache.org/docs/2.3.1/about/

Installing the software and the connector plugins

See the official deployment guide:
https://seatunnel.apache.org/zh-CN/docs/2.3.1/start-v2/locally/deployment

Apache SeaTunnel download

https://dlcdn.apache.org/incubator/seatunnel/2.3.1/apache-seatunnel-incubating-2.3.1-bin.tar.gz

Connector plugin downloads

https://repo.maven.apache.org/maven2/org/apache/seatunnel/connector-cdc-mysql/2.3.1/connector-cdc-mysql-2.3.1.jar

https://repo.maven.apache.org/maven2/org/apache/seatunnel/connector-jdbc/2.3.1/connector-jdbc-2.3.1.jar

https://repo.maven.apache.org/maven2/org/apache/seatunnel/connector-kafka/2.3.1/connector-kafka-2.3.1.jar

Place the three jars above under apache-seatunnel-incubating-2.3.1/connectors/seatunnel/.
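
The download-and-place step can be scripted. The following is a minimal sketch, assuming curl is installed and that SeaTunnel was unpacked into the current directory; the SEATUNNEL_HOME variable and the function names are hypothetical conveniences, while the URLs follow the pattern of the three links above.

```shell
# Assumption: SeaTunnel 2.3.1 was unpacked in the current directory.
SEATUNNEL_HOME="${SEATUNNEL_HOME:-apache-seatunnel-incubating-2.3.1}"
VERSION="2.3.1"
REPO="https://repo.maven.apache.org/maven2/org/apache/seatunnel"

# Build the Maven Central URL for one connector artifact.
connector_url() {
  echo "${REPO}/$1/${VERSION}/$1-${VERSION}.jar"
}

# Download the three connectors used in this walkthrough into connectors/seatunnel/.
fetch_connectors() {
  mkdir -p "${SEATUNNEL_HOME}/connectors/seatunnel"
  for name in connector-cdc-mysql connector-jdbc connector-kafka; do
    curl -fL -o "${SEATUNNEL_HOME}/connectors/seatunnel/${name}-${VERSION}.jar" \
      "$(connector_url "$name")" || return 1
  done
}

# Nothing is downloaded until fetch_connectors is called explicitly.
```

Running fetch_connectors once after sourcing the snippet performs the download; the function stops at the first failed request.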

MySQL driver download

https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-j-8.0.33.tar.gz

After downloading, extract the archive and place the mysql-connector-j-8.0.33.jar it contains under apache-seatunnel-incubating-2.3.1/plugins/jdbc/lib.
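
A minimal sketch of that extraction step follows. The function name install_mysql_driver is hypothetical. Rather than assuming the exact directory layout inside the archive, it searches the extracted files for any mysql-connector-j-*.jar.

```shell
# Hypothetical helper: unpack a Connector/J tarball and copy its driver jar
# into the given directory.
# Usage: install_mysql_driver <tarball> <destination-dir>
install_mysql_driver() {
  tarball="$1"
  dest="$2"
  workdir="$(mktemp -d)" || return 1
  tar -xzf "$tarball" -C "$workdir" || return 1
  mkdir -p "$dest"
  # Copy whichever driver jar the archive contains, wherever it sits.
  find "$workdir" -name 'mysql-connector-j-*.jar' -exec cp {} "$dest/" \;
  rm -rf "$workdir"
}

# Example:
# install_mysql_driver mysql-connector-j-8.0.33.tar.gz \
#   apache-seatunnel-incubating-2.3.1/plugins/jdbc/lib
```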

Editing the job configuration file

This file determines how SeaTunnel reads, processes, and writes data once it starts. Below is an example configuration for a MySQL-CDC to Kafka job.

Configuration file and path:
apache-seatunnel-incubating-2.3.1/config/mysql2kafka.config.template

env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 15000
}

source {
  MySQL-CDC {
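    username = "username"  # replace with your own
    password = "password"  # replace with your own
    hostname = "192.168.xxx.xxx"  # replace with your own
    base-url = "jdbc:mysql://192.168.xxx.xxx:3306/schema"  # replace with your own
    # On startup, sync the historical data first, then the incremental changes.
    startup.mode=INITIAL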
    catalog {
        factory=MySQL
    }
    table-names=[
        "schema.tablename"  # replace with your own schema.tablename
    ]

    # compatible_debezium_json options
    format = compatible_debezium_json
    debezium = {
        # include schema into kafka message
        key.converter.schemas.enable = false
        value.converter.schemas.enable = false
        # include ddl
        include.schema.changes = true
        # topic prefix
        database.server.name =  "mysql_cdc_1"
    }
    # compatible_debezium_json fixed schema
    schema = {
        fields = {
            topic = string
            key = string
            value = string
        }
    }
  }
}

sink {
  Kafka {
    ## topic = "mysql_cdc_1"; the topic mysql_cdc_1.schema.tablename is created automatically
    bootstrap.servers = "192.168.x.xxx:9092"
    # compatible_debezium_json options
    format = compatible_debezium_json
  }
}
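
Because the example ships with placeholder hosts (192.168.xxx.xxx and 192.168.x.xxx), a small pre-flight check can catch a job file that was not fully edited. This is only a sketch: the function name has_placeholders and the CONFIG variable are hypothetical, and the pattern covers the placeholder hosts only, not the username, password, or table name.

```shell
# Hypothetical pre-flight check: succeed if the file still contains one of
# the tutorial's placeholder hosts, such as 192.168.xxx.xxx or 192.168.x.xxx.
has_placeholders() {
  grep -Eq '192\.168\.x+\.x+' "$1"
}

CONFIG="${CONFIG:-config/mysql2kafka.config.template}"
if [ -f "$CONFIG" ] && has_placeholders "$CONFIG"; then
  echo "Replace the placeholder hosts in $CONFIG before running the job" >&2
fi
```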

Changing the checkpoint storage configuration (switched to local storage for easier testing)

apache-seatunnel-incubating-2.3.1/config/seatunnel.yaml

seatunnel:
  engine:
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 10000
      timeout: 60000
      max-concurrent: 5
      tolerable-failure: 2
      storage:
        type: localfile
        max-retained: 3
        plugin-config:
          storage.type: localfile
          fs.defaultFS: file:///tmp/ # Ensure that the directory has write permission
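
The write-permission requirement on the checkpoint directory can be verified before starting the engine. A minimal sketch, assuming the /tmp location from the configuration above; the CHECKPOINT_DIR variable and the function name are hypothetical.

```shell
# Hypothetical check: succeed only if the path is an existing, writable directory.
dir_is_writable() {
  [ -d "$1" ] && [ -w "$1" ]
}

CHECKPOINT_DIR="${CHECKPOINT_DIR:-/tmp}"  # matches fs.defaultFS: file:///tmp/
if dir_is_writable "$CHECKPOINT_DIR"; then
  echo "checkpoint directory OK: $CHECKPOINT_DIR"
else
  echo "checkpoint directory missing or not writable: $CHECKPOINT_DIR" >&2
fi
```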

2. Running the Job and Checking the Results

sh bin/seatunnel.sh --config config/mysql2kafka.config.template -e local &

To make the results easier to inspect, install kafka-map, a visual management tool for Kafka:

https://github.com/dushixiang/kafka-map/releases/download/v1.3.1/kafka-map.tar.gz
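
Without a GUI, the messages can also be read with the console consumer that ships with Kafka. The sketch below assumes a local Kafka installation; KAFKA_HOME, BOOTSTRAP, and both function names are hypothetical. The topic name follows the prefix.schema.table pattern noted in the sink configuration.

```shell
# Hypothetical helper: build the topic name from the Debezium prefix
# (database.server.name), the database, and the table.
cdc_topic() {
  echo "$1.$2.$3"
}

# Read the topic from the beginning with Kafka's bundled console consumer.
# KAFKA_HOME and the broker address are placeholders; adjust them to your setup.
consume_cdc_topic() {
  "${KAFKA_HOME:-.}/bin/kafka-console-consumer.sh" \
    --bootstrap-server "${BOOTSTRAP:-192.168.x.xxx:9092}" \
    --topic "$(cdc_topic "$@")" \
    --from-beginning
}

# Example: consume_cdc_topic mysql_cdc_1 schema tablename
```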

That completes this basic walkthrough.
