Analytics on data stored in ES

Hi,

We are storing a large number of mail messages in ES, each with multiple
fields: 600 million+ messages across 3 ES nodes.

A custom algorithm works on a batch of messages and correlates them based
on fields and other message semantics.
The final result is a set of message groups, similar to what field
collapsing would return.

Currently we fetch 100K+ messages from ES and apply this logic to return
the final results to the user. The algorithm can't be modeled using
aggregations.

Obviously this approach does not scale if we want to process, say, 100 M
messages and return results within a few minutes. The messages are large
and partitioned across a few ES nodes. We want to maintain data locality
while processing, so that we don't download lots of data from ES over the
network.

Is there any way to execute some code over the shards from within ES? It
would be fine if this ran as part of a postFilter as well. What options are
available before we start thinking about Hadoop/Spark with the es-hadoop
library?

Solr seems to have such a plugin hook (experimental) for custom
processing:
https://cwiki.apache.org/confluence/display/solr/AnalyticsQuery+API

Thanks,
Ram

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/f98a4bcb-2d9b-4aca-b49d-9afce519a69a%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Hi Ram,
we have built something similar for a compliance analytics
application. Consider the following:

  • The feeding pipeline should do as much of the tagging, extraction,
    enrichment and classification as possible. The results will be
    indexed. Usually, that takes care of some computationally intensive
    tasks (e.g., complex entity extraction, relationship extraction) and
    prepares for later analytics by providing proper entities to work on. As
    messages usually don't change (i.e., once indexed, you will keep them
    unchanged for the rest of their lifetime), spending a bit more compute
    time in feeding is fine.

  • You don't have to store the original message contents in
    Elasticsearch. Try Apache Cassandra and index only a message id in
    Elasticsearch, which can then be used to retrieve the original message
    from Cassandra or simply from a file storage (in the case of
    compliance/e-discovery, it tends to be an immutable file storage). In
    our application, the relevant meta-data is only about 60% of the source
    volume, so moving the original messages elsewhere would leave
    Elasticsearch holding only about 38% (0.6 / 1.6) of the storage needed
    for both.

  • Your queries may become complex, but you can scale with more replicas
    and nodes, or simply more RAM as necessary. Unless you're talking about
    SMS messages, three nodes seems tight.

  • If you need to do some query-time analytics, fetch the candidate
    records and use aggregations if possible. Aggregations may not do the
    entire job, but they can help to find the candidates. You may want to
    run a first query to obtain just aggregations without result hits, and
    then run one or more queries to get the actual candidate sets. Querying
    should be considered "cheap", so having multiple queries is fine.

  • Now do the extra analytics on the query result set obtained. For this
    purpose, you should look into Apache Spark to handle fast in-memory
    processing of this data set if you really have a number of small,
    parallel jobs with a significant divergence of run-times. As the scaling
    properties of Elasticsearch retrieval and the post-query processing will
    most likely be quite different, I would not recommend using any form of
    plug-in for Elasticsearch (or Solr).

  • If I take the dimensioning from my application and apply it to
    600 M e-mail messages, I get (average size of 10 kB excluding
    attachments, plus derived meta-data of approx. another 6 kB of text)
    around 10 TB of raw data. Three nodes seems to be a bit short for this
    application. I don't know about the RAM and CPU sizings in your case,
    but you should consider going to a considerably larger number of nodes.
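
The two-step querying suggested above (aggregations first, candidate hits
second) could look roughly like the request bodies below. This is only a
sketch: the field names (subject, thread_id), the sample values and the
helper function names are made up for illustration, and the bool/filter
syntax is the one used by current Elasticsearch releases; the 1.x versions
in use at the time of this thread expressed the filter part differently.

```python
import json


def aggregation_only_body(query, group_field, max_groups=50):
    """Phase 1: ask only for bucket counts; size=0 suppresses the result hits."""
    return {
        "size": 0,
        "query": query,
        "aggs": {
            "candidates": {
                "terms": {"field": group_field, "size": max_groups}
            }
        },
    }


def candidate_fetch_body(query, group_field, group_value, page_size=100):
    """Phase 2: fetch the hits belonging to one bucket found in phase 1."""
    return {
        "size": page_size,
        "query": {
            "bool": {
                "must": [query],
                "filter": [{"term": {group_field: group_value}}],
            }
        },
    }


# Hypothetical user query and grouping field, for illustration only.
user_query = {"match": {"subject": "quarterly report"}}
phase1 = aggregation_only_body(user_query, "thread_id")
phase2 = candidate_fetch_body(user_query, "thread_id", "t-42")

print(json.dumps(phase1, indent=2))
print(json.dumps(phase2, indent=2))
```

The bodies would be sent to the index's _search endpoint with whatever
client is already in use. Only the small bucket list comes back from the
first request, so the expensive fetch is limited to the buckets that are
actually interesting.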

Some thoughts... your mileage may vary :-)

Best regards,
--Jürgen

--

Kind regards,
i.A. Jürgen Wagner
Head of Competence Center "Intelligence"
& Senior Cloud Consultant

Thanks Jürgen for the detailed reply.

  • We have considered doing some processing at indexing time for things
    that are known up front, such as type and metadata extraction. We kept
    this limited because of the impact on size and relied on on-demand
    analytics for further expansion. Another aspect is building a pipeline:
    right now we process single entries, so we are not looking at a window
    of messages for complex processing, but yes, this will need to be built
    in the near future.

  • Thanks, we will certainly consider the Cassandra option, but right now
    we are going with ES as the message store. We would need to handle
    synchronization the moment we add one more store.

  • The current average is ~5 KB, which comes to 3+ TB, and this is working
    out fine across 3 nodes for now, since the data is spread across a few
    categorized indexes.
    We will be adding a few more nodes and replication for a few indexes.
    We are using 16 GB RAM nodes with quad-core CPUs.

  • Yes, aggregations are being used in the application for all the normal
    stats, and with some group-by logic as well. That is working perfectly.

  • Yes, we are looking into Spark and the related deployment complexity. I
    have a different opinion here, though. ES/Solr are now eating into the
    NoSQL space, and they are being used for other use cases than just
    search. Search-based analytics is becoming prominent as well. We don't
    need a full-fledged MR layer in ES/Solr, but something simpler: some
    custom processing that can check each relevant entry and then decide
    what results should be returned to the user. Since the candidate list
    depends on the user query, this needs to be done on demand.
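
As a rough cross-check of the sizing figures quoted in this thread, the
arithmetic can be written out as below. It is a back-of-the-envelope sketch
only: it uses decimal units, takes the per-message averages mentioned above
at face value, and ignores replicas and index overhead. The function name is
made up for illustration.

```python
def raw_volume_tb(message_count, avg_kb_per_message):
    """Raw data volume in decimal terabytes (1 TB = 10**9 kB)."""
    return message_count * avg_kb_per_message / 1e9


MESSAGES = 600_000_000

# Estimate from the reply above: 10 kB of message text plus ~6 kB of metadata.
estimated_tb = raw_volume_tb(MESSAGES, 10 + 6)

# Observed average reported in this message: ~5 kB per message.
observed_tb = raw_volume_tb(MESSAGES, 5)

# If metadata is 60% of the source volume, its share of "source + metadata"
# is 0.6 / (1.0 + 0.6).
metadata_share = 0.6 / (1.0 + 0.6)

print(f"{estimated_tb:.1f} TB, {observed_tb:.1f} TB, {metadata_share:.1%}")
# prints: 9.6 TB, 3.0 TB, 37.5%
```

The first figure matches the "around 10 TB" estimate, the second the "3+ TB"
observed here, and the third the "about 38%" quoted for a metadata-only
index.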

Thanks,
Ram

Hi,

Consider a dedicated (non-data) master node; as I understand it, this can
improve data handling and search speed a lot.

Yes, in general the fetch can be improved using standalone clients. I am NOT
saying that the data nodes are a bottleneck as of now. Indexing is not
impacting search.

The point I am raising is data locality. The data is spread over a few
shards across a few machines. We need to process this data without an
explicit fetch outside ES.
I want to return a sample of grouped entries after going over all matched
entries within an index.
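
To make the data-locality idea concrete, here is a minimal sketch of the
pattern being asked for: each shard reduces its own matches to a small
per-group summary, and only those summaries are merged centrally. This is
plain Python standing in for whatever mechanism would run the per-shard
step; it is not an Elasticsearch API, and the grouping key ("thread"), the
message ids and the sample size are invented for illustration. It also
assumes the grouping can be computed shard by shard, which may not hold for
the actual correlation algorithm.

```python
from collections import defaultdict


def summarize_shard(hits, group_key, sample_size=2):
    """Per-shard step: reduce local matches to a count and a small sample per group."""
    groups = defaultdict(lambda: {"count": 0, "sample": []})
    for hit in hits:
        group = groups[hit[group_key]]
        group["count"] += 1
        if len(group["sample"]) < sample_size:
            group["sample"].append(hit["id"])
    return dict(groups)


def merge_summaries(shard_summaries, sample_size=2):
    """Central step: combine the small per-shard summaries into the final groups."""
    merged = defaultdict(lambda: {"count": 0, "sample": []})
    for summary in shard_summaries:
        for key, group in summary.items():
            target = merged[key]
            target["count"] += group["count"]
            room = sample_size - len(target["sample"])
            target["sample"].extend(group["sample"][:room])
    return dict(merged)


# Hypothetical matches as they might sit on two different shards.
shard_a = [
    {"id": "m1", "thread": "t1"},
    {"id": "m2", "thread": "t1"},
    {"id": "m3", "thread": "t2"},
]
shard_b = [
    {"id": "m4", "thread": "t1"},
    {"id": "m5", "thread": "t3"},
]

result = merge_summaries([summarize_shard(s, "thread") for s in (shard_a, shard_b)])
print(result)
```

Only the summaries cross the network, so the transfer grows with the number
of groups rather than with the number of matched messages.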

Thanks,
Ram

On Saturday, December 13, 2014 9:24:11 PM UTC+5:30, Arie wrote:

Hi,

Consider a dedicated (non-data) master node; as I understand it, this can
improve data handling and search speed a lot.
