ElasticSearch 查詢流程詳解

語言: CN / TW / HK

一、前言

前面已經介紹了 ElasticSearch 的寫入流程,瞭解了 ElasticSearch 寫入時的分散式特性的相關原理。ElasticSearch 作為一款具有強大搜索功能的儲存引擎,它的讀取是什麼樣的呢?讀取相比寫入簡單的多,但是在使用過程中有哪些需要我們注意的呢?本篇文章會進行詳細的分析。

在前面的文章我們已經知道 ElasticSearch 的讀取分為兩種 GET 和 SEARCH。這兩種操作是有一定的差異的,下面我們先對這兩種核心的資料讀取方式進行一一分析。

二、GET 的流程

2.1 整體流程

(圖片來自官網)

以下是從主分片或者副本分片檢索文件的步驟順序:

  • 客戶端向 Node 1 傳送獲取請求

  • 節點使用文件的 _id 來確定文件屬於分片 0 。分片 0 的副本分片存在於所有的三個節點上。在這種情況下,它將請求轉發到 Node 2

  • Node 2 將文件返回給 Node 1,然後將文件返回給客戶端。

注意:

  • 在處理讀取請求時,協調節點在每次請求的時候都會通過輪詢所有的副本分片來達到負載均衡。

  • 在文件被檢索時,已經被索引的文件可能已經存在於主分片上但是還沒有複製到副本分片。在這種情況下,副本分片可能會報告文件不存在,但是主分片可能成功返回文件。一旦索引請求成功返回給使用者,文件在主分片和副本分片都是可用的

2.2 GET 詳細流程

2.2.1 協調節點處理過程

在協調節點有個 http_server_worker 執行緒池。收到讀請求後它的具體過程為:

  • 收到請求,先獲取叢集的狀態資訊

  • 根據路由資訊計算 id 是在哪一個分片上

  • 因為一個分片可能有多個副本分片,所以上述的計算結果是一個列表

  • 呼叫 transportServer 的 sendRequest 方法向目標傳送請求

  • 上一步的方法內部會檢查是否為本地 node,如果是的話就不會發送到網路,否則會非同步傳送

  • 等待資料節點回復,如果成功則返回資料給客戶端,否則會重試

  • 重試會發送上述列表的下一個。

2.2.2 資料節點處理過程

資料節點上有一個 get 執行緒池。收到了請求後,處理過程為:

  • 在資料節點有個 shardTransporthander 的 messageReceived 的入口專門接收協調節點發送的請求

private class ShardTransportHandler implements TransportRequestHandler<Request> {  @Override  public void messageReceived(final Request request, final TransportChannel channel, Task task) {      asyncShardOperation(request, request.internalShardId, new ChannelActionListener<>(channel, transportShardAction, request));  }}

複製程式碼

  • shardOperation 方法會先檢查是否需要 refresh,然後呼叫 indexShard.getService().get()讀取資料並存儲到 GetResult 中。

if (request.refresh() && !request.realtime()) {  indexShard.refresh("refresh_flag_get");}GetResult result = indexShard.getService().get(                    request.type(), request.id(),                     request.storedFields(), request.realtime(),                    request.version(), request.versionType(),                     request.fetchSourceContext());

複製程式碼

  • indexShard.getService().get()最終會呼叫 GetResult getResult = innerGet(……)用來獲取結果。即 ShardGetService#innerGet

private GetResult innerGet(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType, long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {      ................      Engine.GetResult get = null;          ............      get = indexShard.get(new Engine.Get(realtime, realtime, type, id, uidTerm).version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));          ..........      if (get == null || get.exists() == false) {          return new GetResult(shardId.getIndexName(), type, id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null, null);      }  try {      return innerGetLoadFromStoredFields(type, id, gFields, fetchSourceContext, get, mapperService);  } finally {      get.close();  }

複製程式碼

  • 上面程式碼的 indexShard.get 讀取真正的資料時會最終呼叫:

  • org.elasticsearch.index.engine.InternalEngine#gett

public GetResult get(Get get, BiFunction<String, SearcherScope, Engine.Searcher> searcherFactory) throws EngineException {    try (ReleasableLock ignored = readLock.acquire()) {        ensureOpen();        SearcherScope scope;        if (get.realtime()) {            VersionValue versionValue = null;            try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {                // we need to lock here to access the version map to do this truly in RT                versionValue = getVersionFromMap(get.uid().bytes());            }            if (versionValue != null) {                if (versionValue.isDelete()) {                    return GetResult.NOT_EXISTS;                }    。。。。。。    //刷盤操作     refreshIfNeeded("realtime_get", versionValue.seqNo);

複製程式碼

注意:

get 過程會加讀鎖。處理 realtime 選項,如果為 true,則先判斷是否有資料可以刷盤,然後呼叫 Searcher 進行讀取。Searcher 是對 IndexSearcher 的封裝在早期 realtime 為 true 則會從 tranlog 中讀取,後面只會從 index 的 lucene 讀取了。即實時的資料只在 lucene 之中。

  • innerGetLoadFromStoredFields 根據 type,id,filed,source 等資訊過濾,並將結果放到 getresult 之中返回

2.3 小結

  • GET 是根據 doc id 雜湊找到對應的 shard 的

  • get 請求預設是實時的,但是不同版本有差異,在 5.x 以前,讀不到寫的 doc 會從 translog 中去讀取,之後改為讀取不到會進行 refresh 到 lucene 中,因此現在的實時讀取需要複製一定的效能損耗的代價。如果對實時性要求不高,可以請求是手動帶上 realtime 為 false

三、search 流程

3.1 search 整體流程

對於 Search 類請求,ElasticSearch 請求是查詢 lucene 的 Segment,前面的寫入詳情流程也分析了,新增的文件會定時的 refresh 到磁碟中,所以搜尋是屬於近實時的。而且因為沒有文件 id,你不知道你要檢索的文件在哪個分配上,需要將索引的所有的分片都去搜索下,然後彙總。ElasticSearch 的 search 一般有兩個搜尋型別

  • dfs_query_and_fetch,流程複雜一些,但是算分的時候使用了全域性的一些指標,這樣獲取的結果可能更加精確一些。

  • query_then_fetch,預設的搜尋型別。

所有的搜尋系統一般都是兩階段查詢:

第一階段查詢到匹配的 docID,第二階段再查詢 DocID 對應的完整文件。這種在 ElasticSearch 中稱為 query_then_fetch,另一種就是一階段查詢的時候就返回完整 Doc,在 ElasticSearch 中叫 query_and_fetch,一般第二種適用於只需要查詢一個 Shard 的請求。因為這種一次請求就能將資料請求到,減少互動次數,二階段的原因是需要多個分片聚合彙總,如果資料量太大那麼會影響網路傳輸效率,所以第一階段會先返回 id。

除了上述的這兩種查詢外,還有一種三階段查詢的情況。

搜尋裡面有一種算分邏輯是根據 TF 和 DF 來計算 score 的,而在普通的查詢中,第一階段去每個 Shard 中獨立查詢時攜帶條件算分都是獨立的,即 Shard 中的 TF 和 DF 也是獨立的。雖然從統計學的基礎上資料量多的情況下,每一個分片的 TF 和 DF 在整體上會趨向於準確。但是總會有情況導致區域性的 TF 和 DF 不準的情況出現。

ElasticSearch 為了解決這個問題引入了 DFS 查詢。

比如 DFS_query_then_fetch,它在每次查詢時會先收集所有 Shard 中的 TF 和 DF 值,然後將這些值帶入請求中,再次執行 query_then_fetch,這樣算分的時候 TF 和 DF 就是準確的,類似的有 DFS_query_and_fetch。這種查詢的優勢是算分更加精準,但是效率會變差。

另一種選擇是用 BM25 代替 TF/DF 模型。

在 ElasticSearch7.x,使用者沒法指定以下兩種方式: DFS_query_and_fetchquery_and_fetch

注:這兩種算分的演算法模型在《ElasticSearch 實戰篇》有介紹:

這裡 query_then_fetch 具體的搜尋的流程圖如下:

(圖片來自官網)

查詢階段包含以下四個步驟:

  • 客戶端傳送一個 search 請求到 Node 3 , Node 3 會建立一個大小為 from + size 的空優先佇列。

  • Node 3 將查詢請求轉發到索引的每個主分片或副本分片中。每個分片在本地執行查詢並新增結果到大小為 from + size 的本地有序優先佇列中。

  • 每個分片返回各自優先佇列中所有文件的 ID 和排序值給協調節點,也就是 Node 3 ,它合併這些值到自己的優先佇列中來產生一個全域性排序後的結果列表。

  • 當一個搜尋請求被髮送到某個節點時,這個節點就變成了協調節點。這個節點的任務是廣播查詢請求到所有相關分片並將它們的響應整合成全域性排序後的結果集合,這個結果集合會返回給客戶端。

3.2 search 詳細流程

以上就是 ElasticSearch 的 search 的詳細流程,下面會對每一步進行進一步的說明。

3.2.1 協調節點

3.2.1.1 query 階段

協調節點處理 query 請求的執行緒池為:

http_server_work

  • 負責解析請求

負責該解析功能的類為:

org.elasticsearch.rest.action.search.RestSearchAction

@Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {    SearchRequest searchRequest = new SearchRequest();    IntConsumer setSize = size -> searchRequest.source().size(size);    request.withContentOrSourceParamParserOrNull(parser ->        parseSearchRequest(searchRequest, request, parser, client.getNamedWriteableRegistry(), setSize));        。。。。。。。。。。。。    };}

複製程式碼

主要將 restquest 的引數封裝成 SearchRequest

這樣 SearchRequest 請求傳送給 TransportSearchAction 處理

  • 生成目的分片列表

將索引涉及到的 shard 列表或者有跨叢集訪問相關的 shard 列表合併

private void executeSearch(...........) { ........     //本叢集的列表分片列表   localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false)              .map(it -> new SearchShardIterator(                  searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices))              .collect(Collectors.toList());  .......  //遠端叢集的分片列表 final GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardIterators, remoteShardIterators);  .......}

複製程式碼

  • 遍歷分片傳送請求

如果有多個分片位於同一個節點,仍然會發送多次請求

public final void run() {      ......      for (final SearchShardIterator iterator : toSkipShardsIts) {          assert iterator.skip();          skipShard(iterator);      }      ......      ......      if (shardsIts.size() > 0) {          //遍歷分片傳送請求          for (int i = 0; i < shardsIts.size(); i++) {              final SearchShardIterator shardRoutings = shardsIts.get(i);              assert shardRoutings.skip() == false;              assert shardItIndexMap.containsKey(shardRoutings);              int shardIndex = shardItIndexMap.get(shardRoutings);              //執行shard請求              performPhaseOnShard(shardIndex, shardRoutings, shardRoutings.nextOrNull());          }      ......

複製程式碼

shardsIts 為搜尋涉及的所有分片,而 shardRoutings.nextOrNull()會從分片的所有副本分片選出一個分片來請求。

  • 收集和合並請求

onShardSuccess 對收集到的結果進行合併,這裡需要檢查所有的請求是否都已經有了回覆。

然後才會判斷要不要進行 executeNextPhase

private void onShardResultConsumed(Result result, SearchShardIterator shardIt) {      successfulOps.incrementAndGet();      AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();      if (shardFailures != null) {          shardFailures.set(result.getShardIndex(), null);      }      successfulShardExecution(shardIt);  }  private void successfulShardExecution(SearchShardIterator shardsIt) {      ......      //計數器累加      final int xTotalOps = totalOps.addAndGet(remainingOpsOnIterator);      //是不是所有分都已經回覆,然後呼叫onPhaseDone();      if (xTotalOps == expectedTotalOps) {          onPhaseDone();      } else if (xTotalOps > expectedTotalOps) {          throw new AssertionError("unexpected higher total ops [" + xTotalOps + "] compared to expected [" + expectedTotalOps + "]",              new SearchPhaseExecutionException(getName(), "Shard failures", null, buildShardFailures()));      }  }

複製程式碼

當返回結果的分片數等於預期的總分片數時,協調節點會進入當前 Phase 的結束處理,啟動下一個階段 Fetch Phase 的執行。onPhaseDone()會 executeNextPhase 來執行下一個階段。

3.2.1.2 fetch 階段

當觸發了 executeNextPhase 方法將觸發 fetch 階段

  • 傳送 fetch 請求

上一步的 executeNextPhase 方法觸發 Fetch 階段,Fetch 階段的起點為 FetchSearchPhase#innerRun 函式,從查詢階段的 shard 列表中遍歷,跳過查詢結果為空的 shard。其中也會封裝一些分頁資訊的資料。

private void executeFetch(....){      //傳送請求     context.getSearchTransport().sendExecuteFetch(connection, fetchSearchRequest, context.getTask(),          new SearchActionListener<FetchSearchResult>(shardTarget, shardIndex) {              //處理成功的訊息              @Override              public void innerOnResponse(FetchSearchResult result) {                  try {                      progressListener.notifyFetchResult(shardIndex);                      counter.onResult(result);                  } catch (Exception e) {                      context.onPhaseFailure(FetchSearchPhase.this, "", e);                  }              }              //處理失敗的訊息              @Override              public void onFailure(Exception e) {                  ........              }          });}

複製程式碼

  • 收集結果

使用了 countDown 多執行緒工具,fetchResults 儲存某個分片的結果,每收到一個 shard 的資料就 countDoun 一下,當都完畢後,觸發 finishPhase。接著會進行下一步:

CountedCollector:

final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(fetchResults, docIdsToLoad.length, finishPhase, context);

複製程式碼

finishPhase:

final Runnable finishPhase = ()  -> moveToNextPhase(searchPhaseController, queryResults, reducedQueryPhase, queryAndFetchOptimization ?  queryResults : fetchResults.getAtomicArray());

複製程式碼

  • 執行欄位聚合

執行欄位摺疊功能,有興趣可以研究下。即 ExpandSearchPhase 模組。ES 5.3 版本以後支援的 Field Collapsing 查詢。通過該類查詢可以輕鬆實現按 Field 值進行分類,每個分類獲取排名前 N 的文件。如在選單行為日誌中按選單名稱(使用者管理、角色管理等)分類,獲取每個選單排名點選數前十的員工。使用者也可以按 Field 進行 Aggregation 實現類似功能,但 Field Collapsing 會更易用、高效。

  • 回覆客戶端

ExpandSearchPhase 執行完了,就返回給客戶端結果了。

context.sendSearchResponse(searchResponse, queryResults);

複製程式碼

3.2.2 資料節點

處理資料節點請求的執行緒池為:search

根據前面的兩個階段,資料節點主要處理協調節點的兩類請求:query 和 fetch

  • 響應 query 請求

這裡響應的請求就是第一階段的 query 請求

transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchRequest::new,    (request, channel, task) -> {        //執行查詢        searchService.executeQueryPhase(request, keepStatesInContext(channel.getVersion()), (SearchShardTask) task,        //註冊結果監聽器            new ChannelActionListener<>(channel, QUERY_ACTION_NAME, request));    });

複製程式碼

executeQueryPhase:

public void executeQueryPhase(ShardSearchRequest request, boolean keepStatesInContext,                                SearchShardTask task, ActionListener<SearchPhaseResult> listener) {   ...........      final IndexShard shard = getShard(request);      rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {          @Override          public void onResponse(ShardSearchRequest orig) {                .......              //執行真正的請求              runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, keepStatesInContext), listener);          }      @Override      public void onFailure(Exception exc) {          listener.onFailure(exc);      }  });  }

複製程式碼

executeQueryPhase 會執行 loadOrExecuteQueryPhase 方法


private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context) throws Exception { final boolean canCache = indicesService.canCache(request, context); context.getQueryShardContext().freezeContext(); if (canCache) { indicesService.loadIntoContext(request, context, queryPhase); } else { queryPhase.execute(context); } }

複製程式碼

這裡判斷是否從快取查詢,預設啟用快取,快取的演算法預設為 LRU,即刪除最近最少使用的資料。如果不啟用快取則會執行 queryPhase.execute(context);底層呼叫 lucene 進行檢索,並且進行聚合。

public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {      .......      //聚合預處理      aggregationPhase.preProcess(searchContext);      .......         //全文檢索並打分      rescorePhase.execute(searchContext);      .......       //自動補全和糾錯      suggestPhase.execute(searchContext);      //實現聚合      aggregationPhase.execute(searchContext);      .......
}

複製程式碼

關鍵點:

  • 慢查詢日誌中的 query 日誌統計時間就是該步驟的時間;

  • 聚合 lucene 的操作也是在本階段完成;

  • 查詢的時候會使用 lRU 快取,快取為節點級別的;

  • 響應 fetch 請求;

transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SAME, true, true, ShardFetchSearchRequest::new,    (request, channel, task) -> {        searchService.executeFetchPhase(request, (SearchShardTask) task,            new ChannelActionListener<>(channel, FETCH_ID_ACTION_NAME, request));    });

複製程式碼

  • 執行 fetch;

  • 呼叫 fetchPhase 的 execute 方法獲取 doc;

  • 將結果封裝到 FetchSearchResult,呼叫網路元件傳送到 response。

3.3 小結

  • search 是比較耗費資源的,它需要遍歷相關的所有分片,每個分片可能有多個 lucene 段,那麼每個段都會遍歷一下,因此 ElasticSearch 的常見優化策略就是將段進行合併;

  • 分頁查詢的時候,即使是查後面幾頁,也會將前幾頁的資料聚合進行分頁,因此非常耗費記憶體,對於這種有深度分頁的需求可能要尋找其它的解決方式。

四、總結

ElasticSearch 查詢分為兩類,一類為 GET,另一類為 SEARCH。它們使用場景不同。

  • 如果對是實時性要求不高,可以 GET 查詢時不要重新整理來提升效能。

  • GET 讀取一個分片失敗後,會嘗試從其它分片讀取。

  • 慢 query 日誌是統計資料節點接收到了 query 請求後的耗時日誌。

  • 每次分頁的請求都是一次重新搜尋的過程,而不是從第一次搜尋的結果中獲取,這樣深度分頁會比較耗費記憶體。這樣也符合常見使用場景,因為基本只看前幾頁,很少深度分頁;如果確實有需要,可以採用 scoll 根據_scroll_id 查詢的方式。

  • 搜尋需要遍歷分片所有的 Lucene 分段,段的合併會對查詢效能有好處。

  • 聚會操作在 lucene 檢索完畢後 ElasticSearch 實現的。

本文主要分析了 ElasticSearch 分散式查詢主體流程,並未對 lucene 部分進行分析,有興趣的可以自行查詢相關資料。

程式設計師的核心競爭力其實還是技術,因此對技術還是要不斷的學習, 關注 “IT 巔峰技術” 公眾號 ,該公眾號內容定位:中高階開發、架構師、中層管理人員等中高階崗位服務的,除了技術交流外還有很多架構思想和實戰案例。

作者是 《 訊息中介軟體 RocketMQ 技術內幕》 一書作者, 同時也是 “RocketMQ 上海社群”聯合創始人 ,曾就職於拼多多、德邦等公司,現任上市快遞公司架構負責人,主要負責開發框架的搭建、中介軟體相關技術的二次開發和運維管理、混合雲及基礎服務平臺的建設。