圖解 Kafka 生產者初始化核心流程

語言: CN / TW / HK

點選上方 "華仔聊技術 " 右上角 選擇設為星標

硬核技術文章不會錯過!

閱讀本文大約需要 30 分鐘。

大家好,我是 華仔, 又跟大家見面了。

從今天開始我將以 Kafka 2.7 版本為主,通過 場景驅動 的方式帶大家一點點的對 Kafka 原始碼進行深度剖析,正式開啟  Kafka的原始碼之旅 」, 跟我一起來掌握 Kafka 原始碼核心架構設計思想吧

今天這篇我們先來聊聊 Kafka 生產者初始化時用到的核心元件以及傳送的核心流程,帶你梳理生產者初始化整體的原始碼分析脈絡。

認真讀完這篇文章,我相信你會對 Kafka 生產初始化原始碼有更加深刻的理解。

這篇文章乾貨很多,希望你可以耐心讀完。

01 總體概述

我們都知道在 Kafka 中,我們把產生訊息的一方稱為生產者即 Producer,它是 Kafka 核心元件之一,也是訊息的來源所在。那麼這些生產者產生的訊息是如何傳到 Kafka 服務端的呢?初始化過程是怎麼樣的呢?接下來會逐一講解說明。

02 生產者初始化核心元件及流程剖析

我們先從生產者客戶端構造  KafkaProducer 開始講起:

Properties properties = new Properties();
//構造 KafkaProducer
KafkaProducer producer = new KafkaProducer(properties);
//呼叫send非同步回調發送
producer.send(record,new DemoCallBack(record.topic(), record.key(), record.value()));

上面程式碼主要做了2件事情:

1)初始化 KafkaProducer 例項

2)呼叫 send 介面傳送資料,支援同步和非同步回撥方式

待構造完  KafkaProducer   就正式進入生產者原始碼的入口了,如下圖所示:

接下來我們分析一下  KafkaProducer 的原始碼, 先看下該類裡面的 重要欄位

public class KafkaProducer<K, V> implements Producer<K, V> {
private final Logger log;
private static final String JMX_PREFIX = "kafka.producer";
public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";
// 生產者客戶端Id
private final String clientId;
// 訊息分割槽器
private final Partitioner partitioner;
// 訊息的最大的長度,預設1M,生產環境可以提高到10M
private final int maxRequestSize;
// 傳送訊息的緩衝區的大小,預設32M
private final long totalMemorySize;
// 叢集元資料
private final ProducerMetadata metadata;
// 訊息累加器
private final RecordAccumulator accumulator;
// 執行傳送訊息的類
private final Sender sender;
// 執行傳送訊息的執行緒
private final Thread ioThread;
// 訊息壓縮型別
private final CompressionType compressionType;
// key的序列化器
private final Serializer<K> keySerializer;
// value的序列化器
private final Serializer<V> valueSerializer;
// 生產者客戶端引數配置
private final ProducerConfig producerConfig;
// 等待元資料更新的最大時間,預設1分鐘
private final long maxBlockTimeMs;
// 生產者攔截器
private final ProducerInterceptors<K, V> interceptors;
// api版本
private final ApiVersions apiVersions;
// 事務管理器
private final TransactionManager transactionManager;
........
}

重要且核心欄位含義如下:

1) clientId: 生產者客戶端的ID。

2) partitioner: 訊息的分割槽器,即通過某些演算法將訊息分配到某一個分割槽中。

3) maxRequestSize: 訊息的最大的長度,預設1M,生產環境可以提高到10M。

4) totalMemorySize: 傳送訊息的緩衝區的大小,預設32M。

5) metadata: 叢集的元資料。

6) accumulator: 訊息累加器,主要負責緩衝訊息。

7) sender: 執行傳送訊息的類,主要負責傳送訊息。

8) ioThread: 執行傳送訊息的執行緒,主要負責封裝Sender類。

9) compressionType: 訊息壓縮的型別。

10) keySerializer: key的序列化器。

11) valueSerializer: value的序列化器。

12) producerConfig :生產者客戶端的配置引數。

13) maxBlockTimeMs: 等待元資料更新和緩衝區分配的最長時間,預設1分鐘。

14) interceptors: 生產者攔截器。主要負責在訊息傳送前後對訊息進行攔截和處理。

接下來我們看下   KafkaProducer   的構造方法,來剖析生產者傳送訊息的過程中涉及到的 核心元件

原始碼位置:

kafka\clients\src\main\java\org\apache\kafka\clients\producer\KafkaProducer.java  323行

如果有不會安裝原始碼環境的話,可以參考之前的  Kafka原始碼之旅入門篇

public class KafkaProducer<K, V> implements Producer<K, V> {
......
KafkaProducer(Map<String, Object> configs,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
ProducerMetadata metadata,
KafkaClient kafkaClient,
ProducerInterceptors<K, V> interceptors,
Time time) {
// 1.生產者配置初始化
ProducerConfig config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer,
valueSerializer));
try {
// 2.獲取客戶端配置引數
Map<String, Object> userProvidedConfigs = config.originals();
this.producerConfig = config;
this.time = time;
// 3.用於事務傳遞的TransactionalId,保證會話的可靠性,如果配置表示啟用冪等+事務
String transactionalId = (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
// 4.設定生產者客戶端id
this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
LogContext logContext;
// 根據事務id是否配置來記錄不同日誌
if (transactionalId == null)
logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
else
logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
log = logContext.logger(KafkaProducer.class);
log.trace("Starting the Kafka producer");
........省略Metrics
// 5.設定對應的分割槽器
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
// 6.失敗重試的退避時間,配置引數:retry.backoff.ms 預設100ms
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
// 7.定義key、value對應的序列化器
if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
this.keySerializer = keySerializer;
}
if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = valueSerializer;
}
// load interceptors and make sure they get clientId
userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
ProducerConfig configWithClientId = new ProducerConfig(userProvidedConfigs, false);
// 8.定義生產者攔截器列表
List<ProducerInterceptor<K, V>> interceptorList = (List) configWithClientId.getConfiguredInstances(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
if (interceptors != null)
this.interceptors = interceptors;
else
this.interceptors = new ProducerInterceptors<>(interceptorList);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer,
valueSerializer, interceptorList, reporters);
// 9.設定訊息的最大的長度,預設1M,生產環境可以提高到10M
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
// 10.設定傳送訊息的緩衝區的大小,預設32M
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
// 11.設定訊息壓縮型別
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
// 12.設定等待元資料更新的最大時間,預設1分鐘
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
// 13.設定訊息投遞的超時時間
int deliveryTimeoutMs = configureDeliveryTimeout(config, log);
this.apiVersions = new ApiVersions();
// 事務管理器
this.transactionManager = configureTransactionState(config, logContext);
....省略,看下面各小節原始碼
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
close(Duration.ofMillis(0), true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer", t);
}
}
}

下面通過一張圖來描述  KafkaProducer   初始化原始碼過程:

Kafka Producer 初始化核心元件如下:

1)初始化生產者配置 (ProducerConfig)

2)設定客戶端配置檔案的配置資訊 (userProvidedConfigs)

3)設定事務ID (transactionaID)

4)設定生產者客戶端ID (clientId)

5)設定對應的分割槽器 (partitioner) 支援自定義,用來將訊息分配給某個主題的某個分割槽的。

6)設定失敗重試的退避時間 (retryBackoffMs) 。在客戶端請求服務端時,可能因為網路或服務端異常造成請求超時。這時請求失敗會重試,但是如果重試的頻率過高又可能造成服務端網路擁堵。因此必須等一段時間再請求,預設100ms。

7)初始化key的序列化器 (keySerializer) 和value的序列化器 (valueSerializer) 。key和value的序列化器是使用者在初始化 KafkaProducer 的時候自定義的。

8)設定生產者攔截器 (ProducerInterceptor) ,攔截器的主要作用是按照一定的規則統一對訊息進行處理。

9)設定 訊息的最大的長度 (maxRequestSize) 。預設是1M,超了會報異常。在生產環境中建議設定為10M。

10)設定傳送訊息的緩衝區的大小 (totalMemorySize) ,預設是32M。

11)設定訊息壓縮的型別 (compressionType) 。預設是none表示不壓縮。在訊息傳送的過程中,為了提升傳送訊息的吞吐量會把訊息進行壓縮再發送。

12)設定等待元資料更新和緩衝區分配的最長時間 (maxBlockTimeMs) ,預設60S。

13)設定訊息投遞超時時間 (deliveryTimeoutMs) ,預設120S。訊息投遞時間是從傳送到收到響應的時間。

我們分析了   KafkaProducer   的核心元件,接下來我們分析下初始化過程中的核心流程。

01

初始化訊息累加器

// 初始化訊息累加器---緩衝區
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.compressionType,
lingerMs(config),
retryBackoffMs,
deliveryTimeoutMs,
metrics,
PRODUCER_METRIC_GROUP_NAME,
time,
apiVersions,
transactionManager,
new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));

初始化訊息累加器物件 accumulator ,部分重要引數如下:

1) batchSize : 訊息批次大小,預設16KB;

2) compressionType: 訊息壓縮方式,主要包括none、gzip、snappy、lz4、zstd。預設是不進行壓縮,如果你的 Topic 佔用的磁碟空間比較多的話,可以考慮啟用壓縮,以節省資源。

3) lingerMs: 訊息 batch 延遲多久再發送的時間,這是吞吐量與延時之間的權衡。為了不頻繁傳送網路請求,設定延遲時間後 batch 會盡量積累更多的訊息再發送出去。

4) retryBackoffMs: 設定失敗重試的退避時間。

5) deliveryTimeoutMs: 設定訊息投遞超時時間。

6) apiVersion: 客戶端 api版本。

7) transactionalManager 事務管理器。

8) BufferPool 分配 後續篇在進行深度剖析。

訊息累加器---緩衝區的設計是 Kafka Producer 非常優秀和經典的設計 ,Kafka 中訊息不是生產後立馬就傳送給服務端的,而是 會先寫入一個緩衝池中,然後直到多條訊息組成了一個 Batch,達到一定條件才會一次網路通訊把 Batch 傳送過去 利用該設計來避免 JVM 頻繁的 Full GC 的問題 ,後續會單獨對其進行深度剖析。

02

初始化叢集元資料

元資料的獲取涉及的元件比較多,主要分為:

1)KafkaProducer 主執行緒負責載入元資料

2)Sender 子執行緒負責拉取元資料

首先我們來看下 KafkaProducer  主執行緒是如何載入元資料。

元資料 metadata 的初始化的時候是在   KafkaProducer  主執行緒裡面的,原始碼如下:

// 初始化 Kafka 叢集元資料,元資料會儲存到客戶端中,並與服務端元資料保持一致
if (metadata != null) {
this.metadata = metadata;
} else {
// 初始化叢集元資料
this.metadata = new ProducerMetadata(retryBackoffMs,
// 元資料過期時間:預設5分鐘
config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
// topic最大空閒時間,如果在規定時間沒有被訪問,將從快取刪除,下次訪問時強制獲取元資料
config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
logContext,
clusterResourceListeners,
Time.SYSTEM);
// 啟動metadata的載入程式
this.metadata.bootstrap(addresses);
}

它會儲存在客戶端記憶體中,並與服務端保持準實時的資料一致性, 元資料 主要包含:

1) Kafka 叢集節點資訊

2) Topic 資訊

3) Topic對應的分割槽資訊

4)ISR列表資訊以及分佈情況

5)Leader Partition 所在節點

等等

從上面原始碼我們可以看出在   KafkaProducer   的構造方法中初始化了元資料類 metadata ,然後呼叫 metadata.bootstrap() 來啟動載入程式,這個時候 metaData 物件裡並沒有具體的元資料資訊,因為客戶端還沒傳送元資料更新的請求 獲取是通過喚醒 Sender 執行緒進行傳送的

而具體的傳送和拉取,我們將在下一篇中進行剖析。

03

初始化 Sender 執行緒

// 初始化 Sender 傳送執行緒類,並同時初始化NetworkClient
this.sender = newSender(logContext, kafkaClient, this.metadata);

這裡非常關鍵,初始化  Sender 傳送執行緒類,並同時初始化  NetworkClient ,它為 sender 提供了網路IO的能力,後續我們會對其深度剖析。

04

ioThread 啟用 Sender 執行緒

String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
// 用 ioThread 執行緒來封裝 Sender 執行緒類,使用 demon 守護執行緒方式來啟動 Sender 執行緒類
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();


public KafkaThread(final String name, Runnable runnable, boolean daemon) {
super(runnable, name);
configureThread(name, daemon);
}


private void configureThread(final String name, boolean daemon) {
setDaemon(daemon);
setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread '{}':", name, e));
}

從上面原始碼可以看出使用 ioThread 執行緒來封裝 Sender 執行緒類,並使用 demon 守護執行緒方式來啟動 Sender 執行緒類。

這裡的設計模式非常值得我們去學習,就是在設計一些後臺執行緒的時候,可以把 執行緒 本身 執行緒執行 的邏輯分開,Sender 執行緒就是執行緒執行的具體邏輯,而 KafkaThread 其實代表了這個 執行緒本身 執行緒的名字 未捕獲的異常處理 deamon 執行緒設定 對 Kafka Thread 的啟動會自動執行 Sender 執行緒的 Run() 方法。

05

doSend 傳送

使用者可以直接使用 producer.send() 進行資料的傳送,先看一下 Send() 介面的原始碼實現。

// 向 topic 非同步傳送資料,此時回撥為空
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}


// 向 topic 非同步地傳送資料,當傳送確認後喚起回撥函式
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}

生產環境我們一般會使用帶回調函式的方式去傳送,所以最終實現還是呼叫了  KafkaProducer   doSend() 介面。

該方法只是把訊息傳送到緩衝區後直接返回,真正的傳送是需要等待 Sender 執行緒把訊息從緩衝區將訊息取出來後再進行傳送。

原始碼比較長,這裡只簡單的分析下都做了哪些事情,後續再進行深度剖析,原始碼如下

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
....省略
// 1.等待元資料更新即確認資料要傳送到的 topic 的 metadata 是可用的
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
....省略
// 2.序列化 record的key和value
byte[] serializedKey;
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
byte[] serializedValue;
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
// 3.獲取record訊息對應的分割槽
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
....省略
// 4.驗證訊息的大小
ensureValidRecordSize(serializedSize);
// 5.組裝回調方法和攔截器為一個物件
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
....省略
// 6.向 accumulator 中追加資料
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
// 7.新的批次需要重新進行分割槽
if (result.abortForNewBatch) {
int prevPartition = partition;
partitioner.onNewBatch(record.topic(), cluster, prevPartition);
partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}
// 8.如果 batch 已經滿了, 則喚醒 sender 執行緒傳送資料
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
} catch (ApiException e) {
....省略
return new FutureFailure(e);
} catch (InterruptedException e) {
....省略
throw new InterruptException(e);
} catch (KafkaException e) {
....省略
throw e;
} catch (Exception e) {
....省略
throw e;
}
}

06

整體傳送流程

03 總結

這裡,我們一起來總結一下這篇文章的重點。

1、通過 場景驅動 的方式從生產者調用出發,丟擲初始化和傳送的過程是怎樣的?

2、帶你梳理了 Kafka Producer 初始化原始碼全貌 」,包含主執行緒的 核心元件模組以及訊息累加器的初始化、元資料初始化、 Sender 執行緒初始化流程。

3、最後通過一張整體傳送流程圖來勾勒出生產者傳送訊息的全貌。

下一篇我們來深度剖析 叢集 元資料獲取和管理流程 」,大家期待,我們下期見。

如果我的文章對你有所幫助,還請幫忙 點贊、在看、轉發 一下,非常感謝!

堅持總結, 持續輸出高質量文章   關注我: 華仔聊技

點個“贊”和“在看”鼓勵一下嘛~