How to conditionally clone incoming documents to a different index and add a field

I have a custom Beat publishing documents for a number of "system" types (e.g. Web Server, App Server, Client Device, Database Server). Each system type can have tens, hundreds, or thousands of instances (e.g. Database Server 1, Database Server 2, ..., Database Server 2500). Each document the Beat publishes contains several hundred fields, including an "InstanceId" field (uniquely identifying the system) and a "Created Time" field (the time the system sent the data to the Beat).

I want to have two separate indices for each "system" type being published. The first index, the "All" index, contains every document received that day for the system type and would typically hold tens of millions of records. The second index, the "Latest" index, contains only the latest record for each "InstanceId" within the type. The reason for this approach is that the "Latest" index will be queried/aggregated every second, and running those queries/aggregations against a few thousand records will be much more efficient than running them against tens of millions of records. However, every record still needs to be available (in the "All" index) for data analysis and reporting.

I am trying to determine the most efficient way to clone a document arriving for the "All" index into the "Latest" index, so that an Upsert occurs on the "Latest" index, but only if the "Created Time" field on the incoming document is later than the "Created Time" field of the matching document (based on "InstanceId") in the "Latest" index.

The incoming document would not have the "Id" field set so that a new record is created in the "All" index for every document received. The clone process would need to copy the "InstanceId" field into the "Id" field of the cloned document so that the Upsert occurs in the "Latest" index.
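
To make the intended behaviour concrete, here is a minimal Python sketch of the routing rule described above. It is only an in-memory model of the semantics, not Elasticsearch or Beats code: the `route_document` function, the list/dict stand-ins for the two indices, and the sample events are all made up for illustration.

```python
from datetime import datetime
from typing import Any, Dict, List

Document = Dict[str, Any]


def route_document(
    doc: Document,
    all_index: List[Document],
    latest_index: Dict[str, Document],
) -> bool:
    """Store doc in "All"; upsert its clone into "Latest" only if it is newer.

    Returns True when the "Latest" index was changed.
    """
    # "All": no Id is set, so every incoming document becomes a new record.
    all_index.append(dict(doc))

    # "Latest": the clone copies InstanceId into Id, giving one record per instance.
    clone = dict(doc)
    clone["Id"] = doc["InstanceId"]
    current = latest_index.get(clone["Id"])
    if current is None or doc["Created Time"] > current["Created Time"]:
        latest_index[clone["Id"]] = clone
        return True
    return False


# Stand-ins for the two indices (hypothetical, in memory only).
all_index: List[Document] = []
latest_index: Dict[str, Document] = {}

# Made-up sample events; the third one arrives late and is older than the first.
events = [
    {"InstanceId": "db-1", "Created Time": datetime(2019, 1, 1, 12, 0, 5), "cpu": 40},
    {"InstanceId": "db-2", "Created Time": datetime(2019, 1, 1, 12, 0, 6), "cpu": 55},
    {"InstanceId": "db-1", "Created Time": datetime(2019, 1, 1, 12, 0, 1), "cpu": 35},
]
results = [route_document(event, all_index, latest_index) for event in events]
```

In this model all three events land in "All", while "Latest" keeps one record per "InstanceId" and ignores the late, older event for `db-1`.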

I could obviously do the duplication on the Beat side, but that would result in twice as much data being transferred over the wire to Elasticsearch, which would not be possible given data transfer constraints. I'm currently publishing from the Beat directly to Elasticsearch, generating the "All" index. Is it possible to accomplish the stated goal solely in Elasticsearch, or do I need to introduce Logstash into the process? If I do need Logstash, is there an existing plugin that could accomplish the stated goal, or would I need to author a custom plugin?

Thanks for any insight/suggestions.


Why don't you just use time-based indices, and maybe even index sorting, to speed things up? Then there is no need to store the data twice.
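
As a rough sketch of what that suggestion could look like, the Python below builds a daily index name and a create-index body that enables index sorting. The `index.sort.field` and `index.sort.order` settings are real Elasticsearch index settings; the naming scheme, the helper functions, and the choice of sort fields are assumptions made for illustration.

```python
from datetime import date
from typing import Any, Dict


def daily_index_name(system_type: str, day: date) -> str:
    # Hypothetical naming scheme: one "All" index per system type per day.
    return f"{system_type}-all-{day:%Y.%m.%d}"


def sorted_index_settings() -> Dict[str, Any]:
    # Index sorting has to be configured when the index is created, and the
    # sort fields must also be defined in the index mapping with doc values
    # (for example keyword and date fields). The mapping is omitted here.
    return {
        "settings": {
            "index": {
                # Assumed sort: group by instance, newest document first.
                "sort.field": ["InstanceId", "Created Time"],
                "sort.order": ["asc", "desc"],
            }
        }
    }


name = daily_index_name("database-server", date(2019, 1, 15))
body = sorted_index_settings()
```

The `body` dictionary would be sent as the request body when creating the index called `name`; how that request is issued (client library, index template, and so on) is left open here.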


Hi Alexander. Thanks for responding.

My thinking was this: I'm running hundreds of varying queries/aggregations per minute, and each one only needs the latest document for each distinct entity (roughly a couple of thousand distinct entities per index). The indices hold anywhere from several million to 50 million documents. It seemed worth the additional indices and disk space to get the performance gain of an index that contains just the <0.01% of documents I'm interested in, and to avoid having to include something in every query to isolate the latest documents.
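
For illustration, the "something in every query" could be a field-collapsing clause. The sketch below builds such a search body in Python; `collapse` and `sort` are real search options, while the helper name, the `max_instances` default, and the assumption that "InstanceId" is a keyword field and "Created Time" a date field are mine.

```python
from typing import Any, Dict


def latest_per_instance_query(max_instances: int = 5000) -> Dict[str, Any]:
    """Build a search body that returns the newest hit per InstanceId."""
    return {
        "size": max_instances,
        "query": {"match_all": {}},
        # Collapse keeps only the top-sorted document for each InstanceId.
        "collapse": {"field": "InstanceId"},
        # Newest first, so the kept document is the latest one.
        "sort": [{"Created Time": {"order": "desc"}}],
    }


search_body = latest_per_instance_query()
```

Note that collapsing applies to the returned hits only and does not affect aggregations, so this on its own would not restrict aggregations to the latest document per instance.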

I'll look into your suggestion, but I don't have much experience with either of those features. Is it plausible that using time-based indices (and maybe even index sorting) could yield gains similar to what I described?

Thanks again.

