Guaranteeing Data Completeness


(Leo) #1

This is a long explanation, so thank you in advance to anyone who reads it and thinks about it!

Data generation

I have N remote processes that gather some live data, format it into documents, and send off batched index requests to my ElasticSearch (ES) cluster. Some of these processes produce documents very frequently (1 batch / sec consistently), while others might send every couple minutes or even hours. These processes might fall behind, get stuck, or ES might service their index requests in an unexpected order. Also, I can tell which process a document comes from by the value of a certain field F (there is a one-to-one mapping F value -> generation process).

Users

On the other end, there are users that make calls to an API that I have control over, which is implemented by making one (or several) queries to ES for the near-live data coming in from the N processes.

Goal

I want to design the API such that users get data that is complete, at the cost of it being less near-real-time. That is, when a user calls my API, the response should only contain documents whose timestamps are <= T_end, where T_end is the latest timestamp up to which all documents produced by the N processes have been indexed into ES. Put another way, if at time T, ES has indexed X documents with timestamps <= T_end, then at any later time T’ > T, ES should still have exactly X documents with timestamps <= T_end.

I’m just trying to formalize the point that I don’t want the users to get incomplete data from the N processes. Note that this is only a problem for N > 1 (when N = 1, it’s trivial).

Solution 1

I have one idea for how to make this guarantee. When the user calls the API, I make 2 queries:

  1. run a terms aggregation on F with a max sub-aggregation on the timestamp, giving the latest timestamp per bucket; the minimum of these per-bucket maxima is T_end
  2. make the regular query, with a range filter for the timestamp to be <= T_end
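
A minimal sketch of those two queries as Python dicts in the Elasticsearch query DSL, with no client library involved. The field names `F` and `timestamp`, the aggregation names `per_process` and `latest`, and the helper function names are assumptions made for illustration; the sketch also assumes `timestamp` is mapped as a date, so the max aggregation returns epoch milliseconds.

```python
def build_t_end_query(process_field="F", time_field="timestamp", max_processes=1000):
    """Query 1: latest timestamp per value of F (hypothetical field names)."""
    return {
        "size": 0,  # only the aggregation is needed, not the hits
        "aggs": {
            "per_process": {
                "terms": {"field": process_field, "size": max_processes},
                "aggs": {"latest": {"max": {"field": time_field}}},
            }
        },
    }


def t_end_from_response(response):
    """T_end is the minimum of the per-bucket maxima (None if no buckets)."""
    buckets = response["aggregations"]["per_process"]["buckets"]
    if not buckets:
        return None
    return min(bucket["latest"]["value"] for bucket in buckets)


def build_data_query(t_end_millis, user_query=None, time_field="timestamp"):
    """Query 2: the regular query, restricted to timestamp <= T_end."""
    return {
        "query": {
            "bool": {
                "must": [user_query or {"match_all": {}}],
                "filter": [
                    {
                        "range": {
                            time_field: {
                                "lte": int(t_end_millis),
                                "format": "epoch_millis",
                            }
                        }
                    }
                ],
            }
        }
    }
```

Note that a process which has not indexed any document yet produces no bucket at all, so it cannot hold T_end back in this scheme.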

One issue with this solution is that some of the gathering processes emit documents every couple hours, while others emit every second. This means that T_end could be much older than it needs to be.

Example

process A emits documents at 9:00, 9:01, … 12:00, 12:01, … 13:58, 13:59
process B emits documents at 9:00 and 12:00

If a user makes a query at 14:00, I want to return ALL of the A documents and both of the B documents. However, with the solution above I would have T_end = 12:00, so all of the A documents after 12:00 would be omitted.
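
The arithmetic of this example can be checked with a few lines of Python. The calendar date is arbitrary, and the sketch assumes process A emits exactly one document per minute from 9:00 through 13:59.

```python
from datetime import datetime, timedelta

day = datetime(2024, 1, 1)  # arbitrary date, only the time of day matters

# Process A: one document per minute, 9:00 .. 13:59 (assumed cadence)
a_times = [day.replace(hour=9) + timedelta(minutes=m) for m in range(300)]
# Process B: documents at 9:00 and 12:00
b_times = [day.replace(hour=9), day.replace(hour=12)]

# Solution 1: T_end is the minimum of the per-process latest timestamps
t_end = min(max(a_times), max(b_times))

# A documents that a query at 14:00 would leave out
omitted = [t for t in a_times if t > t_end]

print(t_end.strftime("%H:%M"), len(omitted))
```

Under these assumptions T_end comes out as 12:00 and the documents from 12:01 through 13:59 are withheld, even though process B has nothing more to send.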

Solution 2

I have each data gathering process send a heartbeat by making an update to a special “heartbeat” document in ES once every second (and have 1 heartbeat document for each generation process). Thus, when the user calls the API, the queries become:

  1. get the N heartbeat documents (regular, non-aggregated query); the smallest timestamp is T_end
  2. same as last time, query for documents with timestamp <= T_end
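
A rough sketch of step 1, assuming each heartbeat document uses the process's F value as its `_id` and stores an epoch-millisecond `timestamp` field. Both conventions and the function names are hypothetical. It works on the `hits.hits` list of a search response and refuses to compute T_end if an expected process has no heartbeat document.

```python
def build_heartbeat_query(expected_processes):
    """Plain (non-aggregated) query fetching all N heartbeat documents."""
    return {"size": expected_processes, "query": {"match_all": {}}}


def t_end_from_heartbeats(hits, expected_ids, time_field="timestamp"):
    """Return the smallest heartbeat timestamp across all expected processes.

    `hits` is the list found under response["hits"]["hits"].
    """
    seen = {hit["_id"]: hit["_source"][time_field] for hit in hits}
    missing = set(expected_ids) - set(seen)
    if missing:
        # Without a heartbeat for every process, completeness is unknown.
        raise LookupError(f"no heartbeat for: {sorted(missing)}")
    return min(seen[process_id] for process_id in expected_ids)
```

Checking against the list of expected process ids is a design choice in this sketch: a missing heartbeat is treated as "unknown" rather than silently ignored.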

One implication of having a heartbeat is that each data generator would need to be either multi-threaded or have an event loop. In the generators that already send a batch of documents every second, the heartbeat update could be included in the batched request, thus saving some of the connection handshake overhead.
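
Piggybacking the heartbeat on a batch might look like the following, which builds a newline-delimited body for the `_bulk` endpoint. The index names `data` and `heartbeats`, the `timestamp` field, and the function name are assumptions; the action lines follow the bulk format of recent Elasticsearch versions, and older versions may also need a `_type` in each action.

```python
import json


def bulk_body_with_heartbeat(docs, process_id, heartbeat_millis,
                             data_index="data", heartbeat_index="heartbeats"):
    """Build a _bulk body: index every document, then upsert the heartbeat."""
    lines = []
    for doc in docs:
        lines.append(json.dumps({"index": {"_index": data_index}}))
        lines.append(json.dumps(doc))
    # One heartbeat document per process, keyed by the process id.
    lines.append(json.dumps({"update": {"_index": heartbeat_index, "_id": process_id}}))
    lines.append(json.dumps({"doc": {"timestamp": heartbeat_millis}, "doc_as_upsert": True}))
    # The bulk API expects every line, including the last, to end with a newline.
    return "\n".join(lines) + "\n"
```

The heartbeat value sent here should be a timestamp up to which the process has already sent all of its documents, otherwise the heartbeat claims more than the batch delivers.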

Questions

  • Are there any mechanisms in ElasticSearch that might indicate when the data was last complete? (as defined above)
  • Are there any reasons that the heartbeat solution would not work with ES?
  • Are there any technologies that sit in between data generation processes and ES that will solve this problem easily? (logstash, kafka, etc.)

(Adrien Grand) #2

The problem with your definition of complete is that it depends on information that Elasticsearch does not have access to, so you need to make it aware of it some other way, like your "solution 2". I think this 2nd solution is a good idea; I would just make sure to put this data in a separate (single-shard) index in order not to add unnecessary deletions to your main index. You might also want to take the refresh_interval into account: after you push data to ES, documents might take up to refresh_interval before becoming visible, so I guess you would need to take the min value of your heartbeat index and then subtract refresh_interval in order to be sure to have a consistent view over your data.
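
As a small illustration of that last adjustment, here is a sketch that takes the heartbeat timestamps as Python `datetime` values and subtracts the refresh interval from the minimum. The function name is made up, and the 1-second default mirrors Elasticsearch's default `refresh_interval`; if the index is configured differently, that value should be passed in instead.

```python
from datetime import datetime, timedelta, timezone


def safe_t_end(heartbeats, refresh_interval=timedelta(seconds=1)):
    """Minimum heartbeat minus refresh_interval.

    Documents indexed within the last refresh_interval may not be
    searchable yet, so T_end is pulled back by that amount.
    """
    if not heartbeats:
        raise ValueError("at least one heartbeat timestamp is required")
    return min(heartbeats) - refresh_interval


heartbeats = [
    datetime(2024, 1, 1, 14, 0, 0, tzinfo=timezone.utc),
    datetime(2024, 1, 1, 13, 59, 58, tzinfo=timezone.utc),
]
print(safe_t_end(heartbeats).isoformat())
```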


(Leo) #3

Thank you Adrien!

The data is being inserted append-only into these indices, so it's a good recommendation to have a separate index for heartbeats.

The guarantee provided by the refresh_interval is essential to having the heartbeat work accurately. Good that you pointed this out as well.

