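I wanted a way to record data changes in Spring Boot that is decoupled from the business code. Each record should contain the new data, and for an update it should also contain the old data.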
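After some research I found that Canal, listening to MySQL's binlog, can meet this requirement. However, a plain Canal client has to save each change record as soon as it receives the event, unless extra logic is written around it. So I combined Canal with RabbitMQ to handle saving the change records.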
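1. Start the MySQL environment with binlog enabled.
2. Start the Canal environment, create a MySQL account for it, and have it connect to MySQL as a replication slave.
3. Set Canal's server mode to TCP and write a Java client that listens for MySQL binlog changes.
4. Switch Canal's server mode to RabbitMQ, start RabbitMQ, configure the connection between Canal and RabbitMQ, and receive binlog change events through the message queue.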
The environment is built with docker-compose:
```yaml
version: "3"
services:
  mysql:
    network_mode: mynetwork
    container_name: mymysql
    ports:
      - 3306:3306
    restart: always
    volumes:
      - /etc/localtime:/etc/localtime
      - /home/mycontainers/mymysql/data:/data
      - /home/mycontainers/mymysql/mysql:/var/lib/mysql
      - /home/mycontainers/mymysql/conf:/etc/mysql
    environment:
      - MYSQL_ROOT_PASSWORD=root
    command:
      --character-set-server=utf8mb4
      --collation-server=utf8mb4_unicode_ci
      --log-bin=/var/lib/mysql/mysql-bin
      --server-id=1
      --binlog-format=ROW
      --expire_logs_days=7
      --max_binlog_size=500M
    image: mysql:5.7.20
  rabbitmq:
    container_name: myrabbit
    ports:
      - 15672:15672
      - 5672:5672
    restart: always
    volumes:
      - /etc/localtime:/etc/localtime
      - /home/mycontainers/myrabbit/rabbitmq:/var/lib/rabbitmq
    network_mode: mynetwork
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=123456
    image: rabbitmq:3.8-management
  canal-server:
    container_name: canal-server
    restart: always
    ports:
      - 11110:11110
      - 11111:11111
      - 11112:11112
    volumes:
      - /home/mycontainers/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties
      - /home/mycontainers/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties
      - /home/mycontainers/canal-server/logs:/home/admin/canal-server/logs
    network_mode: mynetwork
    depends_on:
      - mysql
      - rabbitmq
      # - canal-admin
    image: canal/canal-server:v1.1.5
```
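We need to edit two Canal configuration files, canal.properties and instance.properties, and mount them onto the following two paths inside the Canal container:

- /home/admin/canal-server/conf/canal.properties: in this file, canal.destinations is the list of instances deployed on the server.
- /home/admin/canal-server/conf/example/instance.properties: here example is the instance name. It must match the instance configured in canal.properties. Canal uses one folder per instance, and one client corresponds to one instance.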
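The instance directory name has to match canal.destinations. A quick way to see where Canal will look is to derive the paths from canal.properties.

Below is a minimal sketch of that check. `CanalInstancePaths` and `expectedInstanceFiles` are hypothetical names, not part of Canal. The sketch only mirrors the one-directory-per-instance convention described above. It also assumes the conf directory resolves to /home/admin/canal-server/conf, as in the docker-compose mounts.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Canal: lists where each instance's
// instance.properties is expected, one directory per entry in canal.destinations.
public class CanalInstancePaths {

    static List<String> expectedInstanceFiles(String canalProperties, String confRoot) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(canalProperties));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> paths = new ArrayList<>();
        // canal.destinations is a comma-separated list of instance names
        for (String dest : props.getProperty("canal.destinations", "").split(",")) {
            String name = dest.trim();
            if (!name.isEmpty()) {
                paths.add(confRoot + "/" + name + "/instance.properties");
            }
        }
        return paths;
    }

    public static void main(String[] args) {
        String conf = "canal.destinations = canal-exchange\ncanal.conf.dir = ../conf\n";
        // Assumption: the conf dir resolves to the container path used in the mounts above
        for (String path : expectedInstanceFiles(conf, "/home/admin/canal-server/conf")) {
            System.out.println(path);
        }
    }
}
```

With canal.destinations=canal-exchange, it prints /home/admin/canal-server/conf/canal-exchange/instance.properties. That is the directory the instance.properties mount needs to target for that instance.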
Below is the content of the two configuration files we need to prepare.
canal.properties:

```properties
#################################################
#########       common argument       ###########
#################################################
# tcp bind ip
canal.ip=
# register ip to zookeeper
canal.register.ip=
canal.port=11111
canal.metrics.pull.port=11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
# canal.admin.manager = canal-admin:8089
# canal.admin.port = 11110
# canal.admin.user = admin
# canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
# admin auto register
# canal.admin.register.auto = true
# cluster name; leave empty for a standalone server
# canal.admin.register.cluster =
# Canal Server name
# canal.admin.register.name = canal-admin

canal.zkServers=
# flush data to zk
canal.zookeeper.flush.period=1000
canal.withoutNetty=false
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode=tcp
# flush meta cursor/parse position to file
canal.file.data.dir=${canal.conf.dir}
canal.file.flush.period=1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size=16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit=1024
## memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode=MEMSIZE
canal.instance.memory.rawEntry=true

## detecting config
canal.instance.detecting.enable=false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql=select 1
canal.instance.detecting.interval.time=3
canal.instance.detecting.retry.threshold=3
canal.instance.detecting.heartbeatHaEnable=false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size=1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds=60

# network config
canal.instance.network.receiveBufferSize=16384
canal.instance.network.sendBufferSize=16384
canal.instance.network.soTimeout=30

# binlog filter config
canal.instance.filter.druid.ddl=true
canal.instance.filter.query.dcl=false
canal.instance.filter.query.dml=false
canal.instance.filter.query.ddl=false
canal.instance.filter.table.error=false
canal.instance.filter.rows=false
canal.instance.filter.transaction.entry=false
canal.instance.filter.dml.insert=false
canal.instance.filter.dml.update=false
canal.instance.filter.dml.delete=false

# binlog format/image check
canal.instance.binlog.format=ROW,STATEMENT,MIXED
canal.instance.binlog.image=FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation=false

# parallel parser config
canal.instance.parser.parallel=true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
canal.instance.parser.parallelThreadSize=16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize=256

# table meta tsdb info
canal.instance.tsdb.enable=true
canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval=24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire=360

#################################################
#########         destinations        ###########
#################################################
canal.destinations=canal-exchange
# conf root dir
canal.conf.dir=../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan=true
canal.auto.scan.interval=5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode=false

canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode=spring
canal.instance.global.lazy=false
canal.instance.global.manager.address=${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml=classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########         MQ Properties        ###########
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey=
canal.aliyun.secretKey=
canal.aliyun.uid=

canal.mq.flatMessage=true
canal.mq.canalBatchSize=50
canal.mq.canalGetTimeout=100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel=local

canal.mq.database.hash=true
canal.mq.send.thread.size=30
canal.mq.build.thread.size=8

##################################################
#########            RabbitMQ          ###########
##################################################
rabbitmq.host=myrabbit
rabbitmq.virtual.host=/
rabbitmq.exchange=canal-exchange
rabbitmq.username=admin
rabbitmq.password=<RabbitMQ password>
rabbitmq.deliveryMode=
```
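At this point canal.serverMode = tcp, i.e. a direct TCP connection. We first start the server in this mode and hand-write a Java client to connect to it. Later we will switch to RabbitMQ.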
As the comment shows, Canal supports the server modes tcp, kafka, rocketMQ, rabbitMQ and pulsarMQ. This covers all the mainstream message queues.
instance.properties:

```properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=123

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=mymysql:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=canal-routing-key
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
```
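Mount both configuration files into the container. Once more, watch the instance directory name, which defaults to /example/instance.properties. The directory must be named after the instance listed in canal.destinations.

Here canal.destinations=canal-exchange, which is also the destination the Java client below connects to. With the docker-compose file above, that means mounting instance.properties to /home/admin/canal-server/conf/canal-exchange/instance.properties. Alternatively, set canal.destinations=example and connect the client to example.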
We need to edit this instance configuration file so that it connects to MySQL. Make sure the following settings are correct:
```properties
canal.instance.master.address=mymysql:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
```
mymysql is the MySQL environment, which also runs as a Docker container, and 3306 is its container-internal port.
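To elaborate: suppose the Docker port mapping were 13306:3306. The port exposed outside the container would then be 13306, and the internal one 3306. In this example both MySQL and Canal run in containers, so for Canal to connect to MySQL: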
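- Both containers must be on the same network (mynetwork in docker-compose.yml). Because it is referenced through network_mode, this network must already exist, e.g. created with docker network create mynetwork.
- Canal must use the internal port (3306, not 13306).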
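The two conditions above come down to one rule:

- a client on the same Docker network dials the container port;
- a client outside Docker (for example, an application on the host) dials the published host port.

Below is a small sketch of that rule. `DockerPortMapping.portToUse` is a hypothetical helper, not a Docker API. It only handles the plain `HOST:CONTAINER` form, not `IP:HOST:CONTAINER` or port ranges.

```java
// Hypothetical helper illustrating which side of a docker-compose
// "HOST:CONTAINER" port mapping a client should dial.
public class DockerPortMapping {

    static int portToUse(String mapping, boolean sameDockerNetwork) {
        String[] parts = mapping.split(":");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected HOST:CONTAINER, got " + mapping);
        }
        int hostPort = Integer.parseInt(parts[0].trim());
        int containerPort = Integer.parseInt(parts[1].trim());
        // Containers on the same network talk to each other directly on the container port;
        // everything else goes through the port published on the host.
        return sameDockerNetwork ? containerPort : hostPort;
    }

    public static void main(String[] args) {
        // Canal container -> MySQL container on mynetwork
        System.out.println(portToUse("13306:3306", true));  // 3306
        // Application running on the host -> MySQL container
        System.out.println(portToUse("13306:3306", false)); // 13306
    }
}
```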
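dbUsername and dbPassword are the MySQL account credentials. For convenience during development you could use root/root, but I still recommend creating a dedicated user and granting it the privileges it needs: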
```shell
# Enter the mysql container
docker exec -it mymysql bash
# Enter the mysql command line
mysql -uroot -proot
# Then write and run the MySQL statements
> ...
```
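```sql
-- Switch to the mysql database
use mysql;
-- Create the canal user with credentials canal/canal
create user 'canal'@'%' identified by 'canal';
-- Grant privileges and allow the user to log in from any host
-- (Canal itself only needs SELECT, REPLICATION SLAVE and REPLICATION CLIENT)
grant SELECT, INSERT, UPDATE, DELETE, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%';
-- Reload privileges so they take effect
flush privileges;
-- Bonus: how to delete the user (do not run this as part of the setup)
drop user 'canal'@'%';
```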
Log in as the canal user with Navicat or the shell. If you can access the database, the user was created successfully.
Maven dependencies:

```xml
<!-- in <properties> -->
<canal.version>1.1.5</canal.version>

<!-- canal -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>${canal.version}</version>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>${canal.version}</version>
</dependency>
```
Add the component and start it:

```java
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CanalClient {

    private static final int BATCH_SIZE = 1000;

    public void run() {
        // Create the connection
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("localhost", 11111), "canal-exchange", "canal", "canal");
        try {
            // Open the connection
            connector.connect();
            // Subscribe to every table in every database (the backslash must be escaped in a Java string)
            connector.subscribe(".*\\..*");
            // Roll back to the last un-acked position, so the next fetch resumes from there
            connector.rollback();
            while (true) {
                // Fetch up to BATCH_SIZE entries without acknowledging them
                Message message = connector.getWithoutAck(BATCH_SIZE);
                // Batch ID
                long batchId = message.getId();
                // Number of entries in the batch
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // No data: sleep for 2 seconds
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                } else {
                    // Data available: process it
                    printEntry(message.getEntries());
                }
                // Acknowledge the batch; every Message with an ID <= batchId is confirmed
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Print the entries that canal server parsed from the binlog
     */
    private static void printEntry(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                // Skip transaction begin/end entries
                continue;
            }
            // RowChange holds everything about a row change, e.g. isDdl, the DDL sql,
            // and beforeColumns/afterColumns (the column values before and after the change)
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry, e);
            }
            // Operation type: INSERT/UPDATE/DELETE
            CanalEntry.EventType eventType = rowChange.getEventType();
            // Print the header
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
            // DDL statement?
            if (rowChange.getIsDdl()) {
                System.out.println("================> isDdl: true, sql: " + rowChange.getSql());
            }
            // Print every row contained in the RowChange
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    // DELETE: only the before image exists
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    // INSERT: only the after image exists
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    // UPDATE: print the data before and after the change
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}
```
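In the client above, an exception thrown by printEntry escapes the while loop and ends the client. The un-acked batch is only fetched again after a restart, thanks to connector.rollback(). Canal's own client example instead calls connector.rollback(batchId) when processing fails, and keeps polling.

Below is a minimal sketch of that ack-or-rollback step. So that it runs without a Canal server, it is written against a hypothetical `BatchSource` interface that mirrors the three CanalConnector methods used here (getWithoutAck, ack, rollback). `BatchSource`, `Batch` and `pollOnce` are illustrative names, not Canal's API.

```java
import java.util.List;
import java.util.function.Consumer;

public class AckOrRollbackLoop {

    /** A fetched batch; id == -1 means "no data", mirroring Canal's Message. */
    static final class Batch<T> {
        final long id;
        final List<T> entries;

        Batch(long id, List<T> entries) {
            this.id = id;
            this.entries = entries;
        }
    }

    /** Hypothetical stand-in for the CanalConnector calls the loop needs. */
    interface BatchSource<T> {
        Batch<T> getWithoutAck(int batchSize);
        void ack(long batchId);
        void rollback(long batchId);
    }

    /**
     * Fetches one batch and acks it if the handler succeeds, or rolls it back if the
     * handler throws, so the same batch is delivered again on the next fetch.
     * Returns true only when a non-empty batch was processed successfully.
     */
    static <T> boolean pollOnce(BatchSource<T> source, int batchSize, Consumer<List<T>> handler) {
        Batch<T> batch = source.getWithoutAck(batchSize);
        if (batch.id == -1) {
            return false; // nothing fetched
        }
        if (batch.entries.isEmpty()) {
            source.ack(batch.id); // empty batch: confirm it and move on
            return false;
        }
        try {
            handler.accept(batch.entries);
            source.ack(batch.id);
            return true;
        } catch (RuntimeException e) {
            // Processing failed: keep the batch for redelivery instead of losing it
            source.rollback(batch.id);
            return false;
        }
    }
}
```

In the real client, the body of the while loop would call something like pollOnce with printEntry as the handler. This keeps the loop alive when a single batch fails.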
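Application class:

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BaseApplication implements CommandLineRunner {

    @Autowired
    private CanalClient canalClient;

    public static void main(String[] args) {
        SpringApplication.run(BaseApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Starts the Canal polling loop; note that this call never returns
        canalClient.run();
    }
}
```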
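canalClient.run() never returns, so the CommandLineRunner keeps Spring Boot's main thread busy forever. SpringApplication.run() never returns either, and ApplicationReadyEvent is never published. One way around this is to run the polling loop on its own thread.

Below is a JDK-only sketch of that approach. `BackgroundPoller`, `start` and `stop` are hypothetical names. In the real application, the Runnable would be one iteration of the Canal polling loop; start would be called from CommandLineRunner.run and stop from a @PreDestroy method.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BackgroundPoller {

    // Single daemon thread, so the poller never keeps the JVM alive by itself
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "canal-poller");
        t.setDaemon(true);
        return t;
    });

    /** Starts the polling loop on the background thread and returns immediately. */
    void start(Runnable pollOnce, long pauseMillis) {
        executor.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                pollOnce.run();
                try {
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException e) {
                    // stop() interrupts the thread: restore the flag so the loop exits
                    Thread.currentThread().interrupt();
                }
            }
        });
    }

    /** Interrupts the loop and waits for it to finish; returns true if it stopped in time. */
    boolean stop(long timeoutMillis) {
        executor.shutdownNow();
        try {
            return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```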
Start the application. Whenever we now insert or update data in the database, the client picks up the change.
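However, I recommend putting the captured events into a message queue and processing them when the system is idle. Having Canal integrate with RabbitMQ directly is therefore the better option.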
Change serverMode in canal.properties:

```properties
canal.serverMode=rabbitMQ
```
Change the topic in instance.properties:

```properties
canal.mq.topic=canal-routing-key
```
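Then find the RabbitMQ section of canal.properties:

```properties
##################################################
#########            RabbitMQ          ###########
##################################################
# RabbitMQ host; the containers share a network, so the container name works here
rabbitmq.host=myrabbit
rabbitmq.virtual.host=/
# Exchange name; we will create it manually in a moment
rabbitmq.exchange=canal-exchange
# Credentials
rabbitmq.username=admin
rabbitmq.password=123456
# Specifying a port is not supported yet; the AMQP default 5672 is used, which works in this example
```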
Restart the containers, open the RabbitMQ management page, and create the exchange and the queue:
1. Create an exchange of type direct named canal-exchange.
2. Create a queue named canal-queue.
3. Bind the exchange to the queue with the routing key canal-routing-key. It corresponds to canal.mq.topic in instance.properties above.
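These three names must line up because of how a direct exchange routes messages. A message goes to every queue whose binding key is exactly equal to the message's routing key. Here, that routing key is the canal.mq.topic value Canal publishes with. If nothing matches, RabbitMQ drops the message by default.

Below is a toy in-memory model of that rule for illustration. `DirectExchangeModel` is a hypothetical class, not the RabbitMQ client API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a direct exchange: exact match between routing key and binding key. */
public class DirectExchangeModel {

    private final Map<String, List<String>> bindings = new LinkedHashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    /** Queues that receive a message published with this routing key. */
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeModel canalExchange = new DirectExchangeModel();
        // The binding created in the management page
        canalExchange.bind("canal-queue", "canal-routing-key");
        // Canal publishes with canal.mq.topic as the routing key
        System.out.println(canalExchange.route("canal-routing-key")); // [canal-queue]
        // A typo in canal.mq.topic means no queue matches
        System.out.println(canalExchange.route("canal-routing"));     // []
    }
}
```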
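Incidentally, you can skip the steps above, because the Spring Boot RabbitMQ configuration below creates the exchange and queue automatically. Creating them by hand, however, lets you see the change-record messages directly in the RabbitMQ management page, without involving Spring Boot at all.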
Dependency:

```xml
<!-- in <properties> -->
<amqp.version>2.3.4.RELEASE</amqp.version>

<!-- message queue -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>${amqp.version}</version>
</dependency>
```
application.yml:
```yaml
spring:
  rabbitmq:
    # host: myserverhost
    host: 192.168.0.108
    port: 5672
    username: admin
    password: <RabbitMQ password>
    # Message confirmation settings
    # Confirm that messages reached the exchange
    publisher-confirm-type: correlated
    # Confirm that messages were routed to a queue
    publisher-returns: true
```
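RabbitMQ configuration class:

```java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate();
        template.setConnectionFactory(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    /**
     * Together with template.setMessageConverter(new Jackson2JsonMessageConverter()) above,
     * this fixes the @RabbitListener endless error loop.
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
}
```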
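Canal message provider (declares the queue, exchange and binding). First, the constants:

```java
public class RabbitConstant {
    public static final String CanalQueue = "canal-queue";
    public static final String CanalExchange = "canal-exchange";
    public static final String CanalRouting = "canal-routing-key";
}
```

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Canal message provider: the messages produced by canal-server are delivered
 * through the RabbitMQ queue declared here
 */
@Configuration
public class CanalProvider {

    /**
     * Queue
     */
    @Bean
    public Queue canalQueue() {
        /*
         * durable: whether the queue is durable. A durable queue is stored on disk and survives a
         *          broker restart; a non-durable one is lost on restart.
         *          (Spring AMQP's Queue(name) constructor defaults to true.)
         * exclusive: default false. If true, only the declaring connection may use the queue,
         *            and it is deleted when that connection closes. Takes precedence over durable.
         * autoDelete: default false. If true, the queue is deleted once its last consumer unsubscribes.
         */
        return new Queue(RabbitConstant.CanalQueue, true);
    }

    /**
     * Exchange; a direct exchange is used here
     */
    @Bean
    public DirectExchange canalExchange() {
        // name, durable = true, autoDelete = false
        return new DirectExchange(RabbitConstant.CanalExchange, true, false);
    }

    /**
     * Bind the queue to the exchange with the routing key
     */
    @Bean
    public Binding bindingCanal() {
        return BindingBuilder.bind(canalQueue()).to(canalExchange()).with(RabbitConstant.CanalRouting);
    }
}
```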
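Canal message consumer:

```java
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

/**
 * Canal message consumer
 */
@Component
@RabbitListener(queues = RabbitConstant.CanalQueue)
public class CanalConsumer {

    private final SysBackupService sysBackupService;

    public CanalConsumer(SysBackupService sysBackupService) {
        this.sysBackupService = sysBackupService;
    }

    @RabbitHandler
    public void process(Map<String, Object> msg) {
        System.out.println("Received canal message: " + msg);
        boolean isDdl = (boolean) msg.get("isDdl");
        // Ignore DDL events
        if (isDdl) {
            return;
        }
        // Canal message id, assumed unique; stored as-is for now
        // (JSON numbers may arrive as Integer or Long, so go through Number)
        long tid = ((Number) msg.get("id")).longValue();
        // Timestamp at which Canal produced this message, 13-digit milliseconds
        long ts = ((Number) msg.get("ts")).longValue();
        // Database
        String database = (String) msg.get("database");
        // Table
        String table = (String) msg.get("table");
        // Type: INSERT/UPDATE/DELETE
        String type = (String) msg.get("type");
        // Column values of each affected row
        List<?> data = (List<?>) msg.get("data");
        // Only populated for UPDATE: column names and their values before the UPDATE
        List<?> old = (List<?>) msg.get("old");
        // Skip sys_backup itself: saving a record there triggers another binlog event, looping forever
        if ("sys_backup".equalsIgnoreCase(table)) {
            return;
        }
        // Only handle these types
        if (!"INSERT".equalsIgnoreCase(type) && !"UPDATE".equalsIgnoreCase(type) && !"DELETE".equalsIgnoreCase(type)) {
            return;
        }
    }
}
```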
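The consumer stops after filtering. It does not show how the new and old data end up in a change record, and the SysBackupService API is not shown in the source. In Canal's flat message format:

- `data` carries the full row after the change (for DELETE, the deleted row);
- for an UPDATE, `old` carries the previous values of the changed columns.

The before image of an UPDATE is therefore the `data` row with `old` laid on top.

Below is a JDK-only sketch of that assembly. `ChangeRecordBuilder` and `ChangeRecord` are hypothetical names, and persisting the records is left to whatever SysBackupService actually provides.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangeRecordBuilder {

    /** Hypothetical record shape: one entry per affected row. */
    static final class ChangeRecord {
        final String table;
        final String type;
        final Map<String, Object> before; // null for INSERT
        final Map<String, Object> after;  // null for DELETE

        ChangeRecord(String table, String type, Map<String, Object> before, Map<String, Object> after) {
            this.table = table;
            this.type = type;
            this.before = before;
            this.after = after;
        }
    }

    /** Builds before/after images from a Canal flat message already parsed into a Map. */
    @SuppressWarnings("unchecked")
    static List<ChangeRecord> build(Map<String, Object> msg) {
        String table = (String) msg.get("table");
        String type = String.valueOf(msg.get("type")).toUpperCase();
        List<Map<String, Object>> data = (List<Map<String, Object>>) msg.get("data");
        List<Map<String, Object>> old = (List<Map<String, Object>>) msg.get("old");
        List<ChangeRecord> records = new ArrayList<>();
        if (data == null) {
            return records;
        }
        for (int i = 0; i < data.size(); i++) {
            Map<String, Object> row = data.get(i);
            switch (type) {
                case "INSERT":
                    records.add(new ChangeRecord(table, type, null, row));
                    break;
                case "DELETE":
                    // For DELETE, "data" carries the row as it was before deletion
                    records.add(new ChangeRecord(table, type, row, null));
                    break;
                case "UPDATE":
                    // Start from the new row, then restore the changed columns' old values
                    Map<String, Object> before = new LinkedHashMap<>(row);
                    if (old != null && i < old.size() && old.get(i) != null) {
                        before.putAll(old.get(i));
                    }
                    records.add(new ChangeRecord(table, type, before, row));
                    break;
                default:
                    // Other types are ignored, as in the consumer above
                    break;
            }
        }
        return records;
    }
}
```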
To test it, modify a row in MySQL. Canal sends a message to RabbitMQ, and we receive it from the RabbitMQ queue we are listening on.