實戰 kafka connector 與 debezium mysql
本文已參與「新人創作禮」活動,一起開啟掘金創作之路。
kafka connector
Kafka Connect 是一個可擴充套件、可靠的在Kafka和其他系統之間流傳輸的資料工具。它可以通過connectors(聯結器)簡單、快速的將大集合資料匯入和匯出kafka。
1. 安裝配置
1.1 下載並配置kafka
我們使用的是kafka_2.13-2.8.0.tgz 您可以自行下載
解壓到目錄 D:\soft\kafka_2.13-2.8.0 這是我的根目錄
修改 D:\soft\kafka_2.13-2.8.0\config 資料夾以下檔案內容
zookeeper.properties 需要修改的地方:
textile
dataDir=D:/soft/kafka_2.13-2.8.0/data/zookeeper
server.properties 需要修改的地方:
txt
log.dirs=D:/soft/kafka_2.13-2.8.0/data/kafka-logs
在D:\soft\kafka_2.13-2.8.0\bin\windows 下我們新建一個 a.bat 檔案內容如下
batch
start zookeeper-server-start.bat ../../config/zookeeper.properties
rem -E 如果sleep命令不可用可以用 ping -n 10 127.0.0.1>nul
sleep 5
start kafka-server-start.bat ../../config/server.properties
這個命令會先啟動 zookeeper 等zookeeper啟動5秒後 再啟動 kafka
1.2 下載圖形管理介面 kafka-map
官方 kafka-map: 一個美觀簡潔且強大的kafka web管理工具。
下載地址 http://github.com/dushixiang/kafka-map/releases/latest/download/kafka-map.tgz
解壓到 D:\soft\kafka_2.13-2.8.0\kafka-map
在 D:\soft\kafka_2.13-2.8.0\kafka-map 新建 startKafkaMap.bat 內容如下:
batch
java -jar kafka-map.jar
1.3 啟動驗證
啟動 a.bat
啟動 startKafkaMap.bat
訪問 http://localhost:8080/ ,賬戶和密碼預設都是 admin
看到如下頁面就是成功了
新增叢集
2. kafka connector入門樣例
我們先執行一下官方提供的一個例子,這個例子中會把server.properties 用FileStreamSource 讀取然後儲存到kafka 佇列
然後用FileStreamSink 消費佇列並儲存到 server.properties1 中
2.1 配置connector
D:\soft\kafka_2.13-2.8.0\config\connect-distributed.properties
textile
bootstrap.servers=localhost:9092
2.2 啟動 connector
在 D:\soft\kafka_2.13-2.8.0\bin\windows 目錄下先建立一個aConnect.bat 內容如下:
textile
start zookeeper-server-start.bat ../../config/zookeeper.properties
sleep 5
start kafka-server-start.bat ../../config/server.properties
sleep 5
start connect-distributed.bat ../../config/connect-distributed.properties
2.3 檢查環境
傳送 GET 請求到 http://localhost:8083/ 檢視狀態
檢視所有外掛 http://localhost:8083/connector-plugins
2.3 配置 file connector source 端
FileStreamSource 會把 "D:/soft/kafka_2.13-2.8.0/config/server.properties" 這個檔案逐行讀取後儲存到 "kafka-config-topic" 這個佇列中
http://localhost:8083/connectors
json
{
"name":"load-kafka-config",
"config":{
"connector.class":"FileStreamSource",
"file":"D:/soft/kafka_2.13-2.8.0/config/server.properties",
"topic":"kafka-config-topic"
}
}
介面返回結果截圖如下
檢視狀態
http://localhost:8083/connectors/load-kafka-config/status
在kafka-map中 點選箭頭所指內容
檢視topic 內容
2.4 配置 file connector sink端
FileStreamSink 會消費 "kafka-config-topic" 這個佇列並把 內容儲存到 "D:/soft/kafka_2.13-2.8.0/config/server.properties1"
http://localhost:8083/connectors
json
{
"name":"dump-kafka-config",
"config":{
"connector.class":"FileStreamSink",
"file":"D:/soft/kafka_2.13-2.8.0/config/server.properties1",
"topics":"kafka-config-topic"
}
}
介面返回結果如下;
執行完成後會發現 D:\soft\kafka_2.13-2.8.0\config 目錄下 多了一個server.properties1 檔案,而且檔案內容和 server.properties 一樣
2.5 檢視當前活躍的 connector
http://localhost:8083/connectors
2.6 刪除connector
DELETE http://localhost:8083/connectors/load-kafka-config
DELETE http://localhost:8083/connectors/dump-kafka-config
檢視 活躍的connetor,發現結果已經為空
3. 整合 debezium mysql connector
在這個例子中 我們會用 debezium mysql connector 監控mysql binlog 並把變更內容儲存到kafka 佇列中
3.1 mysql 準備
為了簡單專門用於複製的使用者我們就不建立了,我們直接用root 使用者
必須配置以下引數
config
[mysqld]
server-id = 8023
port=8023
log-bin=D:/soft/mysql/mysql8023/data1/8023/binlog/mysql-bin
transaction-isolation= READ-COMMITTED
binlog_format = ROW
gtid_mode =ON
enforce_gtid_consistency = ON
default-time-zone = '+8:00'
還有一些可選的優化引數請參考官方文件
http://debezium.io/documentation/reference/2.0/connectors/mysql.html#setting-up-mysql
準備好後就可以啟動mysql了,在此我們用的mysql版本為8.0.29
準備一個測試庫和一個測試表
``sql
CREATE DATABASE
test1` ;
CREATE TABLE demo
(
id
INT(10) NOT NULL,
name
VARCHAR(50) NULL DEFAULT NULL ,
PRIMARY KEY (id
) USING BTREE
);
-- 插入測試資料
INSERT INTO demo
(id
, name
) VALUES (111, '222');
```
3.2 debezium 準備
下載 debezium 整合kafka connect 外掛
http://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.0.0.Alpha1/debezium-connector-mysql-2.0.0.Alpha1-plugin.tar.gz
把 檔案debezium-connector-mysql-2.0.0.Alpha1-plugin.tar.gz 解壓到 D:\soft\kafka_2.13-2.8.0\plugin
在 connect-distributed.properties 配置如下引數,注意路徑不要包含 debezium-connector-mysql
config
plugin.path=D:/soft/kafka_2.13-2.8.0/plugin/
新增完成後需要重啟kafka 和 connect
檢視啟動日誌是否已經載入外掛
txt
[2022-05-11 16:09:48,408] INFO Registered loader: PluginClassLoader{pluginLocation=file:/D:/soft/kafka_2.13-2.8.0/plugin/debezium-connector-mysql/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:269)
[2022-05-11 16:09:48,410] INFO Added plugin 'io.debezium.connector.mysql.MySqlConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:198)
檢視可用的外掛
3.3 安裝connnector
傳送 POST 給 http://localhost:8083/connectors
請求正文為 connector 配置引數
json
{
"name": "conn-8023",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "8023",
"database.user": "root",
"database.password": "123456",
"database.server.id": "8023",
"database.server.name": "local8023",
"database.include.list": "test1",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "dbhistory.local8023",
"include.schema.changes": "true"
}
}
參考圖片
3.4 檢視執行狀態
用kafka-map 檢視資料
檢視 local8023.test1.demo 的資料
json
{
"schema": {
...
},
"payload": {
"before": null,
"after": {
"id": 111,
"name": "222"
},
"source": {
...
},
"op": "r",
"ts_ms": 1652258099933,
"transaction": null
}
}
再執行一些sql測試一下
sql
-- 插入測試資料
INSERT INTO `demo` (`id`, `name`) VALUES (222, '新資料222');
update demo set name='111修改後' where id=111;
DELETE FROM demo WHERE id=222;
插入事件
json
"payload": {
"before": null,
"after": {
"id": 222,
"name": "新資料222"
},
"source": {
...
},
"op": "c",
"ts_ms": 1652258788884,
"transaction": null
}
修改事件
json
"payload": {
"before": {
"id": 111,
"name": "222"
},
"after": {
"id": 111,
"name": "111修改後"
},
"source": {
...
},
"op": "u",
"ts_ms": 1652258788887,
"transaction": null
}
刪除事件
json
"payload": {
"before": {
"id": 222,
"name": "新資料222"
},
"after": null,
"source": {
...
},
"op": "d",
"ts_ms": 1652258985267,
"transaction": null
}
刪除後的墓碑事件