使用docker部署mysql并开启binlog的方法
作者:leiline
在验证flink-cdc-mysql时,需要部署一个开启了binlog的mysql服务。cdc文档中有一个demo可以尝试部署,但是我在验证的时候发现可能存在一些问题,所以就尝试自己部署一个mysql服务。与cdc demo中类似,使用docker部署是最快的方案。
网上整理了一些资料,发现思路大概是:
- 部署mysql docker服务
 - 编辑mysql 配置文件
 - 启动mysql 服务。
 
首先根据需要,编写docker-compose文件。
创建一个mysql目录,然后执行 vim docker-compose.yml 命令。
将下面内容粘贴进去,适当进行修改。
version: "3.7"
services:
  mysql:
    image: mysql:5.7.28
    container_name: mysql-binlog2
    command: --default-authentication-plugin=mysql_native_password
    restart: always
    environment:
      # root用户密码
      MYSQL_ROOT_PASSWORD: 123456
      TZ: Asia/Shanghai
    ports:
      - 3306:3306
    volumes:
      - /home/leiline/cdc/data/mysql/master/data:/var/lib/mysql
      - /home/leiline/cdc/data/mysql/master/log:/var/log/mysql
      - /home/leiline/cdc/data/mysql/master/conf:/etc/mysql
保存文件后,退出。
这里需要在服务器中创建目录,分别用来保存mysql的数据,日志和配置信息。
mkdir -p /home/leiline/cdc/data/mysql/master/data mkdir -p /home/leiline/cdc/data/mysql/master/log mkdir -p /home/leiline/cdc/data/mysql/master/conf
其中,我们要把配置信息放在conf目录下。mysql的配置信息保存在一个名称为my.cnf的文件中,将下列信息粘贴到my.cnf文件中。
## 局域网唯一 server_id=1 ## 指定不需要同步的数据库名称 binlog-ignore-db=master ## 开启二进制日志功能 log-bin=/var/lib/mysql/mysql-bin ## 设置二进制日志使用内存大小(事务) binlog_cache_size=1M ## 设置使用的二进制日志格式(mixed,statement,row) binlog_format=ROW ## 二进制日志过期清理时间。默认值为0,表示不自动清理。 expire_logs_days=7
我们在这里设置的Binlog二进制格式为ROW。
最后,启动docker-compose。执行命令 sudo docker-compose up -d docker-compose 会自动加载docker-compose.yml 文件中的内容,拉取 mysql:5.7.28 镜像,最后将创建mysql-binlog2 容器。
执行 docker ps 可以查看当前运行中的容器有哪些,以及一些概要信息。
sudo docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES c6e03b8a0656 mysql:5.7.28 "docker-entrypoint.s…" 2 hours ago Up About an hour 0.0.0.0:3306->3306/tcp, :::3306->3306/tcp, 33060/tcp mysql-binlog2
进入mysql中进行操作
到目前为止,mysql服务已经启动,我们可以进入mysql中查看相关配置信息,并进行数据操作。执行命令 sudo docker-compose exec mysql mysql -uroot -p123456 就可以进入到mysql服务中。
执行命令 show global variables like "%binlog%"; 和 show global variables like "%log_bin%"; 可以查询相关配置信息。
由截图可以看到,binlog_format的值为ROW,log_bin的值为ON。这样表示binlog已经开启,且二进制日志类型为ROW。
接下来我们就可以进行数据操作了。
插入mysql数据
我们还是借助flink cdc demo中的例子,将建表语句和insert语句在mysql中执行。
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");
CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
flink mysql cdc 抽数
最后,我们要借助flink mysql cdc 进行binlog数据的抽取。
首先准备好sql语句:
SET execution.checkpointing.interval = 3s;
CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'database-name' = 'mydb',
    'table-name' = 'products'
  );
CREATE TABLE printer (
    id INT,
    name STRING,
    description STRING
  ) WITH (
    'connector' = 'print'
  );
INSERT INTO printer SELECT * FROM products;sql语句的内容很简单,就是将products表的日志数据抽取,然后打印出来。
因为我们只是一个demo测试,所以我们使用flink local模式执行SQL。与flink cdc demo一样,首先执行./bin/start-cluster.sh 然后进入到 sql-client中,./sql-client.sh embedded 这个时候我们进入到了flink sql客户端中,将上述SQL复制进sql-client中,sql-client会自动将SQL进行解析,执行,提交到flink集群中运行。
如果一切正常的话,我们可以打开地址 http://localhost:8081/ 可以看到flink dashboard。在taskmanager 的out中,可以看到输出内容。
通过对Mysql数据的修改,我们测试了增删改的操作,CDC都可以正常的进行数据采集。而将表进行truncate后,CDC是不会有任何结果的。
总结
本文首先部署了一个docker mysql服务,并且开启binlog,然后通过flink cdc 将mysql binlog日志抽取,并打印出来。
flink cdc 目前版本为2.3.0,对于mysql比较稳定的支持,后面我们会在生产上进行测试,并尝试满足业务需求。
到此这篇关于使用docker部署mysql并开启binlog的文章就介绍到这了,更多相关docker部署mysql开启binlog内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
