Canal教程(1)集群模式部署与topic配置
一、Canal工作原理简析
Canal基于MySQL Binlog工作,Canal会将自己伪装成MySQL的从库去向主库进行数据同步请求,然后将获取到的Binlog解析成特定格式推送给消费队列进行增量数据的订阅和消费。通常应用场景是抓取业务表的新增变化数据,用于制作实时统计等。
二、Canal集群部署前的准备工作
1、安装JDK 1.8
2、安装Zookeeper与Kafka集群
在初期时候Canal只能使用TCP模式,客户端代码需要由开发人员自行完成。从Canal 1.1开始支持将接受到的Binlog传递给后端消息队列,目前支持的MQ系统有kafka和rocketmq。Zookeeper与Kafka的部署可参考本站文章——《Kafka集群部署使用与扩容教程》
3、MySQL主库相关配置
· MySQL需要配置Binlog为row格式
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
· 为Canal创建用户并授权,因为要模拟从库,所以就需要从库相关权限
grant select,replication slave,replication client on *.* to 'canal' @ '192.168.94.10' identified by '123456' #IP需要修改为Canal节点所在的信息
三、Canal集群模式部署
1、下载Canal
Canal的安装包可以通过Github进行下载(https://github.com/alibaba/canal)。从1.1.4版本引入了canal-admin,可通过WebUI对Canal进行管理
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz tar zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal
2、Canal全局配置
Canal的全局配置文件为conf/canal.properties,详细配置可以参考官方文档(https://github.com/alibaba/canal/wiki/AdminGuide),这里只给出一些核心参数,比如Canal工作模式、zk与kafka集群信息,其他保持默认
canal.ip = 172.20.1.199 #canal绑定的本地IP canal.register.ip = 172.20.1.199 #canal注册到外部zookeeper、admin的ip(针对docker的外部可见ip) canal.port = 11111 #canal端口信息 canal.destinations = example #当前server上部署的instance列表,当定义了canal.destinations后,需要在canal.conf.dir对应的目录下建立同名目录。比如设置了canal.destinations = example1,example2就需要创建example1和example2两个目录,每个目录里各自有一份instance.properties canal.auto.scan = true #开启instance自动扫描,当canal.conf.dir目录下的instance配置变化会自动进行新增、删除、reload配置 canal.auto.scan.interval = 5 #自动扫描间隔 canal.serverMode = kafka #默认tcp模式是让代码来对数据进行订阅和消费,这里修改为kafka,让客户端直接从kafka消费数据 canal.mq.flatMessage = false #是否将binlog转为JSON格式,不转换的话则使用原生格式 canal.mq.servers = 192.168.94.10:9092,192.168.94.11:9092,192.168.94.12:9092 #kafka集群地址 canal.zkServers = 192.168.94.10:2181,192.168.94.11:2181,192.168.94.12:2181 #zk集群地址 canal.instance.global.spring.xml = classpath:spring/default-instance.xml #默认file-instance不支持HA持久化,如果要做高可用的话修改为default canal.mq.compressionType = lz4 #开启消息体压缩 canal.mq.maxRequestSize = 10485760 #消息体大小,默认和KAFKA一致为1M,这里调到了10M canal.mq.batchSize = 16384 #flatMessage开启下可以调整该值来控制kafka批量消息大小,默认是16384 (16K)。如果实际binlog event对象值大于该值就不会有批量发送的作用,调大该值可以提高吞吐量,该值不能超过broker端的最大发送字节
3、Canal实例配置
Canal实例配置文件为conf/$instance/instance.properties,该配置文件最主要的就是需要过滤的库表以及topic信息。每一个$instance目录都代表一个实例,用于配置不同的数据库信息。Canal HA模式也是通过instance name进行区分,在zk中如果发现存在相同的instance会被自动归为一个集群中,所以在创建实例目录的时候一定要注意
################################################# canal.instance.mysql.slaveId=2131 #ID不可与主库和其他从库重复 canal.instance.gtidon=false # position info 需要同步的数据库信息,如果不指定任何位点信息默认从当前数据库位点开始同步 canal.instance.master.address=192.168.10.56:3306 canal.instance.master.journal.name= #指定binlog文件名 canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= canal.instance.tsdb.enable=true canal.instance.dbUsername=canal #canal在数据库中的用户 canal.instance.dbPassword=123456 #canal用户的密码 canal.instance.connectionCharset = UTF-8 canal.instance.enableDruid=false canal.instance.filter.regex=employees\\..*,linuxe\\..* #需要解析的库表,支持正则匹配,如canal\\..*代表canal库下所有表。这里代表linuxe和employees库下所有表,具体规则可见下面表格 canal.instance.filter.black.regex= # mq config canal.mq.topic=linuxe #mq里的topic名,不能带下划线,如果配置了动态topic,那么不符合动态正则的将放这个topic里 canal.mq.dynamicTopic=linuxe,employees #根据库表动态创建topic,支持多种配置语法,多个配置之间使用逗号或分号分隔,这里为2个库创建各自的topic canal.mq.partition=0
canal.instance.filter.regex | 需要抓取的库表信息,以逗号分隔,转义符需要双斜杠(\\) | · .*或者.*\\..*:所有库表 · canal\\..*:canal库下所有表 · canal\\.canal.*:canal库下以canal打头的表 · canal\\.test1:canal库下的一张表 · canal\\..*,mysql.test1,mysql.test2:多个规则组合使用 |
canal.mq.dynamicTopic | mq里的动态topic规则,支持配置格式:schema 或 schema.table,多个配置之间使用逗号或分号分隔。建议MQ开启自动创建topic的能力 | · test\\.test:指定匹配的单表,发送到以test_test为名字的topic上 · .*\\..*:匹配所有库下所有表,并且发送到各自表名的topic上 · test:指定匹配对应的库,该库下的所有表都会发送到库名的topic上 · test\\..*:指定匹配的表达式,针对匹配的表会发送到各自表名的topic上 · test,test1\\.test1:指定多个表达式,会将test库的表都发送到test的topic上,test1\\.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的canal.mq.topic值 |
4、启动Canal
bin/startup.sh #启动后需要观察2个日志,logs/example/example.log和logs/canal/canal.log
5、启动每Canal节点后,通过zk客户端可以查看当前正在工作的Canal节点。当Canal主节点故障时,客户端能通过zk获取到另外节点继续工作
zkcli.sh get /otter/canal/destinations/example/running
6、推荐配置Canal Admin来使用图形化界面管理Canal
评论