Hi,
I need some advice on the best approach for my problem.
With ManifoldCF I am indexing documents from the filesystem into Elasticsearch, gathering the metadata and the content of each document. So far this works great, but for my further analysis I need to run a tokenizer and stemmer on the content field. As far as I know, I cannot easily retrieve tokens from ES. On Stack Overflow the Term Vector API has been mentioned, but I suspect it is not appropriate for larger data sets: http://stackoverflow.com/questions/13404722/retrieve-analyzed-tokens-from-elasticsearch-documents
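For reference, my understanding is that the Term Vector API works one document at a time, i.e. something along the lines of the request below per document (I have not actually tried this, and the exact endpoint may differ between ES versions), which is why it looks impractical for a whole index:

GET elastic/documents/<document-id>/_termvectors?fields=content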
A document stored in ES looks like the following:
GET elastic/_search

{
  "_index": "elastic",
  "_type": "documents",
  "_id": "file:/path/to/file/test.pdf",
  "_source": {
    "created": "Mon Jan 20 11:50:35 CET 2014",
    "uri": "/path/to/file/test.pdf",
    "modified": "2015-10-23T10:46:40.000+0000",
    "indexed": "2016-03-25T17:05:37.241+0000",
    "mimetype": "application/pdf",
    "content": "Extracted text from the document"
  }
}
My idea was to use Apache Spark to read the ES index, do the text processing, and write the updates back to ES. For my use case the tokens could either overwrite the "content" field or be stored in a new "tokens" field.
I found a similar request on this discussion board ("Updating an existing index using spark") and also set up StanfordCoreNLP in Spark following http://stackoverflow.com/questions/30222559/simplest-method-for-text-lemmatization-in-scala-and-spark.
Below is my code, but I don't know how to make the writeTokensToES function work. My idea was to iterate through the RDD, call writeTokensToES for each row, and write the resulting tokens to ES.
I am not very experienced with Spark/Scala, so I am reaching my limits here. Is there an easier way to extract the tokens, or should I try to make this approach work somehow?
import org.apache.spark._
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer
import org.elasticsearch.spark._
import org.elasticsearch.spark.rdd._
import edu.stanford.nlp.pipeline._
import edu.stanford.nlp.ling.CoreAnnotations._
import scala.collection.JavaConversions._
import java.util.Properties
import java.io.File
object Stemming {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SparkLemma")
      .set("es.nodes", "hadoop-m:9200")
      .set("es.write.operation", "upsert")
      .set("es.mapping.id", "id")
    val esIndex = "index/type"
    val sc = new SparkContext(conf)

    // Read data from ES
    val esRDD = sc.esRDD(esIndex)
    val stopWords = sc.broadcast(scala.io.Source.fromFile("stopwords.txt").getLines().toSet).value

    def createNLPPipeline(): StanfordCoreNLP = {
      val props = new Properties()
      props.put("annotators", "tokenize, ssplit, pos, lemma")
      new StanfordCoreNLP(props)
    }

    def plainTextToLemmas(text: String, stopWords: Set[String], pipeline: StanfordCoreNLP): Seq[String] = {
      val doc = new Annotation(text)
      pipeline.annotate(doc)
      val lemmas = new ArrayBuffer[String]()
      val sentences = doc.get(classOf[SentencesAnnotation])
      for (sentence <- sentences;
           token <- sentence.get(classOf[TokensAnnotation])) {
        val lemma = token.get(classOf[LemmaAnnotation])
        if (lemma.length > 3 && !stopWords.contains(lemma)) {
          lemmas += lemma.toLowerCase
        }
      }
      lemmas
    }
    // (id, content) pairs taken from the rows read from ES
    val plainText: RDD[(String, String)] =
      esRDD.map { case (id, doc) => (id, doc.getOrElse("content", "").toString) }

    // create one NLP pipeline per partition and lemmatize every document
    val lemmatized: RDD[Seq[String]] = plainText.mapPartitions(strings => {
      val pipeline = createNLPPipeline()
      strings.map { case (id, content) => plainTextToLemmas(content, stopWords, pipeline) }
    })
    // This is the part I cannot get to work: lemmatized is an RDD[Seq[String]],
    // not a Seq[String], so passing it to writeTokensToES like this does not compile.
    def writeTokensToES(row: Tuple2[String, scala.collection.Map[String, AnyRef]], tokens: Seq[String]): Map[String, Any] = {
      Map("id" -> row._1, "tokens" -> tokens)
    }
    val testRDD = esRDD.map(row => writeTokensToES(row, lemmatized))
    EsSpark.saveToEs(testRDD, esIndex)

    sc.stop()
  }
}
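In case it helps to see what I was aiming for: my rough, untested idea is to keep the document id together with the lemmas and build the documents to save inside the same mapPartitions call, replacing the lemmatized/writeTokensToES part inside main, so there is only one RDD to save. The "tokens" field name is just my choice, and I am relying on the es.write.operation = "upsert" and es.mapping.id = "id" settings from the conf above; I am not sure whether this is the right way to do it:

    // untested sketch: build one Map per document with the id and the lemmas,
    // then rely on upsert + es.mapping.id to merge "tokens" into the existing docs
    val tokensRDD: RDD[Map[String, Any]] = plainText.mapPartitions { rows =>
      val pipeline = createNLPPipeline()
      rows.map { case (id, content) =>
        Map("id" -> id, "tokens" -> plainTextToLemmas(content, stopWords, pipeline))
      }
    }
    EsSpark.saveToEs(tokensRDD, esIndex)

Is that roughly the right direction, or is there a simpler way?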