SpringBoot整合Canal、RabbitMQ监听数据变更~

经过调研发现,使用Canal来监听MySQL的binlog变化可以实现这个需求,可是在监听到变化后需
首页 新闻资讯 行业资讯 SpringBoot整合Canal、RabbitMQ监听数据变更~

需求

我想要在SpringBoot中采用一种与业务代码解耦合的方式,来实现数据的变更记录,记录的内容是新数据,如果是更新操作还得有旧数据内容。

经过调研发现,使用Canal来监听MySQL的binlog变化可以实现这个需求,可是在监听到变化后需要马上保存变更记录,除非再做一些逻辑处理,于是我又结合了RabbitMQ来处理保存变更记录的操作。

步骤

  • 启动MySQL环境,并开启binlog

  • 启动Canal环境,为其创建一个MySQL账号,然后以Slave的形式连接MySQL

  • Canal服务模式设为TCP,用Java编写客户端代码,监听MySQL的binlog修改

  • Canal服务模式设为RabbitMQ,启动RabbitMQ环境,配置Canal和RabbitMQ的连接,用消息队列去接收binlog修改事件

环境搭建

环境搭建基于docker-compose:

version:"3"services:
    mysql:
        network_mode: mynetwork
        container_name: mymysql
        ports:-3306:3306restart: always
        volumes:-/etc/localtime:/etc/localtime-/home/mycontainers/mymysql/data:/data-/home/mycontainers/mymysql/mysql:/var/lib/mysql-/home/mycontainers/mymysql/conf:/etc/mysql
        environment:-MYSQL_ROOT_PASSWORD=root
        command:--character-set-server=utf8mb4--collation-server=utf8mb4_unicode_ci--log-bin=/var/lib/mysql/mysql-bin--server-id=1--binlog-format=ROW--expire_logs_days=7--max_binlog_size=500Mimage: mysql:5.7.20rabbitmq:   
        container_name: myrabbit
        ports:-15672:15672-5672:5672restart: always
        volumes:-/etc/localtime:/etc/localtime-/home/mycontainers/myrabbit/rabbitmq:/var/lib/rabbitmq
        network_mode: mynetwork
        environment:-RABBITMQ_DEFAULT_USER=admin-RABBITMQ_DEFAULT_PASS=123456image: rabbitmq:3.8-management
    canal-server:
        container_name: canal-server
        restart: always
        ports:-11110:11110-11111:11111-11112:11112volumes:-/home/mycontainers/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties-/home/mycontainers/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties-/home/mycontainers/canal-server/logs:/home/admin/canal-server/logs
        network_mode: mynetwork
        depends_on:-mysql-rabbitmq# - canal-adminimage: canal/canal-server:v1.1.5

我们需要修改下Canal环境的配置文件:canal.properties和instance.properties,映射Canal中的以下两个路径:

  • /home/admin/canal-server/conf/canal.properties配置文件中,canal.destinations意思是server上部署的instance列表,

  • /home/admin/canal-server/conf/example/instance.properties这里的/example是指instance即实例名,要和上面canal.properties内instance配置对应,canal会为实例创建对应的文件夹,一个Client对应一个实例

以下是我们需要准备的两个配置文件具体内容:

canal.properties

##########################################################     common argument   ############################################################### tcp bind ipcanal.ip=# register ip to zookeepercanal.register.ip=canal.port=11111canal.metrics.pull.port=11112# canal instance user/passwd# canal.user = canal# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458# canal admin config# canal.admin.manager = canal-admin:8089# canal.admin.port = 11110# canal.admin.user = admin# canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9# admin auto register 自动注册# canal.admin.register.auto = true# 集群名,单机则不写# canal.admin.register.cluster =# Canal Server 名字# canal.admin.register.name = canal-admincanal.zkServers=# flush data to zkcanal.zookeeper.flush.period=1000canal.withoutNetty=false# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQcanal.serverMode=tcp# flush meta cursor/parse position to filecanal.file.data.dir=${canal.conf.dir}
canal.file.flush.period=1000## memory store RingBuffer size, should be Math.pow(2,n)canal.instance.memory.buffer.size=16384## memory store RingBuffer used memory unit size , default 1kbcanal.instance.memory.buffer.memunit=1024## meory store gets mode used MEMSIZE or ITEMSIZEcanal.instance.memory.batch.mode=MEMSIZE
canal.instance.memory.rawEntry=true## detecing configcanal.instance.detecting.enable=false#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()canal.instance.detecting.sql=select1canal.instance.detecting.interval.time=3canal.instance.detecting.retry.threshold=3canal.instance.detecting.heartbeatHaEnable=false# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions deliverycanal.instance.transaction.size=1024# mysql fallback connected to new master should fallback timescanal.instance.fallbackIntervalInSeconds=60# network configcanal.instance.network.receiveBufferSize=16384canal.instance.network.sendBufferSize=16384canal.instance.network.soTimeout=30# binlog filter configcanal.instance.filter.druid.ddl=truecanal.instance.filter.query.dcl=falsecanal.instance.filter.query.dml=falsecanal.instance.filter.query.ddl=falsecanal.instance.filter.table.error=falsecanal.instance.filter.rows=falsecanal.instance.filter.transaction.entry=falsecanal.instance.filter.dml.insert=falsecanal.instance.filter.dml.update=falsecanal.instance.filter.dml.delete=false# binlog format/image checkcanal.instance.binlog.format=ROW,STATEMENT,MIXED 
canal.instance.binlog.image=FULL,MINIMAL,NOBLOB# binlog ddl isolationcanal.instance.get.ddl.isolation=false# parallel parser configcanal.instance.parser.parallel=true## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()canal.instance.parser.parallelThreadSize=16## disruptor ringbuffer size, must be power of 2canal.instance.parser.parallelBufferSize=256# table meta tsdb infocanal.instance.tsdb.enable=truecanal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal# dump snapshot interval, default 24 hourcanal.instance.tsdb.snapshot.interval=24# purge snapshot expire , default 360 hour(15 days)canal.instance.tsdb.snapshot.expire=360##########################################################     destinations    ##############################################################canal.destinations=canal-exchange# conf root dircanal.conf.dir=../conf# auto scan instance dir add/remove and start/stop instancecanal.auto.scan=truecanal.auto.scan.interval=5# set this value to 'true' means that when binlog pos not found, skip to latest.# WARN: pls keep 'false' in production env, or if you know what you want.canal.auto.reset.latest.pos.mode=falsecanal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xmlcanal.instance.global.mode=spring
canal.instance.global.lazy=falsecanal.instance.global.manager.address=${canal.admin.manager}#canal.instance.global.spring.xml = classpath:spring/memory-instance.xmlcanal.instance.global.spring.xml=classpath:spring/file-instance.xml#canal.instance.global.spring.xml = classpath:spring/default-instance.xml###########################################################         MQ Properties      ################################################################ aliyun ak/sk , support rds/mqcanal.aliyun.accessKey=canal.aliyun.secretKey=canal.aliyun.uid=canal.mq.flatMessage=truecanal.mq.canalBatchSize=50canal.mq.canalGetTimeout=100# Set this value to "cloud", if you want open message trace feature in aliyun.canal.mq.accessChannel=localcanal.mq.database.hash=truecanal.mq.send.thread.size=30canal.mq.build.thread.size=8###########################################################         RabbitMQ       ###############################################################rabbitmq.host=myrabbit
rabbitmq.virtual.host=/rabbitmq.exchange=canal-exchange
rabbitmq.username=admin
rabbitmq.password=RabbitMQ密码
rabbitmq.deliveryMode=

此时canal.serverMode = tcp,即TCP直连,我们先开启这个服务,然后手写Java客户端代码去连接它,等下再改为RabbitMQ。

通过注释可以看到,canal支持的服务模式有:tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ,即主流的消息队列都支持。

instance.properties

################################################### mysql serverId , v1.0.26+ will autoGen#canal.instance.mysql.slaveId=123# enable gtid use true/falsecanal.instance.gtidon=false# position infocanal.instance.master.address=mymysql:3306canal.instance.master.journal.name=canal.instance.master.position=canal.instance.master.timestamp=canal.instance.master.gtid=# rds oss binlogcanal.instance.rds.accesskey=canal.instance.rds.secretkey=canal.instance.rds.instanceId=# table meta tsdb infocanal.instance.tsdb.enable=true#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb#canal.instance.tsdb.dbUsername=canal#canal.instance.tsdb.dbPassword=canal#canal.instance.standby.address =#canal.instance.standby.journal.name =#canal.instance.standby.position =#canal.instance.standby.timestamp =#canal.instance.standby.gtid=# username/passwordcanal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8# enable druid Decrypt database passwordcanal.instance.enableDruid=false#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regexcanal.instance.filter.regex=.*\..*# table black regexcanal.instance.filter.black.regex=mysql\.slave_.*# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq configcanal.mq.topic=canal-routing-key# dynamic topic route by schema or table regex#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\..*,.*\..*canal.mq.partition=0

把这两个配置文件映射好,再次提醒,注意实例的路径名,默认是:/example/instance.properties

修改canal配置文件

我们需要修改这个实例配置文件,去连接MySQL,确保以下的配置正确:

canal.instance.master.address=mymysql:3306canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

mymysql是同为docker容器的MySQL环境,端口3306是指内部端口。

这里多说明一下,docker端口配置时假设为:13306:3306,那么容器对外的端口就是13306,内部是3306,在本示例中,MySQL和Canal都是容器环境,所以Canal连接MySQL需要满足以下条件:

  • 处于同一网段(docker-compose.yml中的mynetwork)

  • 访问内部端口(即3306,而非13306)

dbUsername和dbPassword为MySQL账号密码,为了开发方便可以使用root/root,但是我仍建议自行创建用户并分配访问权限:

# 进入docker中的mysql容器dockerexec-it mymysql bash# 进入mysql指令模式mysql-uroot-proot# 编写MySQL语句并执行>...
-- 选择mysqlusemysql;-- 创建canal用户,账密:canal/canalcreateuser'canal'@'%'identifiedby'canal';-- 分配权限,以及允许所有主机登录该用户grantSELECT,INSERT,UPDATE,DELETE,REPLICATIONSLAVE,REPLICATIONCLIENTon*.*to'canal'@'%';-- 刷新一下使其生效flushprivileges;-- 附带一个删除用户指令dropuser'canal'@'%';

用navicat或者shell去登录canal这个用户,可以访问即创建成功

整合SpringBoot Canal实现客户端

Maven依赖:

<canal.version>1.1.5</canal.version><!--canal--><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>${canal.version}</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>${canal.version}</version></dependency>复制代码

新增组件并启动:

importcom.alibaba.otter.canal.client.CanalConnector;importcom.alibaba.otter.canal.client.CanalConnectors;importcom.alibaba.otter.canal.protocol.CanalEntry;importcom.alibaba.otter.canal.protocol.Message;importorg.springframework.boot.CommandLineRunner;importorg.springframework.stereotype.Component;importjava.net.InetSocketAddress;importjava.util.List;@Componentpublicclass CanalClient {

    private final staticintBATCH_SIZE=1000;publicvoid run(){// 创建链接CanalConnector connector=CanalConnectors.newSingleConnector(new InetSocketAddress("localhost",11111),"canal-exchange","canal","canal");try {//打开连接connector.connect();//订阅数据库表,全部表connector.subscribe(".*\..*");//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿connector.rollback();while(true){// 获取指定数量的数据Message message=connector.getWithoutAck(BATCH_SIZE);//获取批量IDlong batchId=message.getId();//获取批量的数量intsize=message.getEntries().size();//如果没有数据if(batchId==-1||size==0){
                    try {//线程休眠2秒Thread.sleep(2000);} catch(InterruptedException e){
                        e.printStackTrace();}
                }else{//如果有数据,处理数据printEntry(message.getEntries());}//进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。connector.ack(batchId);}
        } catch(Exception e){
            e.printStackTrace();} finally {
            connector.disconnect();}
    }/**
     * 打印canal server解析binlog获得的实体类信息
     */private static void printEntry(List<CanalEntry.Entry>entrys){for(CanalEntry.Entry entry : entrys){if(entry.getEntryType()==CanalEntry.EntryType.TRANSACTIONBEGIN||entry.getEntryType()==CanalEntry.EntryType.TRANSACTIONEND){//开启/关闭事务的实体类型,跳过continue;}//RowChange对象,包含了一行数据变化的所有特征//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等CanalEntry.RowChange rowChage;try {
                rowChage=CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch(Exception e){
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"+entry.toString(),e);}//获取操作类型:insert/update/delete类型CanalEntry.EventType eventType=rowChage.getEventType();//打印Header信息System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(),entry.getHeader().getTableName(),eventType));//判断是否是DDL语句if(rowChage.getIsDdl()){
                System.out.println("================》;isDdl: true,sql:"+rowChage.getSql());}//获取RowChange对象里的每一行数据,打印出来for(CanalEntry.RowData rowData : rowChage.getRowDatasList()){//如果是删除语句if(eventType==CanalEntry.EventType.DELETE){
                    printColumn(rowData.getBeforeColumnsList());//如果是新增语句}elseif(eventType==CanalEntry.EventType.INSERT){
                    printColumn(rowData.getAfterColumnsList());//如果是更新的语句}else{//变更前的数据System.out.println("------->; before");printColumn(rowData.getBeforeColumnsList());//变更后的数据System.out.println("------->; after");printColumn(rowData.getAfterColumnsList());}
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column>columns){for(CanalEntry.Columncolumn:columns){
            System.out.println(column.getName()+" : "+column.getValue()+"    update="+column.getUpdated());}
    }
}

启动类Application:

@SpringBootApplicationpublicclass BaseApplication implements CommandLineRunner {@Autowiredprivate CanalClient canalClient;@Overridepublicvoid run(String...args)throws Exception {
        canalClient.run();}
}

启动程序,此时新增或修改数据库中的数据,我们就能从客户端中监听到

不过我建议监听的信息放到消息队列中,在空闲的时候去处理,所以直接配置Canal整合RabbitMQ更好。

Canal整合RabbitMQ

修改canal.properties中的serverMode:

canal.serverMode=rabbitMQ

修改instance.properties中的topic:

canal.mq.topic=canal-routing-key

然后找到关于RabbitMQ的配置:

###########################################################         RabbitMQ       ################################################################ 连接rabbit,写IP,因为同个网络下,所以可以写容器名rabbitmq.host=myrabbit
rabbitmq.virtual.host=/# 交换器名称,等等我们要去手动创建rabbitmq.exchange=canal-exchange# 账密rabbitmq.username=admin
rabbitmq.password=123456# 暂不支持指定端口,使用的是默认的5762,好在在本示例中适用

重新启动容器,进入RabbitMQ管理页面创建exchange交换器和队列queue:

  • 新建exchange,命名为:canal-exchange

  • 新建queue,命名为:canal-queue

  • 绑定exchange和queue,routing-key设置为:canal-routing-key,这里对应上面instance.properties的canal.mq.topic

顺带一提,上面这段可以忽略,因为在SpringBoot的RabbitMQ配置中,会自动创建交换器exchange和队列queue,不过手动创建的话,可以在忽略SpringBoot的基础上,直接在RabbitMQ的管理页面上看到修改记录的消息。

SpringBoot整合RabbitMQ

依赖:

<amqp.version>2.3.4.RELEASE</amqp.version><!--消息队列--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>${amqp.version}</version></dependency>

application.yml:

spring:
  rabbitmq:#    host: myserverhosthost:192.168.0.108port:5672username: admin
    password: RabbitMQ密码# 消息确认配置项# 确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated# 确认消息已发送到队列(Queue)publisher-returns:true

RabbitMQ配置类:

@Configurationpublicclass RabbitConfig {@BeanpublicRabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate template=new RabbitTemplate();template.setConnectionFactory(connectionFactory);template.setMessageConverter(new Jackson2JsonMessageConverter());returntemplate;}/**
     * template.setMessageConverter(new Jackson2JsonMessageConverter());
     * 这段和上面这行代码解决RabbitListener循环报错的问题
     */@BeanpublicSimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());returnfactory;}
}

Canal消息生产者:

publicstatic final String CanalQueue="canal-queue";publicstatic final String CanalExchange="canal-exchange";publicstatic final String CanalRouting="canal-routing-key";复制代码/**
 * Canal消息提供者,canal-server生产的消息通过RabbitMQ消息队列发送
 */@Configurationpublicclass CanalProvider {/**
     * 队列
     */@BeanpublicQueue canalQueue(){/**
         * durable:是否持久化,默认false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在;暂存队列:当前连接有效
         * exclusive:默认为false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
         * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除
         */returnnew Queue(RabbitConstant.CanalQueue,true);}/**
     * 交换机,这里使用直连交换机
     */@BeanDirectExchange canalExchange(){returnnew DirectExchange(RabbitConstant.CanalExchange,true,false);}/**
     * 绑定交换机和队列,并设置匹配键
     */@BeanBinding bindingCanal(){returnBindingBuilder.bind(canalQueue()).to(canalExchange()).with(RabbitConstant.CanalRouting);}
}

Canal消息消费者:

/**
 * Canal消息消费者
 */@Component@RabbitListener(queues=RabbitConstant.CanalQueue)publicclass CanalComsumer {
    private final SysBackupService sysBackupService;publicCanalComsumer(SysBackupService sysBackupService){
        this.sysBackupService=sysBackupService;}@RabbitHandlerpublicvoid process(Map<String,Object>msg){
        System.out.println("收到canal消息:"+msg);booleanisDdl=(boolean)msg.get("isDdl");// 不处理DDL事件if(isDdl){return;}// TiCDC的id,应该具有唯一性,先保存再说inttid=(int)msg.get("id");// TiCDC生成该消息的时间戳,13位毫秒级long ts=(long)msg.get("ts");// 数据库Stringdatabase=(String)msg.get("database");// 表Stringtable=(String)msg.get("table");// 类型:INSERT/UPDATE/DELETEStringtype=(String)msg.get("type");// 每一列的数据值List<?>data=(List<?>)msg.get("data");// 仅当type为UPDATE时才有值,记录每一列的名字和UPDATE之前的数据值List<?>old=(List<?>)msg.get("old");// 跳过sys_backup,防止无限循环if("sys_backup".equalsIgnoreCase(table)){return;}// 只处理指定类型if(!"INSERT".equalsIgnoreCase(type)&&!"UPDATE".equalsIgnoreCase(type)&&!"DELETE".equalsIgnoreCase(type)){return;}
    }
}

测试一下,修改MySQL中的一条消息,Canal就会发送信息到RabbitMQ,我们就能从监听的RabbitMQ队列中得到该条消息。

33    2024-09-02 09:14:36    Spring RabbitMQ 数据