博客
关于我
Flink CDC的使用
阅读量:479 次
发布时间:2019-03-06

本文共 4602 字,大约阅读时间需要 15 分钟。

MySQL数据准备与Flink CDC实时数据捕获

MySQL数据准备

在开始使用Flink CDC捕获MySQL变更数据之前,需要先准备好MySQL数据库。以下是具体的操作步骤:

  • 创建并使用数据库
  • create database if not exists test;
    use test;
    1. 创建学生表
    2. drop table if exists stu;
      create table stu (
      id int primary key auto_increment,
      name varchar(100),
      age int
      );
      1. 插入初始数据
      2. insert into stu(name, age) values("张三", 18);
        insert into stu(name, age) values("李四", 20);
        insert into stu(name, age) values("王五", 21);

        注意事项:确保表中有主键,否则Flink CDC可能无法正常工作。

        开启MySQL binlog

        为了实现Flink CDC对MySQL数据库的变更数据实时捕获,需要先开启MySQL的二进制日志。

      3. 修改MySQL配置文件
      4. sudo vim /etc/my.cnf
        1. 在配置文件中添加以下内容:
        2. server-id = 1
          log-bin=mysql-bin
          binlog_format=row
          binlog-do-db=test

          注意事项:启用binlog的数据库需要根据实际情况调整设置,确保二进制日志文件路径和权限正确。

          1. 重启MySQL服务
          2. sudo systemctl restart mysqld

            Flink代码开发

            本节将介绍如何使用Flink CDC从MySQL数据库实时捕获增删改数据。

            依赖管理

          3. 添加Flink CDC依赖
          4. com.ververica
            flink-connector-mysql-cdc
            2.4.0
            1. 其他Flink依赖(如 flink-table-api-java-bridge 等)
            2. org.apache.flink
              flink-table-api-java-bridge
              ${flink.version}
              import com.ververica.cdc.connectors.mysql.source.MySqlSource;
              import com.ververica.cdc.connectors.mysql.table.StartupOptions;
              import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
              import org.apache.flink.api.common.eventtime.WatermarkStrategy;
              import org.apache.flink.streaming.api.datastream.DataStreamSource;
              import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
              public class FlinkCDCDemo {
              public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setParallelism(4);
              MySqlSource
              mySqlSource = MySqlSource.builder()
              .hostname("node4")
              .port(3306)
              .username("root")
              .password("000000")
              .databaseList("test")
              .tableList("test.stu")
              .deserializer(new JsonDebeziumDeserializationSchema())
              .startupOptions(StartupOptions.initial())
              .build();
              DataStreamSource
              dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source")
              .setParallelism(1);
              dataStreamSource.print();
              env.execute();
              }
              }

              注意事项:确保MySQL的binlog已经启用,并且Flink运行环境的版本与依赖版本匹配。

              测试与验证

              添加新数据

              执行以下SQL语句:

              mysql> insert into stu(name, age) values("赵六", 23);

              输出示例

              {
              "before": null,
              "after": {
              "id": 4,
              "name": "赵六",
              "age": 23
              },
              "source": {
              "version": "1.9.7.Final",
              "connector": "mysql",
              "name": "mysql_binlog_source",
              "ts_ms": 1719831654000,
              "snapshot": "false",
              "db": "test",
              "sequence": null,
              "table": "stu",
              "server_id": 1,
              "gtid": null,
              "file": "mysql-bin.000001",
              "pos": 2300,
              "row": 0,
              "thread": 13,
              "query": null
              },
              "op": "c",
              "ts_ms": 1719831654692,
              "transaction": null
              }

              修改数据

              执行以下SQL语句:

              mysql> update stu set name="zl", age=19 where name="赵六";

              输出示例

              {
              "before": {
              "id": 4,
              "name": "赵六",
              "age": 23
              },
              "after": {
              "id": 4,
              "name": "zl",
              "age": 19
              },
              "source": {
              "version": "1.9.7.Final",
              "connector": "mysql",
              "name": "mysql_binlog_source",
              "ts_ms": 1719831987000,
              "snapshot": "false",
              "db": "test",
              "sequence": null,
              "table": "stu",
              "server_id": 1,
              "gtid": null,
              "file": "mysql-bin.000001",
              "pos": 2604,
              "row": 0,
              "thread": 13,
              "query": null
              },
              "op": "u",
              "ts_ms": 1719831987238,
              "transaction": null
              }

              删除数据

              执行以下SQL语句:

              mysql> delete from stu where id=4;

              输出示例

              {
              "before": {
              "id": 4,
              "name": "zl",
              "age": 19
              },
              "after": null,
              "source": {
              "version": "1.9.7.Final",
              "connector": "mysql",
              "name": "mysql_binlog_source",
              "ts_ms": 1719832151000,
              "snapshot": "false",
              "db": "test",
              "sequence": null,
              "table": "stu",
              "server_id": 1,
              "gtid": null,
              "file": "mysql-bin.000001",
              "pos": 2913,
              "row": 0,
              "thread": 13,
              "query": null
              },
              "op": "d",
              "ts_ms": 1719832151198,
              "transaction": null
              }

              注意事项:通过IDEA控制台可以实时查看Flink程序的输出日志,确保数据捕获和处理过程中没有错误发生。

    转载地址:http://iaqbz.baihongyu.com/

    你可能感兴趣的文章
    MYSQL语句。
    查看>>
    MySQL调优是程序员拿高薪的必备技能?
    查看>>
    MySQL调大sort_buffer_size,并发量一大,查询排序为啥又会变慢
    查看>>
    Mysql账号权限查询(grants)
    查看>>
    mysql转达梦7_达梦7的子查询分解示例说明
    查看>>
    MYSQL输入密码后闪退的解决方法
    查看>>
    MySQL迁移到达梦:如何轻松、高质量完成迁移任务
    查看>>
    mysql返回的时间和实际数据存储的时间有误差(java+mysql)
    查看>>
    mysql还有哪些自带的函数呢?别到处找了,看这个就够了。
    查看>>
    Mysql进入数据库
    查看>>
    mysql进阶 with-as 性能调优
    查看>>
    mysql进阶-查询优化-慢查询日志
    查看>>
    wargame narnia writeup
    查看>>
    MySQL进阶篇SQL优化(InnoDB锁问题排查与解决)
    查看>>
    Mysql进阶索引篇03——2个新特性,11+7条设计原则教你创建索引
    查看>>
    mysql远程连接设置
    查看>>
    MySql连接出现1251Client does not support authentication protocol requested by server解决方法
    查看>>
    Mysql连接时报时区错误
    查看>>
    MySql连接时提示:unknown Mysql server host
    查看>>
    MySQL连环炮,你扛得住嘛?
    查看>>