I wrote this simple connection class in Python:
class ElasticS(object):
    """Small wrapper that creates an Elasticsearch client for one host.

    The constructor never raises: any exception is printed instead.
    Callers must therefore check ``get_elastic()`` for ``None`` before
    using the client.
    """

    def __init__(self, portI, ip):
        """Create the client and print the result of its ``info()`` call.

        Args:
            portI: Port passed to ``Elasticsearch`` as ``port``.
            ip: Host passed to ``Elasticsearch`` as a one-element list.
        """
        # Bind the attribute before anything can fail, so get_elastic()
        # returns None instead of raising AttributeError when the client
        # could not be created.
        self.es = None
        try:
            self.es = Elasticsearch([ip], port=portI)
            print("Connected: ", self.es.info())
        except Exception as ex:
            # Deliberately best-effort: report the problem and carry on.
            # If only info() failed, self.es still holds the client object.
            print("Error: ", ex)

    def get_elastic(self):
        """Return the client, or ``None`` if it could not be created."""
        return self.es


if __name__ == '__main__':
    ElasticS(portI=9200, ip='localhost')
Pipeline:
from elasticsearch import Elasticsearch
class Pipeline(object):
    """Register the ``attachment`` ingest pipeline when instantiated."""

    def __init__(self):
        """Connect with default client settings and store the pipeline.

        The pipeline runs the ``attachment`` processor on the ``pdf``
        field and then removes that field from the document.
        """
        es = Elasticsearch()
        # Result is discarded; kept from the original code, presumably as
        # a connectivity check -- confirm whether it is still wanted.
        es.cat.health()
        body = {
            "description": "parse arxiv pdfs and index into ES",
            "processors": [
                {"attachment": {"field": "pdf"}},
                {"remove": {"field": "pdf"}},
            ],
        }
        # Use the dedicated ingest API. The previous
        # es.index(index='_ingest', doc_type='pipeline', ...) call only
        # worked because its URL happened to match the pipeline endpoint.
        es.ingest.put_pipeline(id='attachment', body=body)


if __name__ == '__main__':
    Pipeline()
Insert with bulk streaming:
from elasticsearch.helpers import streaming_bulk
from Config.Connection import ElasticS
from Config.Parse import Parse
# Open the connection wrapper and the PDF parser used as the action source.
es = ElasticS(ip='localhost', portI=9200)
p = Parse()

# Build the lazy bulk generator first; nothing is sent until it is iterated.
bulk_results = streaming_bulk(
    client=es.get_elastic(),
    actions=p.parse_pdf(filename="QzpccHJvdmFcMTcwMS4wMDg5MC5wZGY="),
    index="arxiv",
    doc_type="arxiv",
    chunk_size=4,
    # Route every document through the "attachment" ingest pipeline.
    params={"pipeline": "attachment"},
)

for ok, result in bulk_results:
    # popitem() takes one (key, value) pair out of the result dict --
    # presumably the operation name and its response; verify against the
    # streaming_bulk documentation.
    action, result = result.popitem()
    if ok:
        print("Success!")
    else:
        print("failed to index document")
I got this error:
[2017-01-19T17:53:14,342][DEBUG][o.e.a.i.IngestActionFilter] [node1] failed to execute pipeline [attachment] for document [arxiv/arxiv/null]
org.elasticsearch.common.compress.NotXContentException: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes
at org.elasticsearch.common.compress.CompressorFactory.compressor(CompressorFactory.java:57) ~[elasticsearch-5.1.1.jar:5.1.1]
at org.elasticsearch.common.xcontent.XContentHelper.convertToMap(XContentHelper.java:65) ~[elasticsearch-5.1.1.jar:5.1.1]
at org.elasticsearch.action.index.IndexRequest.sourceAsMap(IndexRequest.java:403) ~[elasticsearch-5.1.1.jar:5.1.1]
at org.elasticsearch.ingest.PipelineExecutionService.innerExecute(PipelineExecutionService.java:164) ~[elasticsearch-5.1.1.jar:5.1.1]
at org.elasticsearch.ingest.PipelineExecutionService.access$000(PipelineExecutionService.java:41) ~[elasticsearch-5.1.1.jar:5.1.1]
at org.elasticsearch.ingest.PipelineExecutionService$2.doRun(PipelineExecutionService.java:88) [elasticsearch-5.1.1.jar:5.1.1]
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:527) [elasticsearch-5.1.1.jar:5.1.1]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-5.1.1.jar:5.1.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_101]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
I'm following this guide, by the way: GUIDE
My main goal is to insert documents into Elasticsearch and work with them...
I'm pretty sure I'm doing something wrong. Thank you.