I'm using elasticsearch-hadoop 1.3.0.M2 and Scalding 0.9.0rc4.
I'm overriding the config object and setting es.mapping.id to "number":
import com.twitter.scalding.{Job, Args}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions
class JobBase(args: Args) extends Job(args) {
  // Override the job configuration to add the elasticsearch-hadoop settings
  override def config: Map[AnyRef, AnyRef] = {
    super.config ++
      Map(ConfigurationOptions.ES_MAPPING_ID -> "number") ++
      Map(ConfigurationOptions.ES_WRITE_OPERATION -> "index")
  }
}
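For reference, `++` on an immutable Map gives the right-hand entries precedence, and the result is one flat, job-wide configuration, which is why this setting ends up applying to every source and sink in the job. A minimal sketch of that merging, with the `ConfigurationOptions` constants written out as the plain strings they resolve to and a hypothetical base entry standing in for `super.config`:

```scala
object ConfigMergeSketch extends App {
  // Hypothetical stand-in for what super.config might already contain
  val baseConfig: Map[AnyRef, AnyRef] = Map("mapred.job.name" -> "es-job")

  // The elasticsearch-hadoop keys, written as the strings behind
  // ES_MAPPING_ID and ES_WRITE_OPERATION
  val esConfig: Map[AnyRef, AnyRef] = Map(
    "es.mapping.id"      -> "number",
    "es.write.operation" -> "index")

  // ++ merges right over left: a later value for the same key wins,
  // and there is only one "es.mapping.id" slot for the whole job
  val merged = baseConfig ++ esConfig

  println(merged("es.mapping.id"))   // number
  println(merged.size)               // 3
}
```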
My Scalding job looks like this:
class ElasticSearchUpdateIndexes(args: Args) extends JobBase(args) {
  // Some data to push into elasticsearch
  val someData = List(
    ("1", "product1", "description1"),
    ("2", "product2", "description2"),
    ("3", "product3", "description3"))

  // Field names chosen so that 'number matches the es.mapping.id above
  val indexNewDataInElasticSearch =
    IterableSource(someData, ('number, 'name, 'description))
      .read
      .write(ElasticSearchSource("localhost", 9200, "index_es/type_es"))
}
And the wrapper that I'm trying to implement and contribute to Scalding for
the ElasticSearchSource currently looks like this:
case class ElasticSearchSource(
  es_host: String = "localhost",
  es_port: Int = 9200,
  es_resource: String = "scalding_index/type",
  es_fields: Fields = Fields.ALL)
  extends Source {

  def createLocalTap: Tap[_, _, _] =
    new EsTap(es_host, es_port, es_resource, "", es_fields)

  override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] =
    mode match {
      case Local(_) => createLocalTap
      // TODO: handle Hdfs mode as well
    }
}
My problem is that once I introduce:

  ++ Map(ConfigurationOptions.ES_MAPPING_ID -> "number")

I get:

  cascading.tuple.TupleException: unable to sink into output identifier:
  'unknown'
My second concern is about the use of ConfigurationOptions.ES_MAPPING_ID as
part of the job configuration. I understand the benefits of that approach
for Hive/Pig, but for Cascading/Scalding I think a single job-wide property
is limiting.
What I would ideally like to be able to do in Scalding is the following:
val productSales = data
  .filterProductsBoughtWithOffer("summer-offer-14")
  .project('productID, 'customerID, 'quantity)
  .write(ElasticSearchSource("localhost", 9200, "offers/summer-offer-14"),
    'productID) // productID is the es.mapping.id
  .joinWithSmaller('customerID -> 'customerID, customerData)
  .write(ElasticSearchSource("localhost", 9200, "customers/got-offer"),
    'customerID) // customerID is the es.mapping.id
So within a single job I would like to have multiple elasticsearch sources
and sinks. My understanding at the moment is that elasticsearch-hadoop will
not let me configure each source and sink independently.
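To make the idea concrete, here is a purely hypothetical sketch (none of these names exist in Scalding or elasticsearch-hadoop today): each source carries its own settings map, which is merged over the job-wide defaults when its tap is created, so every sink can name its own es.mapping.id:

```scala
// Hypothetical per-sink configuration: the settings map on each source
// overrides the job-wide configuration for that source's tap only.
case class EsSourceSketch(
  host: String = "localhost",
  port: Int = 9200,
  resource: String = "scalding_index/type",
  settings: Map[String, String] = Map.empty) {

  // Job-wide defaults first, per-source overrides last (right wins in ++)
  def tapConfig(jobConfig: Map[String, String]): Map[String, String] =
    jobConfig ++ settings
}

object PerSinkDemo extends App {
  val jobWide = Map("es.write.operation" -> "index")

  val offers = EsSourceSketch(resource = "offers/summer-offer-14",
    settings = Map("es.mapping.id" -> "productID"))
  val customers = EsSourceSketch(resource = "customers/got-offer",
    settings = Map("es.mapping.id" -> "customerID"))

  // Each sink ends up with its own mapping id, sharing the other defaults
  println(offers.tapConfig(jobWide)("es.mapping.id"))    // productID
  println(customers.tapConfig(jobWide)("es.mapping.id")) // customerID
}
```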
Anyhow, I'm just looking for some help implementing this capability in
Scalding. Any help is appreciated.
Antonios
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/2e78f7d6-7142-4225-8afa-69f29e509048%40googlegroups.com.