Dumping raw data in custom format

I have a use case where I'd like to dump all the documents in
ES to a specific output format. However, using scan or any other
"consistent" view is relatively slow. A scan query with
"match_all" processes around 80,000 documents a second--but
that means a full dump will still take over 5 hours. A single scan also can't
be parallelized across machines, which effectively stops it from scaling.

I've also looked at things like Knapsack, Elasticdump, etc., but these still
don't give me the ability to parallelize the work, and they're not
particularly fast. They also don't allow me to manipulate the output into the
specific format I want (it's not JSON, and requires some organization of
the data).

So I have a few ideas, which may or may not be possible:

  1. Retrieve shard-specific data from Elasticsearch (i.e., "Give me all
    the data for Shard X"). This would allow me to divide the task up into /at
    least/ S tasks, where S is the number of shards, but there doesn't seem
    to be an API that exposes this.
  2. Get snapshots of each shard from disk. This would also allow me to
    divide up the work, but would also require a framework on top to coordinate
    which segments have been retrieved, etc.
  3. Hadoop. However, launching an entire MR cluster just to dump data
    sounds like overkill.

The first option gives me the most flexibility and would require the least
amount of work on my part, but there doesn't seem to be any way to dump all
the data for a specific shard via the API. Is there any sort of API or
flag that provides this, or otherwise provides a way to partition the data
to different consumers?
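
One possibility worth checking, offered as a sketch rather than a confirmed answer: the search API takes a `preference` parameter, and the 1.x documentation lists a `_shards:N` value that restricts a request to the named shards. If that holds for your version and works together with scan/scroll, each consumer could open its own scroll against one shard. The host, index name, shard count, scroll timeout and page size below are placeholders, not values from this thread.

```python
from urllib.parse import urlencode


def shard_scan_urls(base_url, index, num_shards, scroll="5m", size=1000):
    """Build one scan/scroll start URL per shard.

    Assumes `preference=_shards:N` restricts a search to shard N;
    verify this against your Elasticsearch version before relying on it.
    """
    urls = []
    for shard in range(num_shards):
        params = urlencode({
            "search_type": "scan",
            "scroll": scroll,
            "size": size,
            "preference": "_shards:%d" % shard,
        })
        urls.append("%s/%s/_search?%s" % (base_url.rstrip("/"), index, params))
    return urls


if __name__ == "__main__":
    # Hypothetical host and index; one URL per consumer.
    for url in shard_scan_urls("http://localhost:9200", "myindex", 3):
        print(url)
```

Each URL would be handed to a separate worker, which then follows the returned scroll ID as usual. This only builds the requests; it does not send them.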

The second would also (presumably) let me subdivide tasks
per worker, and would allow them to be done offline. I was able
to write a sample program that uses Lucene to do this, but it adds the
complexity of coordinating work across the various hosts in the
cluster, and it requires an intermediate step where I transfer the
common files to another host to combine them. This isn't a terrible
problem to have--but it does require additional infrastructure to organize.

The third is not desirable because it adds a large operational
load without a clear payoff, since we don't already have a MapReduce
cluster on hand.

Thanks for any tips or suggestions!

Andrew

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/97b93fb9-fa7b-4e82-922c-98e8fb48103b%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Forgot to mention--the data set size is around 1.6 billion documents.
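
For reference, the figures quoted in this thread are consistent with each other: 1.6 billion documents at roughly 80,000 a second comes to about 5.6 hours. A small sketch of that arithmetic follows. The worker count in the second function is a made-up illustration, and it assumes every worker sustains the full single-consumer rate, which a shared bottleneck would prevent.

```python
def dump_hours(total_docs, docs_per_second):
    """Hours needed to stream total_docs at a steady docs_per_second."""
    return total_docs / docs_per_second / 3600.0


def hours_with_workers(total_docs, docs_per_second, workers):
    """Idealized estimate: assumes each worker sustains the full
    single-consumer rate, i.e. throughput scales linearly."""
    return dump_hours(total_docs, docs_per_second * workers)


if __name__ == "__main__":
    print("single consumer: %.2f h" % dump_hours(1.6e9, 80000))
    # 8 workers is a hypothetical number chosen only for illustration.
    print("8 ideal workers: %.2f h" % hours_with_workers(1.6e9, 80000, 8))
```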

On Tuesday, February 10, 2015 at 9:29:39 AM UTC-8, Andrew McFague wrote:

> I have a use case where I'd like to be able to dump all the documents in
> ES to a specific output format. [...]

Use the scan/scroll API with different queries (filtered by document type,
etc.), issued from a custom tool written in Java. This will be the fastest.
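
The structure of that suggestion can be sketched as one scroll loop per partition, run concurrently. The reply recommends Java; the Python below only illustrates the shape. Everything in it is a stand-in: `FAKE_INDEX`, `scroll_pages` and `format_doc` are hypothetical names that replace a real client and the real output format.

```python
from concurrent.futures import ThreadPoolExecutor

# Stand-in data. In a real tool each partition would be a separate
# scan/scroll request filtered by document type.
FAKE_INDEX = {
    "type_a": [{"id": i} for i in range(5)],
    "type_b": [{"id": i} for i in range(5, 8)],
}


def scroll_pages(doc_type, page_size=2):
    """Hypothetical stand-in for a scroll loop: yields pages of hits
    for one document type until that partition is exhausted."""
    docs = FAKE_INDEX[doc_type]
    for start in range(0, len(docs), page_size):
        yield docs[start:start + page_size]


def format_doc(doc):
    """Placeholder for the custom (non-JSON) output format."""
    return "id=%d" % doc["id"]


def dump_partition(doc_type):
    """Drain one partition and return its formatted lines."""
    lines = []
    for page in scroll_pages(doc_type):
        lines.extend(format_doc(doc) for doc in page)
    return lines


def dump_all(doc_types, max_workers=4):
    """Run one scroll per document type concurrently."""
    doc_types = list(doc_types)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = pool.map(dump_partition, doc_types)
        return dict(zip(doc_types, results))


if __name__ == "__main__":
    for doc_type, lines in dump_all(["type_a", "type_b"]).items():
        print(doc_type, lines)
```

The partitions have to be disjoint and together cover the whole index, otherwise documents are dumped twice or missed.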

--

Itamar Syn-Hershko
http://code972.com | @synhershko https://twitter.com/synhershko
Freelance Developer & Consultant
Lucene.NET committer and PMC member

On Tue, Feb 10, 2015 at 7:41 PM, Andrew McFague <redmumba@gmail.com> wrote:

> Forgot to mention--the data set size is around 1.6 billion documents.

With a match_all query using the scan API, it still takes 5.5 hours.
Profiling shows that around 48% of the time is spent waiting for
ES to respond with the next set of data, which is a significant
portion of the runtime. This is with only a single thread consuming the
data, which suggests diminishing returns as we add more consumers around the
single network bottleneck.
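
To put rough numbers on that profile: 48% of 5.5 hours is about 2.64 hours of waiting and 2.86 hours of processing. The sketch below computes that split and a best-case bound under one loudly stated assumption, namely that fetching the next batch could be fully overlapped with processing the current one. That assumption is not something measured in this thread.

```python
def split_runtime(total_hours, wait_fraction):
    """Split a measured runtime into (waiting, processing) hours."""
    waiting = total_hours * wait_fraction
    return waiting, total_hours - waiting


def overlapped_lower_bound(total_hours, wait_fraction):
    """Best case if fetching the next batch fully overlapped with
    processing the current one: the slower of the two phases."""
    waiting, processing = split_runtime(total_hours, wait_fraction)
    return max(waiting, processing)


if __name__ == "__main__":
    waiting, processing = split_runtime(5.5, 0.48)
    print("waiting: %.2f h, processing: %.2f h" % (waiting, processing))
    print("overlap bound: %.2f h" % overlapped_lower_bound(5.5, 0.48))
```

Even in that ideal case a single consumer would still need close to 3 hours, so getting well below that would take more than one stream of data.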

Andrew

On Tuesday, February 10, 2015 at 9:45:51 AM UTC-8, Itamar Syn-Hershko wrote:

> Use the scan/scroll API with different queries (filter by document type
> etc), from a custom tool written in Java. This will be the fastest.

You can try Spark on ES. Spark treats each shard as a partition, so the work is parallelized across shards and also across machines.