對(duì)于將MySQL數(shù)據(jù)傳輸?shù)終afka,為了保證不丟失數(shù)據(jù),需要考慮以下幾個(gè)方面:
1. 使用可靠的Kafka生產(chǎn)者配置
bootstrap.servers=kafka_broker1:9092,kafka_broker2:9092,kafka_broker3:9092 acks=all retries=3
在配置文件中,建議設(shè)置Kafka生產(chǎn)者的bootstrap.servers為多個(gè)Kafka服務(wù)器地址,保證在一個(gè)服務(wù)器宕機(jī)時(shí),可以自動(dòng)切換到其他可用服務(wù)器,避免數(shù)據(jù)丟失。acks=all選項(xiàng)可以確保當(dāng)所有in-sync副本均已成功接受消息時(shí),才標(biāo)記為已處理。
2. 合理設(shè)置事務(wù)的提交間隔
producer.enable.idempotence=true transaction.timeout.ms=60000 linger.ms=10 batch.size=16384
事務(wù)的提交間隔需要根據(jù)系統(tǒng)負(fù)載、數(shù)據(jù)量大小、網(wǎng)絡(luò)延遲等因素來設(shè)置。設(shè)置事務(wù)timeout避免事務(wù)過長而超時(shí),而linger.ms決定了Producer端空閑多久將緩沖區(qū)中的數(shù)據(jù)批量發(fā)送,batch.size設(shè)置的則是Per Partition Buffer Memory的大小,即多少條消息才送一次。
3. 使用Kafka Connect和Debezium完成數(shù)據(jù)同步
name=mysql-connector connector.class=io.debezium.connector.mysql.MySqlConnector database.hostname=localhost database.port=3306 database.user=root database.password=mysql123 database.server.id=1 database.server.name=mysqlserver database.history.kafka.bootstrap.servers=kafka_broker1:9092,kafka_broker2:9092,kafka_broker3:9092 database.history.kafka.topic=MySQLHistory table.include.list=databaseName.tableName
使用Kafka Connect和Debezium可以將MySQL數(shù)據(jù)自動(dòng)同步到Kafka,并且在同步過程中可以保證“至少一次”傳輸。只有當(dāng)從MySQL讀取到所有數(shù)據(jù)記錄并成功傳輸?shù)終afka時(shí),Kafka Connect才會(huì)將“確認(rèn)”記錄寫回到binlog,從而保證數(shù)據(jù)不丟失。