讓Elasticsearch飛起來!百億級實時查詢優化實戰

2019-09-04 01:37:01

最近的一個項目是風控過程數據實時統計分析和聚合的一個 OLAP 分析監控平台,日流量峯值在 10 到 12 億上下,每年數據約 4000 億條,佔用空間大概 200T。


面對這樣一個數據量級的需求,我們的數據如何存儲和實現實時查詢將是一個嚴峻的挑戰。


經過對 Elasticsearch 多方調研和超過幾百億條數據的插入和聚合查詢的驗證之後,我們總結出以下幾種能夠有效提升性能和解決這一問題的方案:

  • 集羣規劃

  • 存儲策略

  • 索引拆分

  • 壓縮

  • 冷熱分區等


本文所使用的 Elasticsearch 版本為 5.3.3。

什麼是時序索引?其主要特點體現在如下兩個方面:

  • 存,以時間為軸,數據只有增加,沒有變更,並且必須包含 timestamp(日期時間,名稱隨意)字段。

    其作用和意義要大於數據的 id 字段,常見的數據比如我們通常要記錄的操作日誌、用户行為日誌、或股市行情數據、服務器 CPU、內存、網絡的使用率等。

  • 取,一定是以時間範圍為第一過濾條件,然後是其他查詢條件,比如近一天、一週、本月等等,然後在這個範圍內進行二次過濾。

    比如性別或地域等,查詢結果中比較關注的是每條數據和 timestamp 字段具體發生的時間點,而非 id。


此類數據一般用於 OLAP、監控分析等場景。


集羣部署規劃


我們都知道在 Elasticsearch(下稱 ES)集羣中有兩個主要角色:Master Node 和 Data Node,其他如 Tribe Node 等節點可根據業務需要另行設立。


為了讓集羣有更好的性能表現,我們應該對這兩個角色有一個更好的規劃,在 Nodes 之間做讀取分離,保證集羣的穩定性和快速響應,在大規模的數據存儲和查詢的壓力之下能夠坦然面對,各自愉快的協作。


Master Nodes


Master Node,整個集羣的管理者,負有對 index 的管理、shards 的分配,以及整個集羣拓撲信息的管理等功能。


眾所周知,Master Node 可以通過 Data Node 兼任,但是,如果對羣集規模和穩定要求很高的話,就要職責分離,Master Node 推薦獨立,它的狀態關乎整個集羣的存活。


Master 的配置:

node.master: true 
node.datafalse 
node.ingest: false


這樣 Master 不參與 I、O,從數據的搜索和索引操作中解脱出來,專門負責集羣的管理工作,因此 Master Node 的節點配置可以相對低一些。


另外防止 ES 集羣 split brain(腦裂),合理配置 discovery.zen.minimum_master_nodes 參數,官方推薦 master-eligible nodes / 2 + 1 向下取整的個數。


這個參數決定選舉 Master 的 Node 個數,太小容易發生“腦裂”,可能會出現多個 Master,太大 Master 將無法選舉。


更多 Master 選舉相關內容請參考:

https://www.elastic.co/guide/en/elasticsearch/reference/5.3/modules-discovery-zen.html#master-election


Data Nodes


Data Node 是數據的承載者,對索引的數據存儲、查詢、聚合等操作提供支持。


這些操作嚴重消耗系統的 CPU、內存、IO 等資源,因此,應該把最好的資源分配給 Data Node,因為它們是真正幹累活的角色,同樣 Data Node 也不兼任 Master 的功能。


Data 的配置:

node.master: false 
node.datatrue 
node.ingest: false


Coordinating Only Nodes

 

ES 本身是一個分佈式的計算集羣,每個 Node 都可以響應用户的請求,包括 Master Node、Data Node,它們都有完整的 Cluster State 信息。


正如我們知道的一樣,在某個 Node 收到用户請求的時候,會將請求轉發到集羣中所有索引相關的 Node 上,之後將每個 Node 的計算結果合併後返回給請求方。


我們暫且將這個 Node 稱為查詢節點,整個過程跟分佈式數據庫原理類似。那問題來了,這個查詢節點如果在併發和數據量比較大的情況下,由於數據的聚合可能會讓內存和網絡出現瓶頸。


因此,在職責分離指導思想的前提下,這些操作我們也應該從這些角色中剝離出來,官方稱它是 Coordinating Nodes,只負責路由用户的請求,包括讀、寫等操作,對內存、網絡和 CPU 要求比較高。


本質上,Coordinating Only Nodes 可以籠統的理解為是一個負載均衡器,或者反向代理,只負責讀,本身不寫數據,它的配置是:

node.masterfalse 
node.datafalse 
node.ingestfalse 
search.remote.connectfalse


增加 Coordinating Nodes 的數量可以提高 API 請求響應的性能,我們也可以針對不同量級的 Index 分配獨立的 Coordinating Nodes 來滿足請求性能。


那是不是越多越好呢?在一定範圍內是肯定的,但凡事有個度,過了負作用就會突顯,太多的話會給集羣增加負擔。


在做 Master 選舉的時候會先確保所有 Node 的 Cluster State 是一致的,同步的時候會等待每個 Node 的 Acknowledgement 確認,所以適量分配可以讓集羣暢快的工作。


search.remote.connect 是禁用跨集羣查詢,防止在進行集羣之間查詢時發生二次路由:

https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-cross-cluster-search.html


Routing


類似於分佈式數據庫中的分片原則,將符合規則的數據存儲到同一分片。ES 通過哈希算法來決定數據存儲於哪個 Shard:

shard_num = hash(_routing) % num_primary_shards


其中 hash(_routing) 得出一個數字,然後除以主 Shards 的數量得到一個餘數,餘數的範圍是 0 到 number_of_primary_shards - 1,這個數字就是文檔所在的 Shard。


Routing 默認是 id 值,當然可以自定義,合理指定 Routing 能夠大幅提升查詢效率,Routing 支持 GET、Delete、Update、Post、Put 等操作。


如:

PUT my_index/my_type/1?routing=user1
{
  "title""This is a document"
}

GET my_index/my_type/1?routing=user1


不指定 Routing 的查詢過程:

簡單的來説,一個查詢請求過來以後會查詢每個 Shard,然後做結果聚合,總的時間大概就是所有 Shard 查詢所消耗的時間之和。


指定 Routing 以後:

會根據 Routing 查詢特定的一個或多個 Shard,這樣就大大減少了查詢時間,提高了查詢效率。


當然,如何設置 Routing 是一個難點,需要一點技巧,要根據業務特點合理組合 Routing 的值,來劃分 Shard 的存儲,最終保持數據量相對均衡。


可以組合幾個維度做為 Routing ,有點類似於 Hbase Key,例如不同的業務線加不同的類別,不同的城市和不同的類型等等,如:

  • _search?routing=beijing:按城市。

  • _search?routing=beijing_user123:按城市和用户。

  • _search?routing=beijing_android,shanghai_android:按城市和手機類型等。


數據不均衡?假如你的業務在北京、上海的數據遠遠大於其他二三線城市的數據。


再例如我們的業務場景,A 業務線的數據量級遠遠大於 B 業務線,有時候很難通過 Routing 指定一個值保證數據在所有 Shards 上均勻分佈,會讓部分 Shard 變的越來越大,影響查詢性能,怎麼辦?


一種解決辦法是單獨為這些數據量大的渠道創建獨立的 Index,如:

http://localhost:9200/shanghai,beijing,other/_search?routing=android


這樣可以根據需要在不同 Index 之間查詢,然而每個 Index 中 Shards 的數據可以做到相對均衡。


另一種辦法是指定 Index 參數 index.routing_partition_size,來解決最終可能產生羣集不均衡的問題,指定這個參數後新的算法如下:

shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards


index.routing_partition_size 應具有大於 1 且小於 index.number_of_shards 的值。


最終數據會在 routing_partition_size 幾個 Shard 上均勻存儲,是哪個 Shard 取決於 hash(_id) % routing_partition_size 的計算結果。


指定參數 index.routing_partition_size 後,索引中的 Mappings 必須指定 _routing 為 "required": true,另外 Mappings 不支持 parent-child 父子關係。


很多情況下,指定 Routing 後會大幅提升查詢性能,畢竟查詢的 Shard 只有那麼幾個,但是如何設置 Routing 是個難題,可根據業務特性巧妙組合。


索引拆分


Index 通過橫向擴展 Shards 實現分佈式存儲,這樣可以解決 Index 大數據存儲的問題。


但在一個 Index 變的越來越大,單個 Shard 也越來越大,查詢和存儲的速度也越來越慢。


更重要的是一個 Index 其實是有存儲上限的(除非你設置足夠多的 Shards 和機器),如官方聲明單個 Shard 的文檔數不能超過 20 億(受限於 Lucene index,每個 Shard 是一個 Lucene index)。


考慮到 I、O,針對 Index 每個 Node 的 Shards 數最好不超過 3 個,那面對這樣一個龐大的 Index,我們是採用更多的 Shards,還是更多的 Index,我們如何選擇?


Index 的 Shards 總量也不宜太多,更多的 Shards 會帶來更多的 I、O 開銷,其實答案就已經很明確,除非你能接受長時間的查詢等待。


Index 拆分的思路很簡單,時序索引有一個好處就是隻有增加,沒有變更,按時間累積,天然對索引的拆分友好支持,可以按照時間和數據量做任意時間段的拆分。


ES 提供的 Rollover Api + Index Template 可以非常便捷和友好的實現 Index 的拆分工作,把單個 index docs 數量控制在百億內,也就是一個 Index 默認 5 個 Shards 左右即可,保證查詢的即時響應。


簡單介紹一下 Rollover API 和 Index Template 這兩個東西,如何實現 index 的拆分。


Index Template


我們知道 ES 可以為同一目的或同一類索引創建一個 Index Template,之後創建的索引只要符合匹配規則就會套用這個 Template,不必每次指定 Settings 和 Mappings 等屬性。


一個 Index 可以被多個 Template 匹配,那 Settings 和 Mappings 就是多個 Template 合併後的結果。


有衝突通過 Template 的屬性"order" : 0 從低到高覆蓋(這部分據説會在 ES6 中會做調整,更好的解決 Template 匹配衝突問題)。


示例:

PUT _template/template_1
{
    "index_patterns" : ["log-*"],
    "order" : 0,
    "settings" : {
        "number_of_shards" : 5
    },
    "aliases" : {
        "alias1" : {}
    }
}


Rollover Index


Rollover Index 可以將現有的索引通過一定的規則,如數據量和時間,索引的命名必須是 logs-000001 這種格式,並指定 aliases,示例:

PUT /logs-000001 
{
  "aliases": {
    "logs_write": {}
  }
}

# Add > 1000 documents to logs-000001

POST /logs_write/_rollover 
{
  "conditions": {
    "max_age":   "7d",
    "max_docs":  1000
  }
}


先創建索引並指定別名 logs_write,插入 1000 條數據,然後請求 _rollover api 並指定拆分規則。


如果索引中的數據大於規則中指定的數據量或者時間過時,新的索引將被創建,索引名稱為 logs-000002,並根據規則套用 Index Template,同時別名 logs_write 也將被變更到 logs-000002。


注意事項:

  • 索引命名規則必須如同:logs-000001。

  • 索引必須指定 aliases。

  • Rollover Index API 調用時才去檢查索引是否超出指定規則,不會自動觸發,需要手動調用,可以通過 Curator 實現自動化。

  • 如果符合條件會創建新的索引,老索引的數據不會發生變化,如果你已經插入 2000 條,拆分後還是 2000 條。

  • 插入數據時一定要用別名,否則你可能一直在往一個索引裏追加數據。


技巧是按日期滾動索引:

PUT /now/d}-1>
{
  "aliases": {
    "logs_write": {}
  }
}


假如生成的索引名為 logs-2017.04.13-1,如果你在當天執行 Rollover 會生成 logs-2017.04.13-000001,次日的話是 logs-2017.04.14-000001。


這樣就會按日期進行切割索引,那如果你想查詢 3 天內的數據可以通過日期規則來匹配索引名,如:

GET /now/d}-*>,now/d-1d}-*>,now/d-2d}-*>/_search


到此,我們就可以通過 Index Template 和 Rollover API 實現對 Index 的任意拆分,並按照需要進行任意時間段的合併查詢,這樣只要你的時間跨度不是很大,查詢速度一般可以控制在毫秒級,存儲性能也不會遇到瓶頸。


Hot-Warm 架構


冷熱架構,為了保證大規模時序索引實時數據分析的時效性,可以根據資源配置不同將 Data Nodes 進行分類形成分層或分組架構。


一部分支持新數據的讀寫,另一部分僅支持歷史數據的存儲,存放一些查詢發生機率較低的數據。


即 Hot-Warm 架構,對 CPU,磁盤、內存等硬件資源合理的規劃和利用,達到性能和效率的最大化。


我們可以通過 ES 的 Shard Allocation Filtering 來實現 Hot-Warm 的架構。


實現思路如下:

  • 將 Data Node 根據不同的資源配比打上標籤,如:Host、Warm。

  • 定義 2 個時序索引的 Index Template,包括 Hot Template 和 Warm Template,Hot Template 可以多分配一些 Shard 和擁有更好資源的 Hot Node。

  • 用 Hot Template 創建一個 Active Index 名為 active-logs-1,別名 active-logs,支持索引切割。

    插入一定數據後,通過 roller over api 將 active-logs 切割,並將切割前的 Index 移動到 Warm Nodes 上,如 active-logs-1,並阻止寫入。

  • 通過 Shrinking API 收縮索引 active-logs-1 為 inactive-logs-1,原 Shard 為 5,適當收縮到 2 或 3,可以在 Warm Template 中指定,減少檢索的 Shard,使查詢更快。

  • 通過 force-merging api 合併 inactive-logs-1 索引每個 Shard 的 Segment,節省存儲空間。

  • 刪除 active-logs-1。


Hot,Warm Nodes


Hot Nodes


擁有最好資源的 Data Nodes,如更高性能的 CPU、SSD 磁盤、內存等資源,這些特殊的 Nodes 支持索引所有的讀、寫操作。


如果你計劃以 100 億為單位來切割 Index,那至少需要三個這樣的 Data Nodes,Index 的 Shard 數為 5,每個 Shard 支持 20 億 Documents 數據的存儲。


為這類 Data Nodes 打上標籤,以便我們在 Template 中識別和指定,啟動參數如下:

./bin/elasticsearch -Enode.attr.box_type=hot


或者配置文件:

node.attr.box_typehot


Warm Nodes


存儲只讀數據,並且查詢量較少,但用於存儲長多時間歷史數據的 Data Nodes,這類 Nodes 相對 Hot Nodes 較差的硬件配置,根據需求配置稍差的 CPU、機械磁盤和其他硬件資源,至於數量根據需要保留多長時間的數據來配比,同樣需要打上標籤,方法跟 Hot Nodes 一樣,指定為 Warm,box_type: warm。


Hot,Warm Template


Hot Template


我們可以通過指定參數"routing.allocation.include.box_type": "hot",讓所有符合命名規則索引的 Shard 都將被分配到 Hot Nodes 上:

PUT _template/active-logs
{
  "template""active-logs-*",
  "settings": {
    "number_of_shards":   5,
    "number_of_replicas": 1,
    "routing.allocation.include.box_type""hot",
    "routing.allocation.total_shards_per_node": 2
  },
  "aliases": {
    "active-logs":  {}
  }
}


Warm Template


同樣符合命名規則索引的 Shard 會被分配到 Warm Nodes 上,我們指定了更少的 Shards 數量和複本數。


注意,這裏的複本數為 0,和 best_compression 級別的壓縮,方便做遷移等操作,以及進行一些數據的壓縮:

PUT _template/inactive-logs
{
  "template""inactive-logs-*",
  "settings": {
    "number_of_shards":   1,
    "number_of_replicas": 0,
    "routing.allocation.include.box_type""warm",
    "codec""best_compression"
  }
}


假如我們已經創建了索引 active-logs-1 ,當然你可以通過 _bulk API 快速寫入測試的數據,然後參考上文中介紹的 Rollover API 進行切割。


Shrink Index


Rollover API 切割以後,active-logs-1 將變成一個冷索引,我們將它移動到 Warm Nodes 上。


先將索引置為只讀狀態,拒絕任何寫入操作,然後修改 index.routing.allocation.include.box_type 屬性,ES 會自動移動所有 Shards 到目標 Data Nodes 上:

PUT active-logs-1/_settings
{
  "index.blocks.write"true,
  "index.routing.allocation.include.box_type""warm"
}


Cluster Health API 可以查看遷移狀態,完成後進行收縮操作,其實相當於複製出來一個新的索引,舊的索引還存在。

POST active-logs-1/_shrink/inactive-logs-1


我們可以通過 Head 插件查看整個集羣索引的變化情況。關於 Shard 的分配請參考:

https://www.elastic.co/guide/en/elasticsearch/reference/current/shard-allocation-filtering.html


Forcemerge


到目前為止我們已經實現了索引的冷熱分離,和索引的收縮,我們知道每個 Shard 下面由多個 Segment 組成,那 inactive-logs-1 的 Shard 數是 1,但 Segment 還是多個。


這類索引不會在接受寫入操作,為了節約空間和改善查詢性能,通過 Forcemerge API 將 Segment 適量合併:

PUT inactive-logs-1/_settings
"number_of_replicas": 1 }


ES 的 Forcemerge 過程是先創建新的 Segment 刪除舊的,所以舊 Segment 的壓縮方式 best_compression 不會在新的 Segment 中生效,新的 Segment 還是默認的壓縮方式。


現在 inactive-logs-1 的複本還是 0,如果有需要的話,可以分配新的複本數:

PUT inactive-logs-1/_settings
"number_of_replicas": 1 }


最後刪除 active-logs-1,因為我們已經為它做了一個查詢複本 inactive-logs-1。


DELETE active-logs-1


走到這裏,我們就已經實現了 Index 的 Hot-Warm 架構,根據業務特點將一段時間的數據放在 Hot Nodes,更多的歷史數據存儲於 Warm Nodes。


其他優化方案


索引隔離


在多條業務線的索引共用一個 ES 集羣時會發生流量被獨吃獨佔的情況,因為大家都共用相同的集羣資源,流量大的索引會佔用大部分計算資源而流量小的也會被拖慢,得不到即時響應,或者説業務流量大的索引可以按天拆分,幾個流量小的索引可以按周或月拆分。


這種情況下我們可以將規模大的索引和其他相對小規模的索引獨立存儲,分開查詢或合併查詢。


除了 Master Nodes 以外,Data Nodes 和 Coordinating Nodes 都可以獨立使用(其實如果每個索引的量都特別大也應該採用這種架構),還有一個好處是對方的某個 Node 掛掉,自己不受影響。


同樣利用 ES 支持 Shard Allocation Filtering 功能來實現索引的資源獨立分配,先將 Nodes 進行打標籤,劃分區域,類似於 Hot-Warm 架構:

node.attr.zone=zone_a、node.attr.zone=zone_b


或者:

node.attr.zone =zone_hot_a、node.attr.zone=zone_hot_b


等打標籤的方式來區別對應不同的索引,然後在各自的 Index Template 中指定不同的 node.attr.zone 即可。


如"index.routing.allocation.include.zone" : "zone_a,zone_hot_a",

或者排除法"index.routing.allocation.exclude.size": "zone_hot_b"

分配到 zone_hot_b 以外的所有 Data Nodes 上。


更多用法可以參考 Hot-Warm 架構,或 shard-allocation-filtering:

https://www.elastic.co/guide/en/elasticsearch/reference/current/shard-allocation-filtering.html


跨數據中心


如果你的業務遍佈全國各地,四海八荒,如果你數據要存儲到多個機房,如果你的 Index 有幾萬個甚至更多( Index 特別多,集羣龐大會導致 Cluster State 信息量特別大,因為 Cluster State 包含了所有 Shard、Index、Node 等所有相關信息,它存儲在每個 Node 上,這些數據發生變化都會實時同步到所有 Node 上,當這個數據很大的時候會對集羣的性能造成影響)。


這些情況下我們會考慮部署多個 ES Cluster,那我們將如何解決跨集羣查詢的問題呢?


目前 ES 針對跨集羣操作提供了兩種方案 Tribe Node 和 Cross Cluster Search。


Tribe Node


需要一個獨立的 Node 節點,加入其他 ES Cluster,用法有點類似於 Coordinating Only Node。


所不同的是 Tribe 是針對多個 ES 集羣之間的所有節點,Tribe Node 收到請求廣播到相關集羣中所有節點,將結果合併處理後返回。


表面上看起來 Tribe Node 將多個集羣串連成了一個整體,遇到同名 Index 發生衝突,會選擇其中一個 Index,也可以指定:

tribe:
  on_conflictprefer_t1
  t1:
    cluster.nameus-cluster
    discovery.zen.ping.multicast.enabledfalse
    discovery.zen.ping.unicast.hosts['usm1','usm2','usm3']
  t2:
    cluster.nameeu-cluster
    discovery.zen.ping.multicast.enabledfalse
    discovery.zen.ping.unicast.hosts['eum1','eum2','eum3']


Cross Cluster Search


Cross Cluster Search 可以讓集羣中的任意一個節點聯合查詢其他集羣中的數據, 通過配置 elasticsearch.yml 或者 API 來啟用這個功能,API 示例:

PUT _cluster/settings
{
  "persistent": {
    "search": {
      "remote": {
        "cluster_one": {
          "seeds": [
            "127.0.0.1:9300"
          ]
    ...
        }
      }
    }
  }
}


提交以後整個集羣所有節點都會生效,都可以做為代理去做跨集羣聯合查詢,不過我們最好還是通過 Coordinating Only Nodes 去發起請求。

POST /cluster_one:decision,decision/_search
{
    "match_all": {}
}


對集羣 cluster_one 和本集羣中名為 Decision 的索引聯合查詢。


目前這個功能還在測試階段,但未來可能會取代 Tribe Node,之間的最大的差異是 Tribe Node 需要設置獨立的節點,而 Cross Cluster Search 不需要,集羣中的任意一個節點都可以兼任。


比如可以用我們的 Coordinating Only Nodes 做為聯合查詢節點,另一個優點是配置是動態的,不需要重啟節點。


實際上可以理解為是一個 ES 集羣之間特定的動態代理工具,支持所有操作,包括 Index 的創建和修改,並且通過 Namespace 對 Index 進行隔離,也解決了 Tribe Node 之 Index 名稱衝突的問題。


小結


我們在文中介紹了幾種方案用來解決時序索引的海量數據存儲和查詢的問題,根據業務特點和使用場景來單獨或組合使用能夠發揮出意想不到的效果。


特別是 Nodes 之間的讀寫分離、索引拆分、Hot-Warm 等方案的組合應用對索引的查詢和存儲性能有顯著的提升。


另外 Routing 在新版本中增加了 routing_partition_size,解決了 Shard 難以均衡的問題。


如果你的索引 Mapping 中沒有 parent-child 關聯關係可以考慮使用,對查詢的性能提升非常有效。


本文參考內容:

  • hot-warm-architecture-in-elasticsearch-5-x

  • managing-time-based-indices-efficiently

  • what-is-evolving-in-elasticsearch


作者:土豆的奧特之父

編輯:陶家龍、孫淑娟



●編號876,輸入編號直達本文

●輸入m獲取到文章目錄

推薦↓↓↓

Python編程

已同步到看一看
在看



熱點新聞