
Listening to the MySQL Binlog with Flink

宁弘亮
2023-12-01

Prerequisites

1. Enable the MySQL binlog.

Edit the MySQL configuration file.

Add the following under the [mysqld] section:

log_bin=mysql-bin
binlog-format=ROW
server-id=1
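Before restarting MySQL, it can help to confirm that the three settings really sit under [mysqld] and not under another section. The following is a minimal sketch of such a check using only the JDK; the class name BinlogConfigCheck and its methods are hypothetical helpers written for this article, not part of MySQL or Flink. It assumes a simple key=value file without inline comments and relies on the fact that MySQL accepts "-" and "_" interchangeably in option names.

```java
import java.util.HashMap;
import java.util.Map;

public class BinlogConfigCheck {

    /** Collects key/value pairs found in the [mysqld] section of my.cnf-style text. */
    public static Map<String, String> mysqldOptions(String configText) {
        Map<String, String> options = new HashMap<>();
        boolean inMysqld = false;
        for (String raw : configText.split("\\R")) {
            String line = raw.trim();
            // Skip blank lines and comment lines.
            if (line.isEmpty() || line.startsWith("#") || line.startsWith(";")) {
                continue;
            }
            // A section header switches the current section.
            if (line.startsWith("[")) {
                inMysqld = line.equalsIgnoreCase("[mysqld]");
                continue;
            }
            if (!inMysqld) {
                continue;
            }
            int eq = line.indexOf('=');
            String key = (eq < 0 ? line : line.substring(0, eq)).trim();
            String value = eq < 0 ? "" : line.substring(eq + 1).trim();
            // Normalize "binlog-format" and "binlog_format" to the same key.
            options.put(key.replace('-', '_').toLowerCase(), value);
        }
        return options;
    }

    /** Returns true when log_bin, binlog_format=ROW and server_id are all set under [mysqld]. */
    public static boolean binlogReadyForCdc(String configText) {
        Map<String, String> o = mysqldOptions(configText);
        return o.containsKey("log_bin")
                && "ROW".equalsIgnoreCase(o.get("binlog_format"))
                && o.containsKey("server_id");
    }

    public static void main(String[] args) {
        String config = String.join("\n",
                "[mysqld]",
                "log_bin=mysql-bin",
                "binlog-format=ROW",
                "server-id=1");
        System.out.println(binlogReadyForCdc(config));
    }
}
```

This only inspects the file text. Whether the running server has picked up the settings still has to be checked on the server itself after a restart.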

The complete configuration file looks like this:

[mysql]
default-character-set=utf8


[mysqld]
character-set-server=utf8
default-storage-engine=INNODB

max_allowed_packet = 2048M

group_concat_max_len = 1024M

sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION'


log_bin=mysql-bin
binlog-format=ROW
server-id=1

[client]
port = 3306
host = localhost
user = root
password = root


 

2. Create the MySQL database and table.

(1) Create a database named demo.

(2) Create a student table in the demo database.

CREATE TABLE `student` (
    `id` INT ( 11 ) NOT NULL,
    `name` VARCHAR ( 128 ) DEFAULT NULL,
    `age` INT ( 11 ) DEFAULT NULL,
    PRIMARY KEY ( `id` )
) ENGINE = INNODB DEFAULT CHARSET = utf8mb4;

Create the Java project

1. POM dependencies.

    <properties>
        <flink.version>1.13.5</flink.version>
        <debezium.version>1.5.4.Final</debezium.version>
        <geometry.version>2.2.0</geometry.version>
        <java.version>8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <slf4j.version>1.7.25</slf4j.version>
        <log4j.version>2.16.0</log4j.version>
        <spotless.version>2.4.2</spotless.version>
        <!-- Enforce single fork execution due to heavy mini cluster use in the tests -->
        <flink.forkCount>1</flink.forkCount>
        <flink.reuseForks>true</flink.reuseForks>
        <log4j.configuration>log4j2-test.properties</log4j.configuration>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <!-- add the dependency matching your database -->
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <!-- the dependency is available only for stable releases. -->
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <type>test-jar</type>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>${slf4j.version}</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

2. Java code:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkMain {

    public static void main(String[] args) throws Exception {
        // Source that reads an initial snapshot of demo.student, then follows the binlog
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("demo") // set captured database
                .tableList("demo.student") // set captured table (database.table)
                .username("root")
                .password("root")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpointing every 3000 ms
        env.enableCheckpointing(3000);

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 4 parallel source tasks
                .setParallelism(4)
                .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print MySQL Snapshot + Binlog");
    }
}

Run the following SQL to see the effect:

INSERT INTO `student`(`id`, `name`, `age`) VALUES (1, '张三', 28);

Console output (formatted here for readability):

{
    "before": null,
    "after": {
        "id": 1, 
        "name": "张三", 
        "age": 28
    }, 
    "source": {
        "version": "1.5.4.Final", 
        "connector": "mysql", 
        "name": "mysql_binlog_source", 
        "ts_ms": 1649733618000, 
        "snapshot": "false", 
        "db": "demo", 
        "sequence": null, 
        "table": "student", 
        "server_id": 1, 
        "gtid": null, 
        "file": "mysql-bin.000001", 
        "pos": 2162, 
        "row": 0, 
        "thread": null, 
        "query": null
    }, 
    "op": "c", 
    "ts_ms": 1649733618127, 
    "transaction": null
}
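The "op" field in the event above tells the consumer what kind of change it is looking at. In Debezium change events, "c" marks an insert, "u" an update, "d" a delete, and "r" a row read during the initial snapshot. The sketch below shows one way to turn that code into a readable label using only the JDK. The class ChangeEventOp is a hypothetical helper, and the regular-expression lookup is a simplification that assumes no captured column is itself named "op"; in the project above, a JSON library such as the fastjson dependency would be the more robust way to read the field.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ChangeEventOp {

    // Matches a field of the form "op": "<single lowercase letter>" in the event text.
    private static final Pattern OP_FIELD =
            Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"");

    /** Returns the op code found in the event JSON, or null if there is none. */
    public static String extractOp(String eventJson) {
        Matcher m = OP_FIELD.matcher(eventJson);
        return m.find() ? m.group(1) : null;
    }

    /** Maps a Debezium op code to a human-readable label. */
    public static String describe(String op) {
        if (op == null) {
            return "unknown";
        }
        switch (op) {
            case "c": return "insert";
            case "u": return "update";
            case "d": return "delete";
            case "r": return "snapshot read";
            default:  return "unknown";
        }
    }

    public static void main(String[] args) {
        // Shortened version of the event printed for the INSERT statement.
        String event = "{\"before\":null,\"after\":{\"id\":1},\"op\":\"c\",\"ts_ms\":1649733618127}";
        System.out.println(describe(extractOp(event)));
    }
}
```

For an insert, "before" is null and "after" holds the new row, as in the output above. For a delete the roles are reversed, and an update carries both images.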

 

 

 
