当前位置: 首页 > 文档资料 > Canal 中文文档 >

Dev Guide

优质
小牛编辑
128浏览
2023-12-01

概述

本文主要介绍 canal 产品整体设计,帮助你快速地熟悉代码脉络。如果你对 canal 还一无所知,只是知道他能进行数据迁移同步,那么建议先行阅读 [[Introduction]] 和 [[AdminGuide]] 两篇文档了解。

产品设计

  • server 代表一个 canal 运行实例,对应于一个 jvm
  • instance 对应于一个数据队列 (1个 canal server 对应 1..n 个 instance )

  • instance 下的子模块

    • eventParser: 数据源接入,模拟 slave 协议和 master 进行交互,协议解析
    • eventSink: Parser 和 Store 链接器,进行数据过滤,加工,分发的工作
    • eventStore: 数据存储
    • metaManager: 增量订阅 & 消费信息管理器

整体类图设计

  • CanalLifeCycle: 所有 canal 模块的生命周期接口
  • CanalInstance: 组合 parser,sink,store 三个子模块,三个子模块的生命周期统一受 CanalInstance 管理
  • CanalServer: 聚合了多个 CanalInstance

EventParser 设计

  • 每个EventParser都会关联两个内部组件

    • CanalLogPositionManager : 记录binlog 最后一次解析成功位置信息,主要是描述下一次canal启动的位点
    • CanalHAController: 控制 EventParser 的链接主机管理,判断当前该链接哪个mysql数据库
  • 目前开源版本只支持 MySQL binlog , 默认通过 MySQL binlog dump 远程获取 binlog ,但也可以使用 LocalBinlog - 类 relay log 模式,直接消费本地文件中的 binlog

CanalLogPositionManager 设计

  • 如果 CanalEventStore 选择的是内存模式,可不保留解析位置,下一次 canal 启动时直接依赖 CanalMetaManager 记录的最后一次消费成功的位点即可. (最后一次ack提交的数据位点)
  • 如果 CanalEventStore 选择的是持久化模式,可通过 zookeeper 记录位点信息,canal instance 发生 failover 切换到另一台机器,可通过读取 zookeeper 获取位点信息
  • 可通过实现自己的 CanalLogPositionManager,比如记录位点信息到本地文件/nas 文件实现简单可用的无 HA 模式

CanalHAController类图设计

  • 失败检测常见方式可定时发送心跳语句到当前链接的数据库,超过一定次数检测失败时,尝试切换到备机
  • 如果有一套数据库主备信息管理系统,当数据库主备切换或者机器下线,推送配置到各个应用节点,HAController 收到后,控制 EventParser 进行链接切换

EventSink类图设计和扩展

  • 常见的 sink 业务有 1:n 和 n:1 形式,目前 GroupEventSink 主要是解决 n:1 的归并类业务

EventStore类图设计和扩展

  • 抽象 CanalStoreScavenge , 解决数据的清理,比如定时清理,满了之后清理,每次 ack 清理等
  • CanalEventStore 接口,主要包含 put/get/ack/rollback 的相关接口. put/get 操作会组成一个生产者/消费者模式,每个 store 都会有存储大小设计,存储满了,put 操作会阻塞等待 get 获取数据,所以不会无线占用存储,比如内存大小
    • EventStore 目前实现了 memory 模式,支持按照内存大小和内存记录数进行存储大小限制
    • 后续可开发基于本地文件的存储模式
    • 基于文件存储和内存存储,开发 mixed 模式,做成两级队列,内存 buffer 有空位时,将文件的数据读入到内存 buffer 中
    • mixed 模式实现可以让 canal 落地消费/订阅的模型,取 1 份binlog数据,提供多个客户端消费,消费有快有慢,各自保留消费位点

MetaManager类图设计和扩展

  • metaManager 目前支持了多种模式,最顶层 memory 和 zookeeper 模式,然后是 mixed 模式-先写内存,再写zookeeper
  • 可通过实现自己的 CanalMetaManager,比如记录位点信息到本地文件/nas文件,简单可用的无 HA 模式

应用扩展

上面介绍了相关模块的设计,这里介绍下如何将自己的扩展代码应用到canal中. 介绍之前首先需要了解instance的配置方式,可参见: AdminGuide 的 Spring 配置这一章节

  • canal instance 基于 spring 的管理方式,主要由两部分组成

    • xxx.properties
    • xxx-instance.xml
  • xxx-instance.xml 描述对应 instance 所使用的模块组件定义,默认的 instance 模块组件定义有

    • memory-instance.xml:选择 memory 模式的组件,速度优先,简单
    • default-instance.xml:选择 mixed/preiodmixed 模式的组件,可以提供 HA 的功能
    • group-instance.xml:提供 n:1 的 sink 模式

如果要应用自己的组件,就只需要定义一份自己的 instance.xml,比如 custom-intance.xml

示例:

<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
    <property name="ignoreResourceNotFound" value="true" />
    <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->
    <property name="locationNames">
        <list>
        <value>classpath:canal.properties</value>
        <value>classpath:${canal.instance.destination:}/instance.properties</value> 
        </list>
     </property>
</bean>

<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
    <property name="destination" value="${canal.instance.destination}" />
    <property name="eventParser">
    <ref local="eventParser" />
    </property>
    <property name="eventSink">
    <ref local="eventSink" />
    </property>
    <property name="eventStore">
        <ref local="eventStore" />
    </property>
    <property name="metaManager">
    <ref local="metaManager" />
    </property>
    <property name="alarmHandler">
    <ref local="alarmHandler" />
    </property>
</bean>
......
  • 一份 instance.xml 中有一份或者多份 instance 定义,优先以 destination 名字查找对应的 instance bean 定义,如果没有,则按默认的名字 “instance” 查找 instance 对象,例如 xxxx-instance.xml 中定义 id 分别为 instance-1, instance-2 的两个 bean. 这两个 bean 将为同名的 instance 提供自定义的 eventParser , evnetSink , evnetStore , metaManager,alarmHandler.如果没有自定义这些 bean, 就使用 id="instance" 的 bean 来配置 canal instance.
  • 一份 instance bean 定义,需要包含 eventParser , evnetSink , evnetStore , metaManager,alarmHandler 的5个模块定义,( alarmHandler 主要是一些报警机制处理,因为简单没展开,可扩展)
<bean id="instance-1" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
  <property name="destination" value="${canal.instance.destination}" />
  <property name="eventParser">
      <ref local="eventParser" />
  </property>
  ......
</bean>
<bean id="instance-2" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
  <property name="destination" value="${canal.instance.destination}" />
  .......
</bean>

完成 custom-instance.xml 定义后,可通过 canal.properties 配置中进行引入

<pre class="java" name="code">canal.instance.{通道名字}.spring.xml = classpath:spring/custom-instance.xml

到这里,就完成了扩展组件的应用,启动canal instance后,就会使用自定义的的组件。