I am using Logstash 7.4.3 with the logstash-filter-json plugin, configured as follows:
filter {
  json {
    source => "message"
  }
}
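For completeness, here is a minimal sketch of the whole pipeline. The file path, Elasticsearch host, and index date pattern are placeholders (the index prefix is taken from the error further down); only the json filter reflects my exact configuration.

input {
  file {
    # placeholder path; the real input reads the Spark event log
    path => "/path/to/spark-events/*"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  elasticsearch {
    # placeholder host; index pattern inferred from the error below
    hosts => ["http://localhost:9200"]
    index => "spark_json_tier-%{+YYYY-MM-dd}"
  }
}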
The log events being parsed look like this (a single SparkListenerJobStart event from the Spark event log):
{"Event":"SparkListenerJobStart","Job ID":1,"Submission Time":1640751467318,"Stage Infos":[{"Stage ID":1,"Stage Attempt ID":0,"Stage Name":"mapToPair at JavaNetworkWordCount.java:66","Number of Tasks":0,"RDD Info":[{"RDD ID":3,"Name":"MapPartitionsRDD","Scope":"{"id":"3_1640751467000","name":"map @ 09:47:47"}","Callsite":"mapToPair at JavaNetworkWordCount.java:66","Parent IDs":[2],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":0,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":2,"Name":"MapPartitionsRDD","Scope":"{"id":"2_1640751467000","name":"flatMap @ 09:47:47"}","Callsite":"flatMap at JavaNetworkWordCount.java:65","Parent IDs":[1],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":0,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":1,"Name":"BlockRDD","Scope":"{"id":"1_1640751467000","name":"socket text stream [0]\n@ 09:47:47"}","Callsite":"socketTextStream at JavaNetworkWordCount.java:63","Parent IDs":,"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":0,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0}],"Parent IDs":,"Details":"org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.mapToPair(JavaDStreamLike.scala:42)\norg.apache.spark.examples.streaming.JavaNetworkWordCount.main(JavaNetworkWordCount.java:66)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:498)\norg.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)\norg.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)\norg.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)\norg.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)\norg.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)\norg.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)\norg.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)\norg.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)","Accumulables":,"Resource Profile Id":0},{"Stage ID":2,"Stage Attempt ID":0,"Stage Name":"print at JavaNetworkWordCount.java:69","Number of Tasks":1,"RDD Info":[{"RDD ID":4,"Name":"ShuffledRDD","Scope":"{"id":"4_1640751467000","name":"reduceByKey\n@ 09:47:47"}","Callsite":"reduceByKey at JavaNetworkWordCount.java:67","Parent IDs":[3],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":7,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0}],"Parent IDs":[1],"Details":"org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.print(JavaDStreamLike.scala:42)\norg.apache.spark.examples.streaming.JavaNetworkWordCount.main(JavaNetworkWordCount.java:69)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:498)\norg.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)\norg.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)\norg.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)\norg.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)\norg.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)\norg.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)\norg.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)\norg.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)","Accumulables":,"Resource Profile Id":0}],"Stage IDs":[1,2],"Properties":{"spark.checkpoint.checkpointAllMarkedAncestors":"true","spark.job.interruptOnCancel":"false","callSite.long":"org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.print(JavaDStreamLike.scala:42)\norg.apache.spark.examples.streaming.JavaNetworkWordCount.main(JavaNetworkWordCount.java:69)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:498)\norg.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)\norg.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)\norg.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)\norg.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)\norg.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)\norg.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)\norg.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)\norg.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)","spark.rdd.scope":"{"id":"5_1640751467000","name":"print @ 09:47:47"}","callSite.short":"print at JavaNetworkWordCount.java:69","spark.streaming.internal.outputOpId":"0","spark.rdd.scope.noOverride":"true","spark.job.description":"Streaming job from <a href="/streaming/batch/?id=1640751467000">[output operation 0, batch time 09:47:47]","spark.streaming.internal.batchTime":"1640751467000"}}
I am getting the following error in the logstash.log file:
[2023-05-10T17:23:24,179][WARN ][logstash.outputs.elasticsearch ][main] Could not index event to elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"spark_json_tier-2023-05-10", :_type=>"_doc", :routing=>nil}, #<LogStash::Event:0x4938756a>], :response=>{"index"=>{"_index"=>"spark_json_tier-2023-05-10", "_type"=>"_doc", "_id"=>"gM-CBYgBsEObqebTRxWy", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"mapper [Properties.spark.rdd.scope] of different type, current_type [keyword], merged_type [ObjectMapper]"}}}}
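From the error it looks like Properties.spark.rdd.scope was first mapped as a plain string (keyword), and this event carries it as a nested object, so the mapping merge fails. An untested sketch of the workaround I am considering (the ruby filter below is my assumption, not part of my current pipeline) is to serialize that field back into a JSON string before indexing, so it still fits the existing keyword mapping:

filter {
  json {
    source => "message"
  }
  # Untested sketch: if spark.rdd.scope was parsed into an object, turn it
  # back into a JSON string so it matches the existing keyword mapping.
  ruby {
    init => 'require "json"'
    code => '
      scope = event.get("[Properties][spark.rdd.scope]")
      event.set("[Properties][spark.rdd.scope]", scope.to_json) if scope.is_a?(Hash)
    '
  }
}

The alternative would be to change the mapping on the Elasticsearch side, for example with an index template that maps this field as an object (or flattened) type, but I am not sure which approach is correct here.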