Canal教程(1)集群模式部署与topic配置

TangLu 未命名 2023-02-20 9961 0

一、Canal工作原理简析

Canal基于MySQL Binlog工作,Canal会将自己伪装成MySQL的从库去向主库进行数据同步请求,然后将获取到的Binlog解析成特定格式推送给消费队列进行增量数据的订阅和消费。通常应用场景是抓取业务表的新增变化数据,用于制作实时统计等。


二、Canal集群部署前的准备工作

1、安装JDK 1.8


2、安装Zookeeper与Kafka集群

在初期时候Canal只能使用TCP模式,客户端代码需要由开发人员自行完成。从Canal 1.1开始支持将接受到的Binlog传递给后端消息队列,目前支持的MQ系统有kafka和rocketmq。Zookeeper与Kafka的部署可参考本站文章——《Kafka集群部署使用与扩容教程


3、MySQL主库相关配置

· MySQL需要配置Binlog为row格式

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复


· 为Canal创建用户并授权,因为要模拟从库,所以就需要从库相关权限

grant select,replication slave,replication client on *.* to  'canal' @ '192.168.94.10'  identified by '123456'  #IP需要修改为Canal节点所在的信息


三、Canal集群模式部署

1、下载Canal

Canal的安装包可以通过Github进行下载(https://github.com/alibaba/canal)。从1.1.4版本引入了canal-admin,可通过WebUI对Canal进行管理

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
tar zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal


2、Canal全局配置

Canal的全局配置文件为conf/canal.properties,详细配置可以参考官方文档(https://github.com/alibaba/canal/wiki/AdminGuide),这里只给出一些核心参数,比如Canal工作模式、zk与kafka集群信息,其他保持默认

canal.ip = 172.20.1.199            #canal绑定的本地IP
canal.register.ip = 172.20.1.199       #canal注册到外部zookeeper、admin的ip(针对docker的外部可见ip)
canal.port = 11111              #canal端口信息
canal.destinations = example         #当前server上部署的instance列表,当定义了canal.destinations后,需要在canal.conf.dir对应的目录下建立同名目录。比如设置了canal.destinations = example1,example2就需要创建example1和example2两个目录,每个目录里各自有一份instance.properties
canal.auto.scan = true            #开启instance自动扫描,当canal.conf.dir目录下的instance配置变化会自动进行新增、删除、reload配置
canal.auto.scan.interval = 5         #自动扫描间隔
canal.serverMode = kafka           #默认tcp模式是让代码来对数据进行订阅和消费,这里修改为kafka,让客户端直接从kafka消费数据
canal.mq.flatMessage = false          #是否将binlog转为JSON格式,不转换的话则使用原生格式
canal.mq.servers = 192.168.94.10:9092,192.168.94.11:9092,192.168.94.12:9092   #kafka集群地址
canal.zkServers = 192.168.94.10:2181,192.168.94.11:2181,192.168.94.12:2181    #zk集群地址
canal.instance.global.spring.xml = classpath:spring/default-instance.xml     #默认file-instance不支持HA持久化,如果要做高可用的话修改为default
canal.mq.compressionType = lz4         #开启消息体压缩
canal.mq.maxRequestSize = 10485760       #消息体大小,默认和KAFKA一致为1M,这里调到了10M
canal.mq.batchSize = 16384                        #flatMessage开启下可以调整该值来控制kafka批量消息大小,默认是16384 (16K)。如果实际binlog event对象值大于该值就不会有批量发送的作用,调大该值可以提高吞吐量,该值不能超过broker端的最大发送字节


3、Canal实例配置

Canal实例配置文件为conf/$instance/instance.properties,该配置文件最主要的就是需要过滤的库表以及topic信息。每一个$instance目录都代表一个实例,用于配置不同的数据库信息。Canal HA模式也是通过instance name进行区分,在zk中如果发现存在相同的instance会被自动归为一个集群中,所以在创建实例目录的时候一定要注意

#################################################
canal.instance.mysql.slaveId=2131  #ID不可与主库和其他从库重复
canal.instance.gtidon=false

# position info  需要同步的数据库信息,如果不指定任何位点信息默认从当前数据库位点开始同步
canal.instance.master.address=192.168.10.56:3306
canal.instance.master.journal.name=       #指定binlog文件名
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

canal.instance.tsdb.enable=true
canal.instance.dbUsername=canal   #canal在数据库中的用户
canal.instance.dbPassword=123456  #canal用户的密码
canal.instance.connectionCharset = UTF-8
canal.instance.enableDruid=false
canal.instance.filter.regex=employees\\..*,linuxe\\..*      #需要解析的库表,支持正则匹配,如canal\\..*代表canal库下所有表。这里代表linuxe和employees库下所有表,具体规则可见下面表格
canal.instance.filter.black.regex=

# mq config
canal.mq.topic=linuxe  #mq里的topic名,不能带下划线,如果配置了动态topic,那么不符合动态正则的将放这个topic里
canal.mq.dynamicTopic=linuxe,employees  #根据库表动态创建topic,支持多种配置语法,多个配置之间使用逗号或分号分隔,这里为2个库创建各自的topic
canal.mq.partition=0

canal.instance.filter.regex需要抓取的库表信息,以逗号分隔,转义符需要双斜杠(\\)

· .*或者.*\\..*:所有库表

· canal\\..*:canal库下所有表 

· canal\\.canal.*:canal库下以canal打头的表

· canal\\.test1:canal库下的一张表

· canal\\..*,mysql.test1,mysql.test2:多个规则组合使用

canal.mq.dynamicTopicmq里的动态topic规则,支持配置格式:schema 或 schema.table,多个配置之间使用逗号或分号分隔。建议MQ开启自动创建topic的能力

· test\\.test:指定匹配的单表,发送到以test_test为名字的topic上

· .*\\..*:匹配所有库下所有表,并且发送到各自表名的topic上

· test:指定匹配对应的库,该库下的所有表都会发送到库名的topic上

· test\\..*:指定匹配的表达式,针对匹配的表会发送到各自表名的topic上

· test,test1\\.test1:指定多个表达式,会将test库的表都发送到test的topic上,test1\\.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的canal.mq.topic值



4、启动Canal

bin/startup.sh  #启动后需要观察2个日志,logs/example/example.log和logs/canal/canal.log


5、启动每Canal节点后,通过zk客户端可以查看当前正在工作的Canal节点。当Canal主节点故障时,客户端能通过zk获取到另外节点继续工作

zkcli.sh
get /otter/canal/destinations/example/running


6、推荐配置Canal Admin来使用图形化界面管理Canal


评论