1 环境准备
修改config.sh配置文件,添加以下内容
export HADOOP_CLASSPATH=`hadoop classpath`
修改flink-conf.yml配置文件,添加以下内容
env.java.opts.all: -Dfile.encoding=UTF-8 //解决CDC采集MySQL中文乱码问题
${FLINK_HOME}/lib下添加相应的Jar包
flink-doris-connector-1.17-1.5.1.jar
flink-sql-connector-mysql-cdc-2.4.2.jar
2 同步步骤
2.1 整库同步
/opt/module/flink-1.17.2/bin/flink run-application -t yarn-application \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-1.5.1.jar \
mysql-sync-database \
--database doris_crud_test \
--mysql-conf hostname=hadoop102 \
--mysql-conf username=root \
--mysql-conf password=123456 \
--mysql-conf database-name=edu2023 \
--sink-conf fenodes=hadoop102:8029 \
--sink-conf username=root \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://hadoop102:9030 \
--sink-conf sink.label-prefix=label2 \
--table-conf replication_num=2;
2.2 单表同步
/opt/module/flink-1.17.2/bin/flink run-application -t yarn-application \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-1.5.1.jar \
mysql-sync-database \
--database doris_crud_test \
--mysql-conf hostname=hadoop102 \
--mysql-conf username=root \
--mysql-conf password=123456 \
--mysql-conf database-name=test \
--including-tables "cart_info" \
--sink-conf fenodes=hadoop102:8029 \
--sink-conf username=root \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://hadoop102:9030 \
--sink-conf sink.label-prefix=label3 \
--table-conf replication_num=2;
3 性能测试
查询:1700W数据,3秒
写入:1700W数据,20分钟
修改:
删除:
补充
a.多数据源往同一张表写数据
注意:多数据源不会同时处理同一条数据(修改/删除),各数据源的机构代码不同
Doris:使用官方提供的包,必须保证源表字段名和Doris中的字段名一模一样,包括大小写(底层是JSON),实际业务场景字段名大小写不一样,需要修改官方的flink-doris-connector包
Paimon:官方提供的包就,支持多数据源往同一张表写数据