Near Real-Time Indexing With ElasticSearch
Choosing your indexing strategy is hard. The Elasticsearch documentation does have some general recommendations, and there are some tips from other companies, but it also depends on the particular use case. In the typical scenario you have a database as the source of truth, and you have an index that makes things searchable. And you can have the following strategies:
- Index as data comes – you insert in the database and index at the same time. It makes sense if there isn’t too much data; otherwise indexing becomes very inefficient.
- Store in database, index with scheduled job – this is probably the most common approach and is also easy to implement. However, it can have issues if there’s a lot of data to index, as it has to be precisely fetched with (from, to) criteria from the database, and your index lags behind the actual data by the number of seconds (or minutes) between scheduled job runs (see the sketch after this list).
- Push to a message queue and write an indexing consumer – you can run something like RabbitMQ and have multiple consumers that poll data and index it. This is not straightforward to implement because you have to poll multiple items in order to leverage batch indexing, and then only mark them as consumed upon successful batch execution – somewhat transactional behaviour.
- Queue items in memory and flush them regularly – this may be good and efficient, but you may lose data if a node dies, so you have to have some sort of health check based on the data in the database.
- Hybrid – do a combination of the above; for example, if you need to enrich the raw data and update the index at a later stage, you can queue items in memory and then use “store in database, index with scheduled job” to update the index and fill in any missing items. Or you can index some parts of the data as they come, and use another strategy for the more active types of data.
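To make the scheduled-job variant concrete, it boils down to something like the following. This is just a minimal sketch, where `Record`, `fetchRecordsBetween` and `bulkIndex` are hypothetical placeholders and the 30-second interval is illustrative:

```java
import java.time.Instant;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledIndexingJob {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    // the previous run's upper bound becomes the next run's lower bound,
    // so the (from, to) windows cover the data exactly once
    private volatile Instant lastRun = Instant.now();

    public void start() {
        scheduler.scheduleAtFixedRate(() -> {
            Instant now = Instant.now();
            List<Record> changed = fetchRecordsBetween(lastRun, now); // hypothetical DB query
            if (!changed.isEmpty()) {
                bulkIndex(changed); // hypothetical bulk request to Elasticsearch
            }
            lastRun = now;
        }, 0, 30, TimeUnit.SECONDS); // the index lags by up to these 30 seconds

    }

    private List<Record> fetchRecordsBetween(Instant from, Instant to) { return List.of(); }
    private void bulkIndex(List<Record> records) { }
    static class Record { }
}
```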
We have recently decided to implement the “queue in memory” approach (in combination with another one, as we have to do some scheduled post-processing anyway). Our first attempt was to use a class provided by the Elasticsearch client – the BulkProcessor. The logic is clear – accumulate index requests in memory and flush them to Elasticsearch in batches, either when a certain limit is reached or at a fixed time interval. So at most every X seconds, and at most every Y records, there will be a batch index request. That achieves near real-time indexing without putting too much stress on Elasticsearch. It also allows multiple bulk indexing requests at the same time, as per Elasticsearch recommendations.
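For reference, this is roughly how the native BulkProcessor is configured; a sketch against the 5.x transport-client API, with illustrative thresholds:

```java
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;

public class BulkProcessorExample {

    // builds a processor that flushes at most every 5 seconds
    // and at most every 1000 accumulated requests
    public static BulkProcessor create(Client client) {
        return BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) { }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // log and/or retry the failed batch here
            }
        })
        .setBulkActions(1000)                            // "at most every Y records"
        .setFlushInterval(TimeValue.timeValueSeconds(5)) // "at most every X seconds"
        .setConcurrentRequests(2)                        // parallel bulk requests in flight
        .build();
    }
}
```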
However, we are using the REST API (via Jest), which is not supported by the BulkProcessor. We tried to plug REST indexing logic in place of the native one, and although it almost worked, in the process we noticed something worrying – the internalAdd method, which gets invoked every time an index request is added to the bulk, is synchronized. This means threads will block, waiting for each other to add items to the bulk. That sounded suboptimal and risky for production environments, so we went for a separate implementation. It can be seen here – ESBulkProcessor.
It allows multiple threads to flush to Elasticsearch simultaneously, but only one thread at a time (using a lock) to consume from the queue and form the batches. Since this is a fast operation, it’s fine to have it serialized. And it’s not because the concurrent queue can’t handle multiple threads reading from it – it can; but if multiple threads hit the batch-forming condition at the same time, the result is several small batches rather than one big one, hence the need for only one consumer at a time. Several small batches would not be a huge problem, so the lock could even be removed; but it’s important to note that it is not blocking.
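To illustrate the idea, a simplified sketch follows. This is not the actual ESBulkProcessor code; the tryLock-based skipping, the thread-pool sizes and the `bulkIndex` helper are assumptions made for the example:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class QueueingBulkIndexer {

    private static final int MAX_BULK_SIZE = 1000;

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // only one thread at a time drains the queue, so that one big batch
    // is formed instead of several small ones
    private final ReentrantLock consumerLock = new ReentrantLock();
    // flushing itself may happen from several threads concurrently
    private final ExecutorService flushers = Executors.newFixedThreadPool(4);
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    public QueueingBulkIndexer() {
        // time-based trigger: a batch goes out at least every 5 seconds
        timer.scheduleAtFixedRate(this::drainAndFlush, 5, 5, TimeUnit.SECONDS);
    }

    // called by application threads; unlike the synchronized internalAdd,
    // offering to the queue does not make threads wait for each other
    public void add(String indexAction) {
        queue.offer(indexAction);
        if (queue.size() >= MAX_BULK_SIZE) { // size-based trigger
            drainAndFlush();
        }
    }

    private void drainAndFlush() {
        // tryLock, not lock: if another thread is already forming a batch,
        // skip - the pending items will be picked up by it or by the timer
        if (!consumerLock.tryLock()) {
            return;
        }
        try {
            List<String> batch = new ArrayList<>(MAX_BULK_SIZE);
            queue.drainTo(batch, MAX_BULK_SIZE);
            if (!batch.isEmpty()) {
                flushers.submit(() -> bulkIndex(batch));
            }
        } finally {
            consumerLock.unlock();
        }
    }

    private void bulkIndex(List<String> batch) {
        // hypothetical: send the accumulated actions as one _bulk request via the REST client
    }
}
```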
This has been in production for a while now and doesn’t seem to have any issues. I will report back if increased load or edge cases surface any.
It’s important to reiterate the issue if this is the only indexing logic – your application node may fail and you may end up with missing data in Elasticsearch. We are not in that scenario, and I’m not sure which is the best approach to remedy it – be it a partial reindex of recent data in case of a failed server, or a batch process that checks for mismatches between the database and the index (sketched below). Of course, we should also say that you may not always have a database – sometimes Elasticsearch is all you have for data storage, and in that case some sort of queue persistence is needed.
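As an illustration of the latter option, such a batch process could look roughly like this. The `fetchIdsUpdatedSince`, `fetchIndexedIds` and `reindex` helpers, and the one-hour window, are hypothetical:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class IndexConsistencyChecker {

    // scheduled job: re-index anything that is in the database
    // but missing from Elasticsearch (e.g. because a node died before flushing)
    public void checkRecentData() {
        Instant since = Instant.now().minus(Duration.ofHours(1));
        List<String> dbIds = fetchIdsUpdatedSince(since); // hypothetical: SELECT id ... WHERE updated_at > ?
        Set<String> indexed = fetchIndexedIds(dbIds);     // hypothetical: _mget or terms query by id
        List<String> missing = dbIds.stream()
                .filter(id -> !indexed.contains(id))
                .collect(Collectors.toList());
        if (!missing.isEmpty()) {
            reindex(missing);                             // hypothetical partial reindex of the gaps
        }
    }

    private List<String> fetchIdsUpdatedSince(Instant since) { return List.of(); }
    private Set<String> fetchIndexedIds(List<String> ids) { return Set.of(); }
    private void reindex(List<String> ids) { }
}
```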
The ultimate goal is to have near real-time indexing, as users expect to see their data as soon as possible, while at the same time not overwhelming the Elasticsearch cluster.
The topic of “what’s the best way to index data” is huge and I hope I’ve clarified it at least a little bit and that our contribution makes sense for other scenarios as well.
For our microservice platform, which does real-time sports data streaming (around 30 services and growing, with hundreds of instances), there was a requirement to audit the data produced by every service.
As we were using Kafka as a databus for gluing services together, it was a natural choice to use the message queue approach, since the data is already in the queue. Using a separate consumer group, a Kafka consumer application receives the messages and bulk-indexes them.
Contrary to what you wrote, I found it quite straightforward to do this by leveraging primitives of Kafka itself (sketched after this exchange). The consumer is configured to poll only a limited number of records (using max.poll.records), to avoid load issues on the Elasticsearch cluster, and to wait a configurable time so as to fetch as many bytes as possible from the Kafka cluster and improve throughput. This works really well, with near real-time performance and a predictable load on the Elasticsearch cluster.
What I wonder is why you are not using the Java High Level REST Client from Elastic itself.
Good that Kafka has been easy to use. But is it consumed by a single node or multiple nodes? And once you consume a message, does it have to stay in the queue, or is it dequeued?
As for the Elasticsearch REST client – it just wasn’t there when the project was started (we started with Elasticsearch 5.6).
Messages stay in the topic until the retention period expires. The transactional behaviour you mentioned is achieved by committing the offset manually, after receiving a success response for the bulk indexing from the Elasticsearch cluster.
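Putting the pieces of this exchange together, such a consumer might look roughly like this. A sketch only: the topic name, group id, configuration values and the `bulkIndex` helper are made up for the example:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AuditIndexingConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "audit-indexer");          // separate consumer group for indexing
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.records", "500");            // cap batch size to protect the cluster
        props.put("fetch.max.wait.ms", "500");           // wait to accumulate bytes for throughput
        props.put("enable.auto.commit", "false");        // commit manually after indexing succeeds

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("audit-events")); // hypothetical topic name
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) {
                    continue;
                }
                bulkIndex(records);    // hypothetical bulk request to Elasticsearch
                consumer.commitSync(); // the "transactional" step: offsets advance only on success
            }
        }
    }

    private static void bulkIndex(ConsumerRecords<String, String> records) {
        // build one _bulk request from all polled records and execute it
    }
}
```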