Getting error "Could not index event to Elasticsearch" in Logstash?

Using Logstash version 7.4.3 with the logstash-filter-json plugin:

filter {
  json {
    source => "message"
  }
}

The log events are:

{"Event":"SparkListenerJobStart","Job ID":1,"Submission Time":1640751467318,"Stage Infos":[{"Stage ID":1,"Stage Attempt ID":0,"Stage Name":"mapToPair at JavaNetworkWordCount.java:66","Number of Tasks":0,"RDD Info":[{"RDD ID":3,"Name":"MapPartitionsRDD","Scope":"{"id":"3_1640751467000","name":"map @ 09:47:47"}","Callsite":"mapToPair at JavaNetworkWordCount.java:66","Parent IDs":[2],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":0,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":2,"Name":"MapPartitionsRDD","Scope":"{"id":"2_1640751467000","name":"flatMap @ 09:47:47"}","Callsite":"flatMap at JavaNetworkWordCount.java:65","Parent IDs":[1],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":0,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":1,"Name":"BlockRDD","Scope":"{"id":"1_1640751467000","name":"socket text stream [0]\n@ 09:47:47"}","Callsite":"socketTextStream at JavaNetworkWordCount.java:63","Parent IDs":,"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":0,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0}],"Parent IDs":,"Details":"org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapToPair(JavaDStreamLike.scala:42)\norg.apache.spark.examples.streaming.JavaNetworkWordCount.main(JavaNetworkWordCount.java:66)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:498)\norg.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)\norg.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)\norg.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)\norg.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)\norg.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)\norg.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)\norg.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)\norg.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)","Accumulables":,"Resource Profile Id":0},{"Stage ID":2,"Stage Attempt ID":0,"Stage Name":"print at JavaNetworkWordCount.java:69","Number of Tasks":1,"RDD Info":[{"RDD ID":4,"Name":"ShuffledRDD","Scope":"{"id":"4_1640751467000","name":"reduceByKey\n@ 09:47:47"}","Callsite":"reduceByKey at JavaNetworkWordCount.java:67","Parent IDs":[3],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":7,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0}],"Parent IDs":[1],"Details":"org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.print(JavaDStreamLike.scala:42)\norg.apache.spark.examples.streaming.JavaNetworkWordCount.main(JavaNetworkWordCount.java:69)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:498)\norg.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)\norg.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)\norg.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)\norg.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)\norg.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)\norg.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)\norg.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)\norg.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)","Accumulables":,"Resource Profile Id":0}],"Stage IDs":[1,2],"Properties":{"spark.checkpoint.checkpointAllMarkedAncestors":"true","spark.job.interruptOnCancel":"false","callSite.long":"org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.print(JavaDStreamLike.scala:42)\norg.apache.spark.examples.streaming.JavaNetworkWordCount.main(JavaNetworkWordCount.java:69)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:498)\norg.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)\norg.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)\norg.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)\norg.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)\norg.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)\norg.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)\norg.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)\norg.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)","spark.rdd.scope":"{"id":"5_1640751467000","name":"print @ 09:47:47"}","callSite.short":"print at JavaNetworkWordCount.java:69","spark.streaming.internal.outputOpId":"0","spark.rdd.scope.noOverride":"true","spark.job.description":"Streaming job from <a href="/streaming/batch/?id=1640751467000">[output operation 0, batch time 09:47:47]","spark.streaming.internal.batchTime":"1640751467000"}}

Getting this error in the logstash.log file:

[2023-05-10T17:23:24,179][WARN ][logstash.outputs.elasticsearch ][main] Could not index event to elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"spark_json_tier-2023-05-10", :_type=>"_doc", :routing=>nil}, #<LogStash::Event:0x4938756a>], :response=>{"index"=>{"_index"=>"spark_json_tier-2023-05-10", "_type"=>"_doc", "_id"=>"gM-CBYgBsEObqebTRxWy", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"mapper [Properties.spark.rdd.scope] of different type, current_type [keyword], merged_type [ObjectMapper]"}}}}

Hi @talbehat,

Can you check the mapping of your index, particularly for the Properties.spark.rdd.scope field named in the error? It looks like the event contains a value that doesn't match the field type in the mapping.
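For example, you can pull the mapping for just that field with the field mapping API (the index name below is taken from your error message; adjust the host if Elasticsearch isn't on localhost:9200):

curl -X GET "http://localhost:9200/spark_json_tier-2023-05-10/_mapping/field/Properties.spark.rdd.scope?pretty"

or fetch the whole index mapping from Kibana Dev Tools:

GET spark_json_tier-2023-05-10/_mapping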

From the error and the log event you have shared, the mapping expects a string (the field's current_type is keyword), but the document being indexed carries Properties.spark.rdd.scope as an object. Elasticsearch cannot merge the two types in one field, so it rejects the event with a 400.
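If you want to keep the existing keyword mapping, one workaround is to serialize the object back into a JSON string before the event reaches Elasticsearch. A minimal sketch using a ruby filter, assuming the value ends up under [Properties][spark.rdd.scope] in your parsed event (adjust the field path to match what you actually see in your events):

filter {
  json {
    source => "message"
  }
  ruby {
    init => 'require "json"'
    # If the field was parsed into a hash, turn it back into a JSON string
    # so it matches the existing keyword mapping.
    code => '
      scope = event.get("[Properties][spark.rdd.scope]")
      event.set("[Properties][spark.rdd.scope]", scope.to_json) if scope.is_a?(Hash)
    '
  }
}

The other direction is to fix the mapping instead: reindex into an index whose template maps that field as an object (or as the flattened type), since a single field cannot hold keyword values in some documents and objects in others.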
