Storing into Elasticsearch using Apache Pig

Hello, I have the following JSON input in second_mapping_data.json:

{"address":"1.1.1.1","logName":"-","user":"-","time":"07/Mar/2014:16:05:49 -0800","method":"GET","uri":"/twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables","protocol":"HTTP/1.1","status":"401","bytes":"1000"}

My Pig script:
REGISTER elasticsearch-hadoop-2.0.2.jar
DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage();
logs = load 'second_mapping_data.json' using PigStorage('\n');
STORE logs INTO 'test_index/logsdetails' USING org.elasticsearch.hadoop.pig.EsStorage('es.nodes=127.0.0.1:9200');

but it's not storing the result into Elasticsearch, and it shows an error:

Please find the attachment for the error.

Can anyone tell me what the problem is?

@cloudxy It looks like the client is breaking while discovering the nodes to write to. Could you share which version of Elasticsearch you are using?

2.4.0

I've narrowed down the issue. This is a problem with some old IP parsing code and should be fixed in ES-Hadoop v2.2.0 and above. Since you are using Elasticsearch version 2.4.0, I would advise upgrading to ES-Hadoop v2.4.0 to ensure maximum compatibility between the two. Hope this helps!
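For example, the registration at the top of the script would just point at the newer jar (a minimal sketch, assuming the jar sits in the directory you launch Pig from):

REGISTER elasticsearch-hadoop-2.4.0.jar
DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage();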


OK, thank you. I'll try it and let you know.

It's hitting Elasticsearch, but the data is not being inserted.

Thank you so much for helping; it is going into Elasticsearch now. I only had to load the data using JsonLoader() instead.
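For anyone hitting the same issue: a minimal sketch of what the working local script could look like, assuming the JsonLoader schema simply mirrors the keys of the sample document above (all typed as chararray for simplicity):

REGISTER elasticsearch-hadoop-2.4.0.jar
-- JsonLoader parses each line into a tuple with named, typed fields,
-- so EsStorage can serialize proper JSON documents instead of raw lines
logs = LOAD 'second_mapping_data.json' USING JsonLoader('address:chararray, logName:chararray, user:chararray, time:chararray, method:chararray, uri:chararray, protocol:chararray, status:chararray, bytes:chararray');
STORE logs INTO 'test_index/logsdetails' USING org.elasticsearch.hadoop.pig.EsStorage('es.nodes=127.0.0.1:9200');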

But when I try to do the same thing using the AWS Elasticsearch Service, the following exception appears:

[...] aliases being processed per job phase (AliasName[line,offset]): M: logs[1,7] C: R:
2016-09-22 00:41:14,066 [LocalJobRunner Map Task Executor #0] INFO org.elasticsearch.hadoop.util.Version - Elasticsearch Hadoop v2.4.0 [4ab7a3ea4c]
2016-09-22 00:41:14,275 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_local611031861_0002
2016-09-22 00:41:14,275 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Processing aliases logs
2016-09-22 00:41:14,275 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - detailed locations: M: logs[1,7] C: R:
2016-09-22 00:41:14,277 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
2016-09-22 00:41:19,986 [communication thread] INFO org.apache.hadoop.mapred.LocalJobRunner - map > map
2016-09-22 00:42:17,458 [LocalJobRunner Map Task Executor #0] INFO org.apache.commons.httpclient.HttpMethodDirector - I/O exception (java.net.ConnectException) caught when processing request: Connection timed out
2016-09-22 00:42:17,458 [LocalJobRunner Map Task Executor #0] INFO org.apache.commons.httpclient.HttpMethodDirector - Retrying request

But on my local server it runs fine. Can you please tell me what the problem is?

@cloudxy As detailed here, when running against a cloud environment, you may need to set the es.nodes.wan.only option. This is because ES Hadoop asks one server in the cluster for the addresses of all of the nodes it is aware of. These addresses may not be accessible if Elasticsearch is running in a cloud environment, as they may be behind a firewall or part of a private network. The es.nodes.wan.only option tells the connector to not do any node discovery, and to only make connections to the Elasticsearch cluster through the nodes declared in the es.nodes setting.

What do I have to write?

store logs INTO 'xx/yy' USING org.elasticsearch.hadoop.pig.EsStorage('es.nodes=the_endpoint_of_the_node','es.nodes.wan.only=true');

The above one?

That should do it, yes.

But it's still giving a problem.

I'm trying to push the data from my local machine to AWS Elasticsearch. After the last command, the job starts but never ends.
Error:

2016-09-23 03:08:10,606 [Thread-11] INFO org.apache.hadoop.conf.Configuration.deprecation - mapred.job.reduce.markreset.buffer.percent is deprecated. Instead, use mapreduce.reduce.markreset.buffer.percent
2016-09-23 03:08:10,606 [Thread-11] INFO org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
2016-09-23 03:08:10,606 [Thread-11] INFO org.apache.hadoop.conf.Configuration.deprecation - io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
2016-09-23 03:08:10,606 [Thread-11] INFO org.apache.hadoop.mapred.LocalJobRunner - OutputCommitter is org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter
2016-09-23 03:08:10,609 [Thread-11] INFO org.apache.hadoop.mapred.LocalJobRunner - Waiting for

2016-09-23 03:08:10,852 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Processing aliases logs
2016-09-23 03:08:10,852 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - detailed locations: M: logs[2,7] C: R:
2016-09-23 03:08:10,854 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
java.lang.Exception: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
    at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:196)
    at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:379)
    at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.init(EsOutputFormat.java:173)
    at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.write(EsOutputFormat.java:149)
    at org.elasticsearch.hadoop.pig.EsStorage.putNext(EsStorage.java:192)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat$PigRecordWriter.write(PigOutputFormat.java:139)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat$PigRecordWriter.write(PigOutputFormat.java:98)
Caused by: org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[search-beyblade-nsmryapnku2giqicww67slo3zi.us-east-1.es.amazonaws.com:9200]]

HadoopVersion PigVersion UserId StartedAt FinishedAt Features
2.6.0-cdh5.5.0 0.12.0-cdh5.5.0 root 2016-09-23 03:08:10 2016-09-23 03:12:23 UNKNOWN

Failed!

Failed Jobs:
JobId Alias Feature Message Outputs
job_local685037827_0002 logs MAP_ONLY Message: Job failed! test_index/logsdtls,

Input(s):
Failed to read data from "file:///home/cloudera/Desktop/Satish/json/second_mapping_data.json"

Output(s):
Failed to produce result in "test_index/logsdtls"

Job DAG:
job_local685037827_0002

2016-09-23 03:12:23,607 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Failed!
grunt> 2016-09-23 03:12:28,648 [communication thread] INFO org.apache.hadoop.mapred.LocalJobRunner - map > map

The same error as before is coming up.

I think it is unable to connect to my Elasticsearch cluster. Is it the fault of the jar, or do I have to make changes in my code?

Normally this occurs when the URL provided is not a valid URL to be used from outside AWS. Generally, if you can query the URL using curl locally, it should work correctly with ES-Hadoop if you're running Hadoop/Pig locally. If the URL does not respond, then you will need to find some other way of connecting to the remote service. This is usually different for every cloud provider.
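As a quick sanity check (a sketch using the endpoint from the stack trace above; a GET on the root path should return the cluster name and version if the endpoint is reachable):

curl 'http://search-beyblade-nsmryapnku2giqicww67slo3zi.us-east-1.es.amazonaws.com/'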

I tried creating an index from my local machine using a curl command, and it worked.

I'm sorry, but I'm afraid I don't have an answer for the library not connecting in this instance. As I said before, different cloud providers have different ways of interacting with Elasticsearch, and everyone's networking situation is diverse. I must also disclose that Elastic does not support the AWS Elasticsearch Service. My last bit of advice would be to increase the logging levels to TRACE and check the output for network related exceptions that may be getting swallowed and logged. Hopefully this will shed some light on the connectivity problems.
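For instance, a sketch of the relevant log4j.properties entry on the client side (org.elasticsearch.hadoop.rest is the connector package that logs the HTTP traffic):

# surface network errors and raw HTTP requests/responses from the connector
log4j.logger.org.elasticsearch.hadoop.rest=TRACE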

Data is going to the Elasticsearch service from Cloudera now; I just had to set es.port=80. :slight_smile:
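For future readers, the complete working STORE against the AWS endpoint would look something like this sketch (the same endpoint placeholder as earlier in the thread):

-- es.nodes.wan.only=true disables node discovery behind the AWS load balancer,
-- and es.port=80 matches the port the AWS endpoint actually listens on
STORE logs INTO 'test_index/logsdtls' USING org.elasticsearch.hadoop.pig.EsStorage('es.nodes=the_endpoint_of_the_node', 'es.port=80', 'es.nodes.wan.only=true');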