Scalding & ElasticSearch-Hadoop


(Antonios Chalkiopoulos) #1

I'm using elasticsearch-hadoop 1.3.0.M2 and Scalding 0.9.0rc4.

I'm overriding the job's config object to set es.mapping.id to "number":

import com.twitter.scalding.{Job, Args}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions

class JobBase(args: Args) extends Job(args) {
  // Override the job config to add the elasticsearch-hadoop settings
  override def config: Map[AnyRef, AnyRef] = {
    super.config ++ Map(ConfigurationOptions.ES_MAPPING_ID -> "number") ++
      Map(ConfigurationOptions.ES_WRITE_OPERATION -> "index")
  }
}

My Scalding job looks like this:

class ElasticSearchUpdateIndexes(args: Args) extends JobBase(args) {

  // Some data to push into Elasticsearch
  val someData = List(
    ("1", "product1", "description1"),
    ("2", "product2", "description2"),
    ("3", "product3", "description3"))

  val indexNewDataInElasticSearch =
    IterableSource[(String, String, String)](someData)
      .write(ElasticSearchSource("localhost", 9200, "index_es/type_es"))
}

The ElasticSearchSource wrapper that I'm trying to implement and contribute to
Scalding currently looks like this:

import cascading.tap.Tap
import cascading.tuple.Fields
import com.twitter.scalding._
import org.elasticsearch.hadoop.cascading.EsTap

case class ElasticSearchSource(
    es_host: String = "localhost",
    es_port: Int = 9200,
    es_resource: String = "scalding_index/type",
    es_fields: Fields = Fields.ALL)
  extends Source {

  def createLocalTap: Tap[_, _, _] =
    new EsTap(es_host, es_port, es_resource, "", es_fields)

  override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] = {
    mode match {
      // Only local mode is handled so far
      case Local(_) => createLocalTap
    }
  }
}

My problem is that once I introduce:

++ Map(ConfigurationOptions.ES_MAPPING_ID -> "number")

I get:

cascading.tuple.TupleException: unable to sink into output identifier: 'unknown'

My second concern is the use of ConfigurationOptions.ES_MAPPING_ID as part of
the job configuration. I understand the benefits of that approach for Hive/Pig,
but for Cascading/Scalding I think a single job-wide property is too
restrictive.
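
To make the concern concrete, here is a small plain-Scala sketch (nothing from Scalding or elasticsearch-hadoop is on the classpath). The key string is the es.mapping.id setting; the object name and the idFieldFor helper are made up for illustration. It only shows that a job-wide map can hold one value per key, so every sink ends up seeing the same id field.

```scala
object SingleKeySketch {
  // The elasticsearch-hadoop setting name discussed in this thread.
  val EsMappingId = "es.mapping.id"

  // Job-wide configuration: one map shared by every tap in the job.
  // The second entry replaces the first because the key is the same.
  val jobConfig: Map[String, String] =
    Map(EsMappingId -> "productID") ++ Map(EsMappingId -> "customerID")

  // Hypothetical helper: the id field a given sink would see.
  // The resource is ignored on purpose, since the setting is not per sink.
  def idFieldFor(sinkResource: String): Option[String] =
    jobConfig.get(EsMappingId)
}
```

Both "offers/summer-offer-14" and "customers/got-offer" would be handed the same id field here, which is the limitation I would like to get around.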

What I would ideally like to be able to do in Scalding is the following:

val productSales = data
  .filterProductsBoughtWithOffer("summer-offer-14")
  .project('productID, 'customerID, 'quantity)
  .write(ElasticSearchSource("localhost", 9200, "offers/summer-offer-14"),
    'productID) // productID is the es.mapping.id
  .joinWithSmaller('customerID -> 'customerID, customerData)
  .write(ElasticSearchSource("localhost", 9200, "customers/got-offer"),
    'customerID) // customerID is the es.mapping.id

So I would like to have multiple Elasticsearch sources and sinks within a
single job, each with its own es.mapping.id.
My understanding at the moment is that elasticsearch-hadoop will not allow
me to configure each source individually.

I'm looking for some help in implementing this capability in Scalding.
Any help is appreciated.

Antonios

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/2e78f7d6-7142-4225-8afa-69f29e509048%40googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.


(Costin Leau) #2

Regarding the 'unknown' error: it typically appears when no fields are defined on the sink/source tap. Without any
field definition, we can't extract the field value.
We could use positions (numbers) instead, but since the tuple order can easily change, it is better/safer to use names.
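
To illustrate the point about names versus positions, here is a plain-Scala sketch. It does not use Cascading at all; Row, byName and byPosition are made-up names that only model a tuple with (or without) declared field names.

```scala
object FieldLookupSketch {
  // A tuple modelled as parallel vectors of field names and values.
  final case class Row(names: Vector[String], values: Vector[String]) {
    // Look a value up by its declared field name, if there is one.
    def byName(name: String): Option[String] = {
      val i = names.indexOf(name)
      if (i >= 0) Some(values(i)) else None
    }

    // Look a value up by position, ignoring field names.
    def byPosition(pos: Int): Option[String] = values.lift(pos)
  }

  val original  = Row(Vector("number", "product"), Vector("1", "product1"))
  // Same data, but the field order has changed.
  val reordered = Row(Vector("product", "number"), Vector("product1", "1"))
  // No field names declared: a lookup by name has nothing to match against.
  val unnamed   = Row(Vector.empty, Vector("1", "product1"))
}
```

A lookup by name returns the same value for original and reordered, while position 0 silently points at a different field after the reorder. With unnamed, the lookup by name finds nothing, which is roughly the situation behind the error.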

Regarding configuration clashes, we could introduce a dedicated Properties object per Tap, or potentially a var arg (for
loose declaration). I'm unsure yet how this will translate to the configuration of the running job: as a Tap/Input/Output
format we can't touch the configuration itself, since we're called after the 'job' has already started, so any changes that
we make are local.
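
As a rough sketch of what per-Tap settings could look like, here is a plain-Scala model. This is not the elasticsearch-hadoop API: TapSettings and effective are hypothetical names, and the only assumption is that each tap merges its own overrides over a copy of the job configuration without modifying the job configuration itself.

```scala
object PerTapSettingsSketch {
  val EsMappingId = "es.mapping.id"

  // Hypothetical per-tap settings: a resource plus its own overrides.
  final case class TapSettings(resource: String, overrides: Map[String, String]) {
    // Merge locally; the tap's overrides win over the job-wide values.
    // The job-wide map is immutable, so it is never changed here.
    def effective(jobConfig: Map[String, String]): Map[String, String] =
      jobConfig ++ overrides
  }

  val jobConfig: Map[String, String] = Map("es.write.operation" -> "index")

  val offers    = TapSettings("offers/summer-offer-14", Map(EsMappingId -> "productID"))
  val customers = TapSettings("customers/got-offer", Map(EsMappingId -> "customerID"))
}
```

Each tap then sees its own es.mapping.id while still inheriting es.write.operation from the job, and the job-wide map stays untouched, which matches the "changes are local" constraint.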

Can you please raise an issue so that we keep track of this?

If you can't fix the 'unknown' problem, please try to translate it into plain Cascading and raise an issue for it as
well.

Thanks!

(In reply to Antonios Chalkiopoulos's message of 21/02/2014, 1:46 PM.)

--
Costin


