Elasticsearch-hadoop for Spark: index documents from an RDD into a different index per day (e.g. myindex-2014-01-01)

Hi,

I work on a complex workflow using Spark (parsing, cleaning, machine
learning...).
At the end of the workflow I want to send the aggregated results to
Elasticsearch so my portal can query the data.
There will be two types of processing: streaming, and the ability to
relaunch the workflow on all available data.

Right now I use elasticsearch-hadoop, and particularly its Spark support, to
send documents to Elasticsearch with the saveJsonToEs(myindex, mytype)
method.
The target is to have one index per day, using the template that we
built.
AFAIK you cannot use a field of a document to route it to the proper
index in elasticsearch-hadoop.

What is the proper way to implement this?
Have a dedicated step using Spark and the bulk API, so that each executor
sends documents to the proper index based on the field of each line?
Or is there something that I missed in elasticsearch-hadoop?

Julien

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/58b0e0e3-a297-4cf4-95bf-d3cf34546ea3%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.


I think I have a solution:
build JSON files so I can send them directly to _bulk:
saveJsonToEs("_bulk")

Not sure whether it will be efficient or even work; I'll try.


My previous idea doesn't seem to work: documents cannot be sent directly to
_bulk, only to an "index/type" pattern.


I implemented a solution for my problem.
I use foreachPartition and instantiate a bulk processor backed by a
transport client (i.e. one per partition) to send the documents.
It's not fast, but it works.
Does anybody have an idea to make it more efficient?

Julien
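For reference, the grouping step each partition could perform before issuing bulk requests can be sketched in plain Java. This is only a sketch under stated assumptions: the helper names (indexFor, groupByIndex) are illustrative, and the actual TransportClient/BulkProcessor calls are omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Inside foreachPartition, grouping lines by their target index lets each
// bulk request hit a single index. Helper names here are hypothetical.
public class PartitionGrouper {

    // Derive the daily index from the date prefix of a CSV line
    // such as "2014-01-01,05,06,ici".
    static String indexFor(String line) {
        String day = line.substring(0, line.indexOf(','));
        return "my-index-" + day + "/my-type";
    }

    // Bucket the lines of one partition by target index; one bulk request
    // per map entry would then be issued by the (omitted) client code.
    static Map<String, List<String>> groupByIndex(Iterable<String> partition) {
        Map<String, List<String>> byIndex = new HashMap<>();
        for (String line : partition) {
            byIndex.computeIfAbsent(indexFor(line), k -> new ArrayList<>()).add(line);
        }
        return byIndex;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
            "2014-01-01,05,06,ici",
            "2014-01-04,05,06,la",
            "2014-01-01,07,08,ailleurs");
        Map<String, List<String>> grouped = groupByIndex(lines);
        System.out.println(grouped.get("my-index-2014-01-01/my-type").size()); // 2
    }
}
```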


Hi Julien,

I'm unclear on what you are trying to achieve and what doesn't work.
es-hadoop allows either a static index/type or a dynamic one [1] [2]. One can also use a 'formatter', so for example
you can use a pattern like "{@timestamp:YYYY-MM-dd}", meaning the field @timestamp will be used as the target, but
it will first be formatted into year/month/day.

There's work underway to extend this to API/real-time environments like Spark, so the metadata can be customized
per entry [3], in case the global settings (which are pluggable) are not enough.

Hope this helps,

[1] http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/master/configuration.html#cfg-multi-writes
[2] http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/master/spark.html#spark-write-dyn
[3] https://github.com/elasticsearch/elasticsearch-hadoop/issues/358
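To illustrate, here is a standalone sketch of how such a {field:format} pattern resolves for one document. es-hadoop does this formatting internally; this only mirrors the resolution, and the helper name is illustrative.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

// Mirrors how a pattern like "my-index-{@timestamp:YYYY-MM-dd}/my-type"
// resolves: the @timestamp field of a document is formatted as
// year-month-day and spliced into the index name.
public class IndexPatternDemo {

    static String resolve(String timestamp) {
        String day = OffsetDateTime.parse(timestamp)
                .format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
        return "my-index-" + day + "/my-type";
    }

    public static void main(String[] args) {
        System.out.println(resolve("2014-01-01T10:30:00Z"));
        // my-index-2014-01-01/my-type
    }
}
```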


--
Costin


Thanks for the reply Costin.
I wasn't very clear, but the basic idea is to index data by day based on a
field available in each line.

Example of data (~700 million lines over ~90 days):

2014-01-01,05,06,ici
2014-01-04,05,06,la

The first line has to be sent to my-index-2014-01-01/my-type and the other
to my-index-2014-01-04/my-type.
I would like to do it without having to launch 90 saveJsonToEs calls (using
the elasticsearch-hadoop Spark API).

Is that clearer?

It seems that the dynamic index could work for me. I'll try that right away.

Thanks again

Julien
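For what it's worth, one way to make each line routable with the dynamic-index support is to carry the day inside the JSON document itself, so a pattern like "my-index-{date}/my-type" can pick it up. A hypothetical conversion of one CSV line, with illustrative field names:

```java
// Turns a CSV line like "2014-01-01,05,06,ici" into a JSON document whose
// "date" field can drive a dynamic index pattern such as
// saveJsonToEs("my-index-{date}/my-type"). Field names are illustrative.
public class CsvToJson {

    static String toJson(String line) {
        String[] f = line.split(",");
        return String.format(
            "{\"date\":\"%s\",\"a\":\"%s\",\"b\":\"%s\",\"place\":\"%s\"}",
            f[0], f[1], f[2], f[3]);
    }

    public static void main(String[] args) {
        System.out.println(toJson("2014-01-01,05,06,ici"));
        // {"date":"2014-01-01","a":"05","b":"06","place":"ici"}
    }
}
```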


OK, it works with a simple saveJsonToEs("aft-{date}/analysis").

Thanks Costin


Hi Julien,

I am trying to achieve something similar. In my case, my JSON contains a
field "time" in Unix time, and I want to partition my indexes by this
field. That is, if JSON1 contains 1422904680 in "time" and JSON2
contains 1422991080, then I want to create indexes which are
partitioned by time (24 hours), like:

index_1422835200_1422921599 - which will contain JSON1, because the value
of "time" = 1422904680 falls in its range.
index_1422921600_1423007999 - which will contain JSON2, because the value
of "time" = 1422991080 falls in its range.

Now I want to create my indexes dynamically. There is also the possibility
of receiving a JSON whose "time" value is before the current date.

To achieve this, I need to compute the index name dynamically, at the
moment the document is written. Programmatically, I want to achieve
something like this:

JavaEsSpark.saveJsonToEs(jrd, "index_{ {time} - {time} % 86400 }_{ {time} +
86400 - {time} % 86400 }/type");

Is it possible to achieve dynamic index name creation as described above?
If not, is there any workaround for my use case?

Thanks
Abhishek


Hi Abhishek,

I'd probably add a preprocessing step that computes the day for each line
(as yyyy-MM-dd) and then just do:
JavaEsSpark.saveJsonToEs(jrd, "index_{date}/type");

If the range is needed, I'd do the same: compute a new field from {time},
for example {time} - {time} % 86400 and {time} + 86400 - {time} % 86400,
and just index with:
{my_new_feature}

It adds a step and a field to the document, but it's simple to do.
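The bucket arithmetic can be checked against the sample timestamps from the question. A minimal sketch, assuming the index-name layout described above (index_<bucketStart>_<bucketEnd> for a 24-hour bucket):

```java
// Computes the daily bucket index name for a Unix timestamp:
// index_<bucketStart>_<bucketEnd>, where the bucket spans 86400 seconds.
public class DayBucket {

    static String indexFor(long time) {
        long start = time - time % 86400L; // midnight UTC of that day
        long end = start + 86400L - 1L;    // last second of the same day
        return "index_" + start + "_" + end;
    }

    public static void main(String[] args) {
        System.out.println(indexFor(1422904680L)); // index_1422835200_1422921599
        System.out.println(indexFor(1422991080L)); // index_1422921600_1423007999
    }
}
```

The precomputed string would then be stored as the extra field that the dynamic pattern references.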

Julien


Hi Julien

Yes, that's probably the only possible workaround. What I am planning to do
is compute the index name prior to writing, add a field named "indexname"
to my JSON, and then use:

JavaEsSpark.saveJsonToEs(jrd, "index_{indexname}/type");

Thanks for the reply.

Abhishek
