How to Parse JSON Input using Filter Plugins?

Team,

I am new to the Elastic Stack and need your help setting it up in my environment.

Setup: We are planning to disable Stackdriver logging in our Google Cloud Platform project and instead use ELK to display these logs for us.

Input: Google Pub/Sub --> We have created sinks/exports through which we send the logs to Logstash.
I have used the Google Pub/Sub input plugin for Logstash, as shown below:

input {
    google_pubsub {
        project_id => "mobile"
        topic => "elk"
        subscription => "elk"
    }
}

Stackdriver sends input to Logstash in JSON format, and I can see the message field as below:

LOG TYPE 1:

{"insertId":"10d40fnf47vfwr","labels":{"compute.googleapis.com/resource_name":"fluentd-gcp-v3.1.0-6qrhl","container.googleapis.com/namespace_name":"a-dev","container.googleapis.com/pod_name":"a-28-59d8c67bf9-6mxv9","container.googleapis.com/stream":"stdout"},"logName":"projects/mobile/logs/a-springboot","receiveTimestamp":"2019-02-22T18:24:20.827627762Z","resource":{"labels":{"cluster_name":"mobile","container_name":"a-springboot","instance_id":"94838654749378","namespace_id":"a-dev","pod_id":"a-28-59d8c67bf9-6mxv9","project_id":"mobile","zone":"us-central1-b"},"type":"container"},"severity":"INFO","textPayload":%IP - - [25/Feb/2019:12:48:14 +0000] "GET /health HTTP/1.1" 200 - 7 "http://101.34.0.12:80/health" "kube-probe/1.11+" "-" Correlation-ID="-" x-channel="-"}

LOG TYPE 2:

{"insertId":"10d40fnf47vfwr","labels":{"compute.googleapis.com/resource_name":"fluentd-gcp-v3.1.0-6qrhl","container.googleapis.com/namespace_name":"a-dev","container.googleapis.com/pod_name":"a-28-59d8c67bf9-6mxv9","container.googleapis.com/stream":"stdout"},"logName":"projects/mobile/logs/a-springboot","receiveTimestamp":"2019-02-22T18:24:20.827627762Z","resource":{"labels":{"cluster_name":"mobile","container_name":"a-springboot","instance_id":"94838654749378","namespace_id":"a-dev","pod_id":"a-28-59d8c67bf9-6mxv9","project_id":"mobile","zone":"us-central1-b"},"type":"container"},"severity":"INFO","textPayload":"51038.335: [CMS-concurrent-abortable-preclean: 0.000/0.000 secs] [Times: user=0.00 sys=0.00, real=0.00 secs] \n","timestamp":"2019-02-23T05:59:14Z"}

How can I break this down so that I get all the fields? For example:
"insertId":"10d40fnf47vfwr"
"receiveTimestamp":"2019-02-22T18:24:20.827627762Z"

If not, how can I parse out just the textPayload field across multiple log formats? As you can see above, I have two payloads in two different formats, and there are more payload formats besides these two.

Also, since the textPayload comes in several different log formats, is it possible to match all of them?

My output on the terminal looks like this:

{
        "output" => {
                 "logName" => "projects/mobile/logs/p-springboot",
        "receiveTimestamp" => "2019-02-23T00:15:04.209195344Z",
                "severity" => "ERROR",
                "resource" => {
            "labels" => {
                    "project_id" => "mobile",
                          "zone" => "us-central1-a",
                   "instance_id" => "1695785233915576357",
                "container_name" => "p-springboot",
                  "cluster_name" => "mobile",
                  "namespace_id" => "p-wcs-dev",
                        "pod_id" => "p-2-6458688d74-8hb4b"
            },
              "type" => "container"
        },
             "textPayload" => "    for the 1st parameter of com.kohls.cbp.routes.WebsocketRouter.<init>(WebsocketRouter.scala:11)\n",
               "timestamp" => "2019-02-23T00:14:59Z",
                  "labels" => {
            "container.googleapis.com/namespace_name" => "p-wcs-dev",
                    "container.googleapis.com/stream" => "stderr",
               "compute.googleapis.com/resource_name" => "fluentd-gcp-v3.1.0-dvvsb",
                  "container.googleapis.com/pod_name" => "p-2-6458688d74-8hb4b"
        },
                "insertId" => "148zvqf5j9tjh"
    },
    "@timestamp" => 2019-02-25T11:57:00.514Z,
       "message" => "{"insertId":"10d40fnf47vfwr","labels":{"compute.googleapis.com/resource_name":"fluentd-gcp-v3.1.0-6qrhl","container.googleapis.com/namespace_name":"a-dev","container.googleapis.com/pod_name":"a-28-59d8c67bf9-6mxv9","container.googleapis.com/stream":"stdout"},"logName":"projects/mobile/logs/a-springboot","receiveTimestamp":"2019-02-22T18:24:20.827627762Z","resource":{"labels":{"cluster_name":"mobile","container_name":"a-springboot","instance_id":"94838654749378","namespace_id":"a-dev","pod_id":"a-28-59d8c67bf9-6mxv9","project_id":"mobile","zone":"us-central1-b"},"type":"container"},"severity":"INFO","textPayload":for the 1st parameter of com.kohls.cbp.routes.WebsocketRouter.<init>(WebsocketRouter.scala:11)
}

My Logstash configuration is as follows:

input {
    google_pubsub {
        project_id => "mobile"
        topic => "elk"
        subscription => "elk"
    }
}

filter {
    json {
        source => "message"
        target => "output"
        remove_field => [ "@version", "host" ]
    }
}

output { stdout { codec => rubydebug } }

=========================================================================

I have tried the filters below to parse the JSON input in the message field, but no luck. Need help.

filter {

    grok {
        match => { "message" => "%{GREEDYDATA:request}" }
    }

    json {
        source => "request"
        target => "parsedJson"
    }

    mutate {
        add_field => {
            "insertId" => "%{[parsedJson][insertId]}"
        }
    }
}

If you want to copy one field to another then I recommend using a mutate+copy filter.
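For example, a minimal sketch (the target field name request here is just a placeholder):

filter {
    mutate {
        # copy the contents of message into a new field, leaving message intact
        copy => { "message" => "request" }
    }
}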

You have shown that when the message is valid JSON then the json filter is parsing it. I do not understand what issue you are having.

Hello @Badger: Thanks for replying to my query.

I want to know how to apply a filter to the textPayload field, which is an attribute nested inside my output field.

Let's say I get my payload as below. How do I then apply a filter to this specific textPayload so that I get the status code and the other pieces?

"[25/Feb/2019:13:46:47 +0000] 10.255.4.149 TRUE-CLIENT-IP=- CORRELATION-ID=- URL="GET /health HTTP/1.1" 200 200 1 USER-AGENT="kube-probe/1.11+"

If it is always that format then something like

    dissect { mapping => { "message" => '"[%{ts}] %{ip} TRUE-CLIENT-IP=%{tcip} CORRELATION-ID=%{cid} URL="%{method} %{uri} %{version}" %{status1} %{status2} %{somethingelse} USER-AGENT="%{agent}"%{}' } }
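Note that since your json filter parses the event into the output target, the payload would actually be in the [output][textPayload] field rather than in message, so presumably you would point dissect at that field instead. A sketch (with %{rest} standing in for the remainder of the mapping above):

    dissect { mapping => { "[output][textPayload]" => '"[%{ts}] %{ip} %{rest}' } }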

If the number of, or order of, the key=value pairs is variable then it would be more complicated.

If the key=value pairs are variable you could try something like

filter {
    dissect { mapping => { "message" => '[%{ts}] %{ip} %{}"%{}' } }   # pull out the timestamp and ip
    mutate { gsub => [ "message", "^\[[^\]]+\] [^ ]+ ", "" ] }        # then strip them off the message
    kv {}                                                             # parse all the key=value pairs
    mutate { gsub => [ "message", '(^| )[^= ]+="[^"]+"( |$)', " " ] } # remove the key="quoted string" items
    mutate { gsub => [ "message", "(^| )[^= ]+=[^ ]+( |$)", " " ] }   # remove the key=value items
    mutate { gsub => [ "message", "(^| )[^= ]+=[^ ]+( |$)", " " ] }   # Yes, the duplicate is required
}

That dissects out the timestamp and ip, then removes them from the message. Next it parses all the key=value pairs. It then removes the key="quoted string" items, and finally the key=value items. You are left with just " 200 200 1 ", which you could parse using dissect or grok.
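For example, a sketch of that last step (status1, status2, and somethingelse are placeholder names, as above):

    mutate { strip => [ "message" ] }   # trim the surrounding spaces first
    dissect { mapping => { "message" => "%{status1} %{status2} %{somethingelse}" } }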

The fact that the gsub has to be repeated suggests to me that this may be a little fragile.
