Text processing with Spark and ES

Hi,

I need some advice about the best approach for my problem.
With ManifoldCF I am indexing documents from the filesystem into ES, gathering the metadata and the content of each document. So far this works great, but for my further analysis I need to run a tokenizer and stemmer on the content field. As far as I know, I cannot retrieve tokens from ES easily. On Stack Overflow the Term Vectors API has been mentioned, but I guess it is not appropriate for larger data sets: http://stackoverflow.com/questions/13404722/retrieve-analyzed-tokens-from-elasticsearch-documents

A document stored in ES looks like the following (one hit from GET elastic/_search):

{
  "_index": "elastic",
  "_type": "documents",
  "_id": "file:/path/to/file/test.pdf",
  "_source": {
    "created": "Mon Jan 20 11:50:35 CET 2014",
    "uri": "/path/to/file/test.pdf",
    "modified": "2015-10-23T10:46:40.000+0000",
    "indexed": "2016-03-25T17:05:37.241+0000",
    "mimetype": "application/pdf",
    "content": "Extracted text from the document"
  }
}

My idea was to use Apache Spark: retrieve the ES index, do the text processing, and write the updates back to ES. For my use case the tokens could override the "content" field or be stored in a new "tokens" field.
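
If I understand the ES-Spark settings correctly, the write-back step on its own would look roughly like the untested sketch below: documents carrying only the id and the new field are upserted, so the existing fields should stay untouched. The index name "elastic/documents" and the field values are just example names I picked for illustration.

// Untested sketch of the write-back step in isolation
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("UpsertSketch")
  .set("es.nodes", "hadoop-m:9200")
  .set("es.write.operation", "upsert") // update existing documents instead of replacing them
  .set("es.mapping.id", "id")          // use the "id" field of each map as the document _id
val sc = new SparkContext(conf)

// One partial update: only "tokens" is added/overwritten on the target document
val update = Map(
  "id"     -> "file:/path/to/file/test.pdf",
  "tokens" -> Seq("extract", "text", "document"))
sc.makeRDD(Seq(update)).saveToEs("elastic/documents")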

I found a similar request on this discussion board (Updating an existing index using spark) and also implemented StanfordCoreNLP in Spark following http://stackoverflow.com/questions/30222559/simplest-method-for-text-lemmatization-in-scala-and-spark.

Below is my code, but I don't know how to make the writeTokensToES function work.
I thought I could iterate through the RDD, call writeTokensToES for each row, and write the tokens to ES.

Anyway, I am not experienced in Spark/Scala, so I am reaching my limits here. Is there an easier way to extract the tokens, or should I try to make this approach work somehow?

import org.apache.spark._
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd._
import edu.stanford.nlp.pipeline._
import edu.stanford.nlp.ling.CoreAnnotations._
import scala.collection.JavaConversions._
import java.util.Properties

object Stemming {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SparkLemma")
      .set("es.nodes", "hadoop-m:9200")
      .set("es.write.operation", "upsert")
      .set("es.mapping.id", "id")
    val esIndex = "index/type"
    val sc = new SparkContext(conf)

    // Read the index from ES as an RDD of (id, field map) pairs
    val esRDD = sc.esRDD(esIndex)

    // Broadcast the stop word list to the executors
    val stopWords = sc.broadcast(scala.io.Source.fromFile("stopwords.txt").getLines().toSet)

    def createNLPPipeline(): StanfordCoreNLP = {
      val props = new Properties()
      props.put("annotators", "tokenize, ssplit, pos, lemma")
      new StanfordCoreNLP(props)
    }

    def plainTextToLemmas(text: String, stopWords: Set[String], pipeline: StanfordCoreNLP): Seq[String] = {
      val doc = new Annotation(text)
      pipeline.annotate(doc)
      val lemmas = new ArrayBuffer[String]()
      val sentences = doc.get(classOf[SentencesAnnotation])
      for (sentence <- sentences;
           token <- sentence.get(classOf[TokensAnnotation])) {
        val lemma = token.get(classOf[LemmaAnnotation])
        if (lemma.length > 3 && !stopWords.contains(lemma)) {
          lemmas += lemma.toLowerCase
        }
      }
      lemmas
    }

    // Pull the "content" field out of each document, keeping the id
    val plainText = esRDD.map { case (id, doc) => (id, doc.getOrElse("content", "").toString) }

    // Lemmatize the content, creating one CoreNLP pipeline per partition
    val lemmatized: RDD[Seq[String]] = plainText.mapPartitions { strings =>
      val pipeline = createNLPPipeline()
      strings.map { case (id, content) => plainTextToLemmas(content, stopWords.value, pipeline) }
    }

    def writeTokensToES(row: (String, scala.collection.Map[String, AnyRef]), tokens: Seq[String]): Map[String, Any] = {
      Map("id" -> row._1, "tokens" -> tokens)
    }

    // This is the part that does not work: lemmatized is a separate RDD,
    // so I cannot simply pass it into a transformation over esRDD like this.
    val testRDD = esRDD.map(row => writeTokensToES(row, lemmatized))
    EsSpark.saveToEs(testRDD, esIndex)

    sc.stop()
  }
}
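
For clarity, this is roughly the shape I imagine the last block inside main should have: an untested sketch that reuses createNLPPipeline, plainTextToLemmas and the upsert settings from the code above, keeps the document id next to its lemmas, and stores them in a new "tokens" field (the field name is just my choice).

// Untested sketch, replacing the lemmatized/writeTokensToES/testRDD block above:
// keep the id next to the lemmas and build the upsert documents in a single pass.
val tokenDocs = esRDD.mapPartitions { docs =>
  val pipeline = createNLPPipeline()
  docs.map { case (id, doc) =>
    val content = doc.getOrElse("content", "").toString
    // "id" is picked up via es.mapping.id, "tokens" becomes the new field
    Map("id" -> id, "tokens" -> plainTextToLemmas(content, stopWords.value, pipeline))
  }
}
EsSpark.saveToEs(tokenDocs, esIndex)

The mapPartitions is there so that the (fairly expensive) CoreNLP pipeline is only created once per partition.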

There's currently no easy way to do this with the existing APIs. Basically, two things are required:

  1. Open up the ES APIs to give better access to the underlying Lucene/analysis data. This is typically done for reading but can be extended to writing as well; it can be implemented through a plugin.

  2. Extend ES-Spark with said APIs so they are easy to consume from a Spark environment.

@Britta_Weber and I have been working on and off on an integration between ES and MLlib - in fact, I gave a presentation about this at Strata just last week (no idea when the video is going to be online).

Basically, you can find the code that we used (unpolished and not really documented since it's WIP) at:

Britta's vectorize plugin is here: https://github.com/brwe/es-token-plugin while support for it in ES-Spark is available here:
https://github.com/costin/elasticsearch-hadoop/tree/ml_poc

Not sure how much these help out.
Note again that this is just WIP - it's unclear when and if something will come out of it.

Costin,
thanks for clearing this up.