本文主要描述Alibaba Canal中間件,官方文檔請參考:
1)gitlab:https://github.com/alibaba/canal
2)主要原理介紹:https://github.com/alibaba/canal/wiki/canal%E4%BB%8B%E7%BB%8D
2)運維操作文檔:https://github.com/alibaba/canal/wiki/AdminGuide
下文的介紹,基于大家對上述文檔的基本了解!
1)Canal版本為:1.0.24
2)通過Canal同步數據庫數據變更事件,并由下游的消費者消費,將數據轉存到ES或者跨機房的DB中。
一、設計目標
1、監控canal組件以及客戶端消費者
2、通過平臺,能夠實時查看監控數據。canal問題的定位應該快速,且運行狀態數據可見。
3、按需提供報警策略。
4、平臺支持添加canal集群的監控。
5、canal組件的部署和使用遵守約定,canal的實施應該快速。
我們希望構建一個canal服務:根據用戶需求,能夠快速構建canal集群,包括環境隔離;此外canal組件、上游的MySQL、下游的consumer等數據鏈路的整體狀態都在監控之中,且數據可見。我們希望任何利益相關者,都可以參與到數據決策中,并按需提供報警、預警機制。
二、基于Canal架構設計
1、整體架構
1)、每個Canal 集群應該至少有2個Canal實例,軟硬件配置應該對等。我們不應該在同一個Cluster的多個節點上,配置有任何差異。
2)、一個Canal可以多個“instances”,每個instance對應一個“MySQL實例”的一個database(專業來說,一個instance對應一個MySQL實例,支持其上的多個databases);簡單而言,我們認為一個instance相當于一個邏輯Slave。
3)、由2、可以得出,每個Canal Instance的全局處理的數據總量與一個正常的MySQL Slave相同,如果保持同等SLA,從Canal instance角度考慮,它的硬件能力應該與MySQL Slave保持相同。(同為單線程處理)。
4)、原則上,每個Canal可以支持“數十個instance”,但是instance的個數最終會影響instance同步數據的效能。我們建議,一個Canal盡量保持一個instance;除非Slave數據變更極小,我們才會考慮合并instances,以提高Canal組件的利用效率。
5)、每個instance,一個單獨的處理線程,用于負責“binlog dump”、“解析”、“入隊和存儲”。
6)、Canal集群模式,必須依賴Zookeeper,但是對Zookeeper的數據交互并不頻繁。
7)、Canal集群運行態,為“M-S”模式。但是“M-S”的粒度為“instance”級別。如果當前Canal的instance,與MySQL建立連接并進行binlog解析時,發生一定次數的“網絡異常”等,將會判定為當前instance失效,并stop(備注:此時會刪除注冊在ZK的相關臨時節點)。同時,集群中的每個Canal都會注冊所有“destination”(每個destination將有一個instance服務)的狀態變更事件,如果“臨時節點”被刪除(或者不存在),則會出發搶占,搶占成功,則成為此instance的Master。
(源碼:CanalController.initGlobalConfig(),
ServerRunningMonitor.start(),
HeartBeatHAController.onFailed()
)
8)、根據7、,我們得知,如果Canal組件中有多個instances,有可能這些instances的Master會分布在不同的Canal節點上。
9)、在運維層面,我們基于“default-instance.xml”配置,基于“spring”模式;每個instance的配置,放置在各自的文件夾下。(${canal.root}/conf/${destination}/instance.properties)
10)、每個Canal節點,在啟動時會初始化一個“嵌入式server”(NettyServer),此server主要目的是向Consumer提供服務。server的“ip:port”信息會注冊在ZK中,此后Consumer通過ZK來感知。
(源碼:
ServerRunningMonitor.initRunning(),
ClusterNodeAccessStrategy構造方法,
ZookeeperPathUtils.getDestinationServerRunning(destination)
)
11)、在Canal運行期間,可以動態的增加instances配置、修改instances配置。
2、Canal內部組件解析
1)Canal節點,可以有多個instances,每個instance在運行時為一個單獨的Spring Context,對象實例為“CanalInstanceWithSpring”。
2)每個instances有一個單獨的線程處理整個數據流過程。
3)instance內部有EventParser、EventSink、EventStore、metaManager主要四個組件構成,當然還有其他的守護組件比如monitor、HA心跳檢測、ZK事件監聽等。對象實例初始化和依賴關系,可以參見“default-instance.xml”,其配置模式為普通的Spring。
(源碼參見:SpringCanalInstanceGenerator)
4)Parser主要用于解析指定"數據庫"的binlog,內部基于JAVA實現的“binlog dump”、“show master status”等。Parser會與ZK交互,并獲取當前instance所有消費者的cursor,并獲其最小值,作為此instance解析binlog的起始position。目前的實現,一個instance同時只能有一個consumer處于active消費狀態,ClientId為定值“1001”,“cursor”中包含consumer消費binlog的position,數字類型。有次可見,Canal instance本身并沒有保存binlog的position,Parser中繼操作是根據consumer的消費cursor位置來決定;對于信息缺失時,比如Canal集群初次online,且在“default-instance.xml”中也沒有指定“masterPositiion”信息(每個instance.properties是可以指定起始position的),那么將根據“show master status”指令獲取當前binlog的最后位置。
(源碼:MysqlEventParser.findStartPosition())
5)Parser每次、批量獲取一定條數的binlog,將binlog數據封裝成event,并經由EventSink將消息轉發給EventStore,Sink的作用就是“協調Parser和Store”,確保binglog的解析速率與Store隊列容量相容。
(參見源碼:AbstractEventParser.start(),
EntryEventSink.sink()
)
6)EventStore,用于暫存“尚未消費”的events的存儲隊列,默認基于內存的阻塞隊列實現。Store中的數據由Sink組件提交入隊,有NettyServer服務的消費者消費確認后出隊,隊列的容量和容量模式由“canal.properties”中的“memory”相關配置決定。當Store中容量溢滿時,將會阻塞Sink操作(間接阻塞Parser),所以消費者的效能會直接影響instance的同步效率。
7)metaManager:主要用于保存Parser組件、CanalServer(即本文中提到的NettyServer)、Canal Instances的meta數據,其中Parser組件涉及到的binlog position、CanalServer與消費者交互時ACK的Cursor信息、instance的集群運行時信息等。根據官方解釋,我們在production級別、高可靠業務要求場景下,metaManager建議基于Zookeeper實現。
其中有關Position信息由CanalLogPositionManager類負責,其實現類有多個,在Cluster模式下,建議基于FailbackLogPositionManager,其內部有“primary”、“failback”兩級組合,優先基于primary來存取Position,只有當primary異常時會“降級”使用failback;其配置模式,建議與“default-instance.xml”保持一致。
(參看源碼:CanalMetaManager,PeriodMixedMetaManager)
3、Consumer端
1)Consumer允許分布式部署,多個對等節點互備。但是任何時候,同一個destination的消費者只能有一個(client實例),這種排他、協調操作由zookeeper承擔。在Cluster模式下,指定zkServer的地址,那么Consumer將會從meta信息中獲取指定destination所對應的instance運行在哪個Canal節點上,且CanalServer(即NettyServer)的ip:port信息,那么此時Consumer將根據“ip:port”與NettyServer建立連接,并進行數據交互。
(參見源碼:SimpleCanalConnector.connect(),
ClientRunningMonitor.start()
)
2)Consumer有序消費消息,嚴格意義上說,我們強烈建議Consumer為單線程逐條處理。盡管研發同學,有很多策略可以讓消息的處理過程使用多線程,但是對于消息的ACK將需要特殊的關注,而且非有序情境下,或許會對你的數據一致性有一定的影響。
3)消費者的消費效率,取決于“業務本身”,我們建議業務處理盡可能“短平快”。如果你的業務處理相對耗時,也不建議大家再使用“比如MQ、kafka”等其他異步存儲做橋接,因為這本質上對提高endpoint端效能沒有太大幫助,反而增加了架構的復雜性。
4)我們嚴格限制:消費者在處理業務時,必須捕獲所有異常,并將異常的event和處理過程的exception打印到業務日志,以備將來進行數據補償;捕獲異常,有助于Consumer可以繼續處理后續的event,那么整個canal鏈路不會因為一條消息而導致全部阻塞或者rollback。
5)Consumer單線程運行,阻塞、流式處理消息,獲取event的方式為pull + batch;每個batch的size由配置決定,一個batch獲取結束后,將會逐個調用業務的process方法,并在整個batch處理結束后,按需進行ack或者rollback。
6)需要注意:rollback操作是根據batchId進行,即回滾操作將會導致一個batch的消息會被重發;后續有重復消費的可能,這意味著業務需要有兼容數據冪等的能力。
7)消費者的ClientId為定值:1001,不可修改。
三、部署與最佳實踐(建議)
1、Canal集群部署
1)Production場景,節點個數至少為2,考慮到Canal自身健壯性,也不建議Canal單組集群的節點數量過多。
2)Canal節點為“網絡IO高耗”、“CPU高耗”(并發要求較高,體現在instance處理、consumer交互頻繁)型應用,對磁盤IO、內存消耗很低。
3)不建議Canal與其他應用混合部署,我們認定Canal為核心組件,其可用性應該被保障在99.99%+。
4)每個Canal集群的instances個數,并沒有嚴格限制,但其所能承載的數據量(TPS,包括consumer + binlog parser)是評估instances個數的主要條件。考慮到Production級別數據變更的場景不可控,我們建議每個Canal集群的instance個數,應該在1~3個。
5)對于核心數據庫、TPS操作較高的數據庫,應該使用單獨的Canal。
6)Canal集群的個數多,或者分散,或者利用率低,并不是我們特別關注的事情,不要因為過度考慮“資源利用率”、“Consumer的集中化”而讓Canal負重。
7)Canal的配置,絕大部分可以使用“默認”,但是要求在Production場景,instance模式必須使用Spring,配置方式采用“default-instance.xml”。“default-instance.xml”默認配置已滿足我們HA環境下的所有設計要求。(版本:1.0.24)
8)Canal機器的配置要求(最低):4Core、8G;建議:8Core、16G。
9)Canal的上游,即MySQL實例,可以是“Master”或者任意level的Slave,但是無論如何,其binlog_format必須為ROW,通過使用“show variables like 'binlog_format"”來確定。目前已經驗證,使用mixed模式可能導致某些UPDATE操作事件無法被消費者解析的問題。
2、Zookeeper集群
1)Zookeeper集群,要求至少3個節點。網絡聯通性應該盡可能的良好。
2)多個Canal Cluster可以共享一個ZK集群,而且建議共享。那么只需要在canal.properties文件中“zkServers”配置項增加“rootPath”后綴即可,比如“10.0.1.21:2181,10.0.1.22:2181/canal/g1”。但是不同的Canal cluster,其rootPath應該不同。我們約定所有的Canal集群,rootpath的都以“/canal/”開頭。(這對我們后續的ZK監控比較有利,我們只需要遍歷"/canal"的子節點即可知道集群信息)
3)業界也有一種通用的部署方式,zookeeper集群與canal共生部署,三個節點,每個節點上都部署一個ZK和canal;這種部署模式的出發點也是比較簡單,分析canal問題時只需要通過本地zk即可。(僅為建議)
4)需要非常注意,rootpath必須首先創建,否則canal啟動時將會拋出異常!
3、Consumer集群
1)Consumer實例為普通application,JAVA項目,Spring環境。
2)Consumer集群至少2個節點,分布式部署。運行態為M-S。
3)每個Consumer實例為單線程,Consumer本身對CPU、內存消耗較低,但是對磁盤有一定的要求,因為我們將會打印大量的日志。建議磁盤為200G + ,logback的日志格式應該遵守我司規范,后續承接ELK基礎數據平臺。
4)一個Application中,允許有多個Consumer實例。
5)Consumer的業務處理部分,必須捕獲全部異常,否則異常逃逸將可能導致整個鏈路的阻塞;對于異常情況下,建議進行日志記錄,稍后按需進行數據補償。
6)Consumer的業務處理部分,我們要求盡可能的快,業務處理簡單;最重要的是千萬不要在業務處理部分使用比如“Thread.sleep”、“Lock”等阻塞線程的操作,這可能導致主線程無法繼續;如果必須,建議使用分支線程。
7)如果你對消息的順序、事務不敏感,也允許你在業務處理部分使用多線程,這一部分有一定的歧義,所以需要開發者自己評估。從原理上說,多線程可以提高消息消費的效率,但是對數據一致性可能會有影響。但是Consumer的Client框架,仍然堅守單線程、有序交付。
8)在CanalServer和Consumer端,都能指定“filter”,即“過濾不關注的schema消息”;在CanalServer啟動時將會首先加載“instance.properties”中的filter配置并生效,此后如果instance的消費者上線且也指定了filter,那么此filter信息將會被注冊ZK中,那么CanalServer將會基于ZK獲取此信息,并將Consumer端的filter作為最終決策;由此可見,我們在Consumer端指定filter的靈活性更高(當然隱蔽性也增加,這對排查問題需要一些提前溝通),無論如何,CanalServer不會傳送“不符合filter”的消息給Consumer。
4、Filter規則描述:適用于instance.properties和Consumer端的subscribe()方法
1) 所有表:.* or .*\\..*
2) canal schema下所有表: canal\\..*
3) canal下的以canal打頭的表:canal\\.canal.*
4) canal schema下的一張表:canal.test1
5) 多個規則組合使用:canal\\..*,mysql.test1,mysql.test2 (逗號分隔)
5、運行狀態監控
非常遺憾的是,Canal監控能力相當的弱,內部程序中幾乎沒有JMX的任何export機制,所以如果需要監控比如“slave延遲”、“消費速率”、“position”等,需要開發代碼。思路如下:
1)開發一個JAVA WEB項目。
2)讀取ZK中的相關META信息,解析出每個destination對于的slave地址,并創建JDBC連接,發送“show master status”等指令,查看此slave binlog的位置,用于判斷Canal延遲。
3)讀取ZK中相關META信息,解析出每個destination對應的consumer cursor,與2)進行對比,用于判定consumer的消費延遲。
四、Canal核心配置樣例
1、canal.properties (${canal.root}/conf)
Java代碼 ## 當前canal節點部署的instances列表,以“,”分割 ##比如:test,example canal.destinations= example ##canal配置文件主目錄,保持默認即可。 ##除非你為了提高canal的動態管理能力,將conf文件遷移到了其他目錄(比如NFS目錄等) canal.conf.dir = ../conf # 是否開啟“instance”配置修改自動掃描和重載 ##1)conf.dir目錄下新增、刪除instance配置目錄 ##2)instance配置目錄下的instance.properties變更 ##不包含:canal.properties,spring/*.xml的配置變更 ##如果環境隔離、測試充分的環境下,或者應用試用初期,可以開啟 ##對于高風險項目,建議關閉。 canal.auto.scan = true canal.auto.scan.interval = 5 ##instance管理模式,Production級別我們要求使用spring canal.instance.global.mode = spring ##直接初始化和啟動instance canal.instance.global.lazy = false ##Production級別,HA模式下,基于default-instance.xml ##需要即備的ZK集群,且不應該修改此文件的默認配置。 ##如果有自定義的場景,應該新建${instance}-instance.xml文件 canal.instance.global.spring.xml = classpath:spring/default-instance.xml ##canal server的唯一標識,沒有實際意義,但是我們建議同一個cluster上的不同節點,其ID盡可能唯一(后續升級) ##數字類型 canal.id = 1 ##canal server因為binding的本地IP地址,建議使用內網(唯一,集群可見,consumer可見)IP地址,比如“10.0.1.21”。 #此IP主要為canalServer提供TCP服務而使用,將會被注冊到ZK中,Consumer將與此IP建立連接。 canal.ip = ##conal server的TCP端口 canal.port = 11111 ##Production場景,HA模式下,比如使用ZK作為服務管理,此處至少指定“多數派ZK Node”的IP列表 ##如果你的多個Canal Cluster共享ZK,那么每個Canal還需要使用唯一的“rootpath”。 canal.zkServers = 10.0.1.21:2818,10.0.1.22,10.0.2.21:2818/canal/g1 # flush data to zk ##適用于metaManager,基于period模式 ##metaManager優先將數據(position)保存在內存,然后定時、間歇性的將數據同步到ZK中。 ##此參數用于控制同步的時間間隔,建議為“1000”(1S),單位:ms。 ##運維或者架構師,應該觀察ZK的效能,如果TPS過于頻繁,可以提高此值、或者按Canal集群分離ZK集群。 ##目前架構下,Consumer向CanalServer提交ACK時會導致ZK數據的同步。 canal.zookeeper.flush.period = 1000 ##canal將parse、position數據寫入的本地文件目錄,HA環境下無效。 ##(file-instance.xml) canal.file.data.dir = ${canal.conf.dir} canal.file.flush.period = 1000 ##內存模式,EventStore為Memory類型時。(default-instance.xml) ##可選值: ##1) MEMSIZE 根據buffer.size * buffer.memunit的大小,限制緩存記錄的大小,簡答來說,就是內存容量大小限制 ##2) ITEMSIZE 根據buffer.size進行限制,簡單來說,就是根據event的條數限制。 ##如果Canal上的instances個數有限,且Consumer的消費效率很高,甚至接近或者高于binlog解析效率,那么可以適度增加memory有關的數值。 ##此外batchMode還與消費者的batchSize有些關系,消費者每次能消費的數據量,取決于此mode。 ##如果mode為itemSize,則consumer每次獲取的消息的條數為batchSize條。 ##如果mode為memSize,那么consumer消費的數據總量為batchSize * memunit canal.instance.memory.batch.mode = MEMSIZE canal.instance.memory.buffer.size = 16384 canal.instance.memory.buffer.memunit = 1024 # 所能支撐的事務的最大長度,超過閾值之后,一個事務的消息將會被拆分,并多次提交到eventStore中,但是將無法保證事務的完整性 canal.instance.transaction.size = 1024 # 當instance.properties配置文件中指定“master”、“standby”時,當canal與“master”聯通性故障時,觸發連接源的切換, ##那么切換時,在新的mysql庫上查找binlog時需要往前“回退”查找的時間,單位:秒。 ##良好架構下,我們建議不使用“standby”,限定一個數據庫源。因為多個源時,數據庫的調整頻繁、協調不足,可能會引入一些數據問題。 canal.instance.fallbackIntervalInSeconds = 60 ## 有關HA心跳檢測部分,主要用在Parser管理dump連接時使用。 ## 我們在HA環境時建議開啟。 canal.instance.detecting.enable = true #如果你需要限定某個database的可用性驗證(比如庫鎖), #最好使用復雜的、有效的SQL,比如:insert into {database}.{tmpTable} .... canal.instance.detecting.sql = select 1 ##心跳檢測頻率,單位秒 canal.instance.detecting.interval.time = 6 ##重試次數 ##非常注意:interval.time * retry.threshold值,應該參考既往DBA同學對數據庫的故障恢復時間, ##“太短”會導致集群運行態角色“多跳”;“太長”失去了活性檢測的意義,導致集群的敏感度降低,Consumer斷路可能性增加。 canal.instance.detecting.retry.threshold = 5 #如果在instance.properties配置了“master”、“standby”,且此參數開啟時,在“探測失敗”后,會選擇備庫進行binlog獲取 #建議關閉 canal.instance.detecting.heartbeatHaEnable = false # CanalServer、instance有關的TCP網絡配置,建議保持抱人 canal.instance.network.receiveBufferSize = 16384 canal.instance.network.sendBufferSize = 16384 canal.instance.network.soTimeout = 30 # Parser組件,有關binlog解析的過濾 ##是否過濾dcl語句,比如“grant/create user”等 canal.instance.filter.query.dcl = false ##dml語句:insert/update/delete等 canal.instance.filter.query.dml = false ##ddl語句:create table/alter table/drop table以及一些index變更 canal.instance.filter.query.ddl = false canal.instance.filter.table.error = false canal.instance.filter.rows = false # binlog格式和“鏡像”格式檢測,建議保持默認 canal.instance.binlog.format = ROW,STATEMENT,MIXED canal.instance.binlog.image = FULL,MINIMAL,NOBLOB # ddl是否隔離發送,保持默認 canal.instance.get.ddl.isolation = falsecanal.properties為全局配置,約束所有的instances、CanalServer等。
2、instance.properties (${canal.root}/conf/{instance})
Java代碼 ## 每個instance都會偽裝成一個mysql slave, ## 考慮到binlog同步的機制,我們需要指定slaveId,注意此ID對于此canal前端的MySQL實例而言,必須是唯一的。 ## 同一個Canal cluster中相同instance,此slaveId應該一樣。 ## 我們約定,所有Canal的instance,其slaveId以“1111”開頭,后面補充四位數字。 canal.instance.mysql.slaveId = 11110001 # 數據庫相關:master庫 ##備注,master并不是要求是“MySQL 數據庫Master”, ## 而是Canal instance集群模式下,HA運行態中“master”(首選節點) ## 當在故障恢復、Canal遷移時,我們需要手動指定binlog名稱以及postition或者timestamp,確保新Canal不會丟失數據。 ## 數據庫實例地址,ip:port canal.instance.master.address = 127.0.0.1:3306 ##指定起始的binlog文件名,保持默認 canal.instance.master.journal.name = ##此binlog文件的position位置(offset),數字類型。獲取此position之后的數據。 canal.instance.master.position = ##此binlog的起始時間戳,獲取此timestamp之后的數據。 canal.instance.master.timestamp = ##standby庫 ##考慮到我司現狀,暫不使用standby #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = # 數據庫連接的用戶名和密碼 # 貌似Consumer與CanalServer建立連接時也用的是此用戶名和密碼 canal.instance.dbUsername = canal canal.instance.dbPassword = canal # 默認數據庫 canal.instance.defaultDatabaseName = canal.instance.connectionCharset = UTF-8 # schema過濾規則,類似于MySQL binlog的filter # canal將會過濾那些不符合要求的table,這些table的數據將不會被解析和傳送 # filter格式,Consumer端可以指定,只不過是后置的。 ## 無論是CanalServer還是Consumer,只要有一方指定了filter都會生效,consumer端如果指定,則會覆蓋CanalServer端。 canal.instance.filter.regex = .*\\..* # table black regex canal.instance.filter.black.regex =3、default-instance.xml (${canal.root}/conf/spring)
建議保持默認