Parallel Scan & Scroll an Elasticsearch Index

Written by amit.yogev90 | Published 2017/07/03
Tech Story Tags: elasticsearch | spark | hadoop | scan | scroll


Edit: /u/warkolm pointed out that Elasticsearch provides a native solution to my problem called sliced scroll, introduced in Elasticsearch 5.0. If you run Elasticsearch < 5.0, you can still use the routing solution that I present here, and I hope you find the other information useful too.

Elasticsearch provides a native API to scan and scroll over indexes. It means that you get a 'cursor' and you can scroll over it. The scan helper method (elasticsearch.helpers.scan in the Python client) makes the scroll API easier to use.

The drawback of this approach is that it limits you to one scroller. To get the next batch of documents you need a scroll_id, and each scroll_id is only returned by the previous scroll request.
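To make the dependency between batches concrete, here is a minimal sketch of a manual scroll loop. It assumes `client` behaves like an elasticsearch-py client of that era, with `search(index=..., scroll=..., size=..., body=...)` and `scroll(scroll_id=..., scroll=...)`. The function name `scroll_all`, the batch size and the keep-alive value are my own choices for illustration.

```python
def scroll_all(client, index, query=None, size=1000, keep_alive="1m"):
    """Yield every hit of `index`, one scroll batch at a time."""
    body = {"query": query or {"match_all": {}}}
    # The first search opens the scroll context and returns the first batch.
    page = client.search(index=index, scroll=keep_alive, size=size, body=body)
    while page["hits"]["hits"]:
        for hit in page["hits"]["hits"]:
            yield hit
        # The next batch can only be requested with the _scroll_id returned
        # by the previous response, so one scroll cannot be fetched in parallel.
        page = client.scroll(scroll_id=page["_scroll_id"], scroll=keep_alive)
```

Because every request waits for the response before it, a single scroll is sequential by construction.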

Scanning a large index (millions of documents or more) can take a while. How can you scroll over such an index in a reasonable time? The solution is to parallelize the scan.

Parallelism in Elasticsearch

In Elasticsearch each index is split into smaller elements known as shards. These shards are distributed across multiple nodes. You can define the number of primary shards and the number of replicas. Replicas protect your data if a primary shard fails, and they also increase performance, because replica shards can handle search requests.

Elasticsearch-hadoop uses shards for parallelism. To use Elasticsearch-hadoop you need a Spark or Hadoop cluster, which can be a lot of overhead, especially if your task is very simple. We will try to implement a way to parallelize the scan ourselves.

The default number of shards in an Elasticsearch index is 5, which means that elasticsearch-hadoop can split the scan into at most 5 parallel tasks. Choose a higher number of shards when you create the index to increase parallelism on large indexes.

The Shards API

How can we implement the parallel scan by ourselves? Instead of scanning the entire index, we would like to scroll over each shard independently.

The search shards API returns the shards that a search request would be executed against.

To get the shard and node information, use:

$ curl -XGET http://elasticsearch:9200/index/_search_shards

The routing parameter determines which shard the request would be executed against.

When we query the search shards api with the routing parameter, the result will be the shard that the request will be executed against.

For example, search requests to the index my-index with routing hello will be executed against shard number 1:

$ curl 'http://elasticsearch:9200/my-index/_search_shards?routing=hello'

{
  "nodes": {...},
  "shards": [
    [
      {
        "state": "STARTED",
        "primary": true,
        "node": "nu81I57KRCWw8zq27fKd1A",
        "shard": 1,
        "index": "my-index"
      }
    ]
  ]
}
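The shard numbers can be pulled out of such a response with a few lines of Python. This is only a sketch: it assumes the response has already been decoded into a dict with the layout shown above, and `shard_numbers` is a hypothetical helper name.

```python
def shard_numbers(search_shards_response):
    """Return the sorted shard numbers listed in a _search_shards response."""
    numbers = set()
    # "shards" is a list of groups; each group lists the copies
    # (primary and replicas) of one shard.
    for group in search_shards_response["shards"]:
        for copy in group:
            numbers.add(copy["shard"])
    return sorted(numbers)


response = {
    "nodes": {},
    "shards": [[{"state": "STARTED", "primary": True,
                 "node": "nu81I57KRCWw8zq27fKd1A", "shard": 1,
                 "index": "my-index"}]],
}
print(shard_numbers(response))  # [1]
```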

To find the complete mapping of routing to shard, we can use a script that queries the search shards API with a range of routing values.

The result will be something like this:
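A minimal sketch of such a script, not the original one: it tries integer routing values until every shard has been seen. It assumes `client` exposes `search_shards(index=..., routing=...)` as elasticsearch-py does. The function name and the `max_routing` cut-off are hypothetical, and because it keeps the first routing value found for each shard, the concrete numbers will differ from the sample output in this post.

```python
def routing_for_each_shard(client, index, num_shards, max_routing=10000):
    """Return {shard number: routing value} for the shards of `index`."""
    mapping = {}
    for routing in range(max_routing):
        response = client.search_shards(index=index, routing=str(routing))
        # With a single routing value the response lists exactly one shard.
        shard = response["shards"][0][0]["shard"]
        # Keep the first routing value that lands on each shard.
        mapping.setdefault(shard, routing)
        if len(mapping) == num_shards:
            break  # every shard has a routing value, stop probing
    return mapping
```

If the loop reaches `max_routing` before covering all shards, the returned dict is simply incomplete, so it is worth checking its length against the shard count.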

{0: 491, 83: 493, 27: 465, 74: 403, 111: 508, 57: 404, 78: 495, 106: 463... }

This means that if you use the routing 491 the expected shard will be shard number zero.

Each shard holds roughly (total documents / number of shards) documents. We can verify this by sending a search request with routing 491. In my case I got ~3 million hits for shard zero out of a total of 356 million documents in the entire index, which works out to ~120 shards (the actual shard count).
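The arithmetic behind that check, using the rounded figures quoted above (so the results are approximate as well):

```python
total_docs = 356_000_000        # documents in the entire index
hits_on_shard_zero = 3_000_000  # approximate hits returned for routing 491
actual_shards = 120             # rounded shard count of the index

# With evenly spread documents, each shard should hold about total / shards.
expected_per_shard = total_docs / actual_shards
print(round(expected_per_shard))  # 2966667, i.e. ~3 million

# Going the other way, total / hits per shard estimates the shard count.
estimated_shards = total_docs / hits_on_shard_zero
print(round(estimated_shards))  # 119, close to the actual ~120
```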

Implementing the parallel scan

We now have the shard-to-routing mapping. The next step is to scan and scroll each shard, one routing value at a time.
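A sketch of what that per-shard scan could look like, assuming the same elasticsearch-py style `client` and a `{shard: routing}` dict like the one shown earlier. `scan_shard` is the name used later in this post, but the body here is my own guess at a minimal version that only counts documents; `scan_all_shards` is a hypothetical name.

```python
def scan_shard(client, index, routing, size=1000, keep_alive="1m"):
    """Count the documents on the single shard that `routing` maps to."""
    count = 0
    # Passing routing restricts the search (and its scroll) to one shard.
    page = client.search(index=index, routing=str(routing), scroll=keep_alive,
                         size=size, body={"query": {"match_all": {}}})
    while page["hits"]["hits"]:
        count += len(page["hits"]["hits"])  # replace with real per-document work
        page = client.scroll(scroll_id=page["_scroll_id"], scroll=keep_alive)
    return count


def scan_all_shards(client, index, shard_to_routing):
    """Scan the shards one after another and return {shard: document count}."""
    return {shard: scan_shard(client, index, routing)
            for shard, routing in sorted(shard_to_routing.items())}
```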

We managed to split the scan/scroll per shard, but the above method is still synchronous. Each shard scan has to finish before another shard scan can start. Let's use multiprocessing.Pool to run a worker (shard scan) for each CPU on our local machine.
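A minimal sketch of the pool version, under a few assumptions: `worker` is a top-level function that takes one routing value, creates its own Elasticsearch client and returns a result, and `scan_in_parallel` and `pool_factory` are hypothetical names I introduce here.

```python
import multiprocessing
import multiprocessing.pool  # exposes ThreadPool, a thread-based alternative


def scan_in_parallel(worker, routings, processes=None,
                     pool_factory=multiprocessing.Pool):
    """Run `worker(routing)` for every routing value on a pool of workers.

    `processes=None` lets the pool default to one worker per CPU.
    """
    with pool_factory(processes=processes) as pool:
        # map() blocks until every shard scan has finished and returns
        # the results in the same order as `routings`.
        return pool.map(worker, list(routings))
```

With the default `multiprocessing.Pool`, `worker` has to be picklable, which in practice means a module-level function. Passing `multiprocessing.pool.ThreadPool` as `pool_factory` gives the same interface with threads, which can be handy for quick local experiments.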

I used the pool to count the number of documents on each shard in a 5-shard index on a 4-CPU machine (and added logs). The pool limits the number of workers to the number of CPUs, so at most 4 tasks run in parallel.

You can see that the last worker started (**) only after the first worker finished (*).

[16:42:26.485186] Starting: ForkPoolWorker-1 Pid: 11447
[16:42:26.485286] Starting: ForkPoolWorker-2 Pid: 11448
[16:42:26.485437] Starting: ForkPoolWorker-3 Pid: 11449
[16:42:26.485457] Starting: ForkPoolWorker-4 Pid: 11450

[16:42:28.235941] Name: ForkPoolWorker-4 Pid: 11450 Result: 3149
[16:42:28.264004] Name: ForkPoolWorker-3 Pid: 11449 Result: 2152
[16:42:28.581638] Name: ForkPoolWorker-1 Pid: 11447 Result: 0170
[16:42:29.845960] Name: ForkPoolWorker-2 Pid: 11448 Result: 1133

* [16:42:33.240230] Exiting: ForkPoolWorker-4 Pid: 11450
** [16:42:33.240581] Starting: ForkPoolWorker-4 Pid: 11450
[16:42:33.264458] Exiting: ForkPoolWorker-3 Pid: 11449
[16:42:33.586739] Exiting: ForkPoolWorker-1 Pid: 11447
[16:42:34.850162] Exiting: ForkPoolWorker-2 Pid: 11448
[16:42:34.851107] Name: ForkPoolWorker-4 Pid: 11450 Result: 5==148
[16:42:39.854437] Exiting: ForkPoolWorker-4 Pid: 11450

Docker worker

The next step will be to distribute the workers over a group of machines. I'll pack each worker as a Docker image and run it in a fleet cluster.

To run a single Docker container you can use:

$ docker run -ti -e ES_HOST=*** -e ES_PORT=*** -e INDEX=index -e DOC_TYPE=doc_type -e SHARD=1 es-parallel

Each worker will get a shard number, determine the relevant routing number, run the scan_shard command and quit.
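As an illustration of how such a worker might pick up its settings, here is a sketch that reads them with argparse and falls back to the environment variables used in the docker run command above. The attribute names match the Namespace printed in the journal output later in this post. The flag names, the defaults and the function name `parse_config` are assumptions of mine, not the original worker code.

```python
import argparse
import os


def parse_config(argv=None, env=os.environ):
    """Read the worker settings from flags, falling back to environment variables."""
    parser = argparse.ArgumentParser(description="Scan one Elasticsearch shard")
    # Defaults come from the container environment (-e NAME=value).
    parser.add_argument("--es-host", default=env.get("ES_HOST", "elasticsearch"))
    parser.add_argument("--es-port", type=int, default=int(env.get("ES_PORT", 9200)))
    parser.add_argument("--index", default=env.get("INDEX"))
    parser.add_argument("--doc-type", default=env.get("DOC_TYPE"))
    parser.add_argument("--shard", default=env.get("SHARD"))
    return parser.parse_args(argv)
```

A worker built this way would then look up the routing value for `config.shard`, run the shard scan once, print the result and exit.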

The worker code is similar to the previous sync and pool code, but it only runs one shard scan. You can see the final result of the worker here.

Parallel on fleet cluster

es-parallel@.service is an example unit file to start on a fleet cluster. Set the correct configuration and start the units with:

$ fleetctl start es-parallel@{0..4}

Where 4 is the number of shards (minus 1).

Notice that the ExecStart command uses the unit number as the shard number:

ExecStart=/bin/sh -c '\
  /usr/bin/docker run \
  --name %p-%i \
  -e ES_HOST="" \
  -e ES_PORT="" \
  -e INDEX="" \
  -e DOC_TYPE="" \
  -e ROUTING="%i" \
  es-parallel '

And the result:

$ fleetctl journal es-parallel@1

$ g-t-5 docker[43117]: Using default tag: latest
$ g-t-5 docker[43117]: latest: Pulling from es-parallel
$ g-t-5 systemd[1]: Started Es Parallel.
$ g-t-5 sh[43160]: Namespace(doc_type='doc_type', es_auth=None, es_host='elasticsearch', es_port=9200, es_use_ssl=False, index='index', shard='1')
$ g-t-5 sh[43160]: Result: 148
$ g-t-5 docker[43312]: es-parallel-1

Notes and Ideas:

  • Currently, the results are printed to the console. You can do whatever you want with them.
  • All workers will start at the same time. Make sure that your Elasticsearch cluster can handle it. If not, you can write a script that starts workers as needed.

Sliced Scroll

Elasticsearch 5.0 introduced the sliced scroll feature, a native way to split the scroll into multiple slices:

GET /index/doc_type/_search?scroll=1m
{
  "slice": {"id": 0, "max": 2},
  "query": { ... }
}

GET /index/doc_type/_search?scroll=1m
{
  "slice": {"id": 1, "max": 2},
  "query": { ... }
}

max is the number of slices and id is the slice number. max can be equal to, lower than, or higher than the number of shards. Splitting is done first on the shards and then locally on each shard. This means that if max == num_of_shards, each slice will be a scroll on a single shard.
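The request bodies for all slices differ only in the slice id, so they are easy to generate. A small sketch, where `sliced_scroll_bodies` is a hypothetical helper name and the match_all query is only a placeholder:

```python
def sliced_scroll_bodies(query, max_slices):
    """Build one search body per slice; each body can be scrolled independently."""
    return [{"slice": {"id": slice_id, "max": max_slices}, "query": query}
            for slice_id in range(max_slices)]


for body in sliced_scroll_bodies({"match_all": {}}, 2):
    print(body)
```

Each body would be sent as its own scroll request, typically by a separate worker, in the same way the per-shard workers are used earlier in this post.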

Be aware that when the number of slices is bigger than the number of shards, the slicing comes with an additional memory cost.

You can use the sliced scroll for parallel reindex, update by query and delete by query. Read more here.

Summary

es-parallel can be a nice replacement for the overhead of running a Spark/Hadoop cluster against Elasticsearch. It is great for mapping, filtering and selecting documents, but you (still) don't have a way to reduce the results.

Find the source code here and the Docker image here.


Published by HackerNoon on 2017/07/03