How to prevent from exiting MapReduce job when an Exception throwed in Elasitcsearch Hadoop


(Brad Jungsu Heo) #1

I'm stuck with an Exception while running MapReduce.

I'm using Elasticsearch 2.1 and Elasticsearch Hadoop 2.2.0

My Problem

The type of f1 is byte

$ curl -XGET http://hostname:9200/index-name/?pretty
...
"f1": {
    "type": "byte"
}
...

One of documents has value 20 on f1 field.

$ curl -XGET http://hostname:9200/index-name/type-name/doc-id?pretty
...
"f1": 20
...

But I made a mistake like this:

$ curl -XPOST http://hostname:9200/index-name/type-name/doc-id/_update -d '
{
  "script": "ctx._source.f1 += \"10\";",
  "upsert": {
      "f1": 20
  }
}'

Now, f1 became 2010 which does not fit in byte

$ curl -XGET http://hostname:9200/index-name/type-name/doc-id?pretty
...
"f1": "2010"
...

Finally, ES Hadoop throws the NumberFormatException

INFO mapreduce.Job: Task Id : attempt_1454640755387_0404_m_000020_2, Status : FAILED
Error: org.elasticsearch.hadoop.rest.EsHadoopParsingException: Cannot parse value [2010] for field [f1]
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:701)
    at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:794)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:692)
    at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:457)
    at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:382)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:277)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:250)
    at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:456)
    at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:86)
    at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.next(EsInputFormat.java:298)
    at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.nextKeyValue(EsInputFormat.java:232)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:553)
    at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.NumberFormatException: Value out of range. Value:"2030" Radix:10
    at java.lang.Byte.parseByte(Byte.java:150)
    at java.lang.Byte.parseByte(Byte.java:174)
    at org.elasticsearch.hadoop.serialization.builder.JdkValueReader.parseByte(JdkValueReader.java:333)
    at org.elasticsearch.hadoop.serialization.builder.JdkValueReader.byteValue(JdkValueReader.java:325)
    at org.elasticsearch.hadoop.serialization.builder.JdkValueReader.readValue(JdkValueReader.java:67)
    at org.elasticsearch.hadoop.serialization.ScrollReader.parseValue(ScrollReader.java:714)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:699)
    ... 21 more

What I want is ...

I want to ignore malformed document which throws NumberFormat Exception and want to continue MapReduce.

What I did is ...

According to SO Answer, I surrounded Mapper.map() method with try-catch block. But it didn't help me.

Thanks.


(Costin Leau) #2

ES-Hadoop is not a mapper - rather in M/R is available as an Input/OutputFormat. The issue is not the mapper but rather the data that is sent to ES.
ES-Hadoop currently has no option to ignore errors as it is fail-fast - if something goes wrong, it bails out right away.
You can however filter the incorrect data before it reaches ES.


(system) #3