How to parse JSON in syslog_msg into fields?

I have JSON data available as the value of "syslog_msg". How can I parse that data into fields so that we can create dashboards based on them?

Here is a sample of the JSON data:
{"serverName":"0c9663d6-ca18-4ac2-7b5e-1cc9","eventComponent":"/cpa/orders/status/","eventName":"getAllStatus/Request","executionTime":"Thu Nov 01 02:16:23 UTC 2018","executedBy":"Komal","eventId":"2d372dfd-4a06-4189-804e-6c81ca166cc1","eventType":"API/Info","serverIp":"10.10.82.186","eventDetails":"/cpa/orders/status/"}

How are you ingesting this data into Elasticsearch?

We are just draining the PCF console log.

Apologies, I should've been a bit more specific with my question :smile:. Are you using Filebeat or Logstash? Or some other tool to ingest the log lines/events into Elasticsearch? Or have you written some custom code that is reading these log lines, creating Elasticsearch documents from them, and indexing them into Elasticsearch?

We are not using any Logstash.

What we did...

  1. We write to the console log, and we have customized the log format (JSON) based on our needs
  2. In Pivotal CF, we set the Elasticsearch URL as the syslog drain URL

The results we have achieved so far...

  1. All the console logs are shipped to Elasticsearch and we can see the results in Kibana
  2. All the logs (along with our customized JSON log) arrive in the field "syslog_msg"

What we want to achieve...

  1. Each JSON key to be treated as an individual field in Kibana, just like "syslog_msg", "pid", etc.
  2. The ability to build our own dashboards in Kibana based on those fields (the JSON log keys)

The question in our mind...

Do we need any specific configuration to parse the JSON so that each key-value pair in it is treated as an individual field and value?

I don't know enough about Pivotal CF logging, but I think your best bet will be to set up an Elasticsearch ingest pipeline. This allows you to set up a data processing pipeline that runs in Elasticsearch itself and pre-processes JSON documents after they enter the Elasticsearch cluster but right before they are indexed into any Elasticsearch index. You can read more about this feature here: https://www.elastic.co/guide/en/elasticsearch/reference/master/ingest.html.
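
For a rough sense of the shape, a pipeline is created with a single PUT request. The pipeline name and the set processor below are placeholders for illustration only; a complete pipeline for your specific case is worked out later in this thread.

PUT _ingest/pipeline/example_pipeline
{
  "description": "Placeholder pipeline for illustration",
  "processors": [
    {
      "set": {
        "field": "ingested_via",
        "value": "example_pipeline"
      }
    }
  ]
}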

{
"syslog5424_ver": 1,
"pid": "[APP/PROC/WEB/0]",
"@timestamp": "2018-11-01T13:15:55.578Z",
"priority": 14,
"syslog5425_len": 584,
"program": "990c27da-8eed-4872-bad5-0a8feab39a4a",
"host": "69.241.18.11",
"syslog_msg": "2018-11-01 13:15:33.911 INFO 24 --- [nio-8080-exec-6] c.s.a.c.InventoryOrderController : {"serverName":"0c9663d6-ca18-4ac2-7b5e-1cc9","eventComponent":"/cpa/orders/status/pending","eventName":"getOrdersByStatus/Request","executionTime":"Thu Nov 01 13:15:33 UTC 2018","executedBy":"enterprise","eventId":"b60fe79f-5884-4778-bdd5-c20e6a7afacd","eventType":"API/Info","serverIp":"10.10.82.186","eventDetails":"/cpa/orders/status/pending"}",
"logsource": "BSDSIGMADEV.API.InventoryOrderMicroService",
"@version": "1",
"syslog5424_msgid": "-"
}

How can we turn one of the keys in syslog_msg (let's say "eventType") into a field?

@sghosh001c

I would use ingest node because you are sending your data directly to Elasticsearch.

Walkthrough

Here's a working solution that I tested on Elasticsearch 6.3.0.

This solution will work only if you can do three things:

  1. Set node.ingest: true in the elasticsearch.yml files of your nodes (or at least one of them).
  2. Create an ingest pipeline (see Step 1 below).
  3. Reference the ID of your pipeline (see Step 2 below) as your data is sent from PCF to Elasticsearch.

Regarding (3), you said that "In Pivotal CF, we mentioned elasticsearch url as the syslog drain url." I haven't worked with PCF enough to judge if it's possible to reference a pipeline in that URL. If it isn't possible, then I would recommend directing your data from PCF to Logstash instead of to Elasticsearch, and using a Logstash pipeline instead of an ingest pipeline.

Assuming you can do these three things, let's take a look at how that would work.

Step 1: Create the ingest pipeline.

Let's create an ingest pipeline called pcf_pipeline. We'll apply three processors in this pipeline:

  1. We'll use the grok processor to extract the JSON object that's embedded in your syslog_msg string and put it in a new field called syslog_msg_payload.
  2. We'll use the json processor to parse our newly extracted syslog_msg_payload field into a JSON object.
  3. We'll use the remove processor to drop our temporary field syslog_msg_payload.

Query:

PUT _ingest/pipeline/pcf_pipeline
{
  "description": "PCF Pipeline",
  "processors": [
    {
      "grok": {
        "field": "syslog_msg",
        "patterns": [ "%{JSON:syslog_msg_payload}" ],
        "pattern_definitions": {
          "JSON": "{.*$"
        },
        "ignore_missing": true
      }
    },
    {
      "json": {
        "field": "syslog_msg_payload",
        "target_field": "syslog_msg_json"
      }
    },
    {
      "remove": {
        "field": "syslog_msg_payload"
      }
    }
  ]
}
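
Before indexing anything, you can dry-run the pipeline against a test document with the simulate API. The doc below is just a trimmed sample of your syslog_msg:

POST _ingest/pipeline/pcf_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "syslog_msg": "2018-11-01 13:15:33.911 INFO 24 --- [nio-8080-exec-6] c.s.a.c.InventoryOrderController : {\"eventType\":\"API/Info\",\"serverIp\":\"10.10.82.186\"}"
      }
    }
  ]
}

The response shows the transformed _source for each test doc, so you can confirm that syslog_msg_json appears before touching real data.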

Step 2: Index a document.

Let's put in a sample document. You must include the pipeline=pcf_pipeline parameter in your URL. Otherwise your ingest pipeline will not be triggered.

Query:

PUT pcf_index/_doc/1?pipeline=pcf_pipeline
{
  "syslog5424_ver": 1,
  "pid": "[APP/PROC/WEB/0]",
  "@timestamp": "2018-11-01T13:15:55.578Z",
  "priority": 14,
  "syslog5425_len": 584,
  "program": "990c27da-8eed-4872-bad5-0a8feab39a4a",
  "host": "69.241.18.11",
  "syslog_msg": "2018-11-01 13:15:33.911 INFO 24 --- [nio-8080-exec-6] c.s.a.c.InventoryOrderController : {\"serverName\":\"0c9663d6-ca18-4ac2-7b5e-1cc9\",\"eventComponent\":\"/cpa/orders/status/pending\",\"eventName\":\"getOrdersByStatus/Request\",\"executionTime\":\"Thu Nov 01 13:15:33 UTC 2018\",\"executedBy\":\"enterprise\",\"eventId\":\"b60fe79f-5884-4778-bdd5-c20e6a7afacd\",\"eventType\":\"API/Info\",\"serverIp\":\"10.10.82.186\",\"eventDetails\":\"/cpa/orders/status/pending\"}",
  "logsource": "BSDSIGMADEV.API.InventoryOrderMicroService",
  "@version": "1",
  "syslog5424_msgid": "-"
}

Step 3: View the indexed document.

Let's see how our ingest pipeline affected the document. Notice that your embedded JSON string has been parsed into structured fields under syslog_msg_json, which you can now query directly. Success!

Query:

GET pcf_index/_doc/1

Response:

{
  "_index": "pcf_index",
  "_type": "_doc",
  "_id": "1",
  "_version": 4,
  "found": true,
  "_source": {
    "syslog_msg_json": {
      "executionTime": "Thu Nov 01 13:15:33 UTC 2018",
      "executedBy": "enterprise",
      "eventId": "b60fe79f-5884-4778-bdd5-c20e6a7afacd",
      "eventDetails": "/cpa/orders/status/pending",
      "serverName": "0c9663d6-ca18-4ac2-7b5e-1cc9",
      "eventName": "getOrdersByStatus/Request",
      "serverIp": "10.10.82.186",
      "eventComponent": "/cpa/orders/status/pending",
      "eventType": "API/Info"
    },
    "pid": "[APP/PROC/WEB/0]",
    "syslog5425_len": 584,
    "syslog5424_ver": 1,
    "program": "990c27da-8eed-4872-bad5-0a8feab39a4a",
    "priority": 14,
    "logsource": "BSDSIGMADEV.API.InventoryOrderMicroService",
    "syslog5424_msgid": "-",
    "@timestamp": "2018-11-01T13:15:55.578Z",
    "host": "69.241.18.11",
    "@version": "1",
    "syslog_msg": """2018-11-01 13:15:33.911 INFO 24 --- [nio-8080-exec-6] c.s.a.c.InventoryOrderController : {"serverName":"0c9663d6-ca18-4ac2-7b5e-1cc9","eventComponent":"/cpa/orders/status/pending","eventName":"getOrdersByStatus/Request","executionTime":"Thu Nov 01 13:15:33 UTC 2018","executedBy":"enterprise","eventId":"b60fe79f-5884-4778-bdd5-c20e6a7afacd","eventType":"API/Info","serverIp":"10.10.82.186","eventDetails":"/cpa/orders/status/pending"}"""
  }
}
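
With the fields in place you can aggregate on them for your dashboards. For example, a terms aggregation on eventType; this assumes the default dynamic mapping, which indexes strings with a .keyword sub-field:

GET pcf_index/_search
{
  "size": 0,
  "aggs": {
    "events_by_type": {
      "terms": {
        "field": "syslog_msg_json.eventType.keyword"
      }
    }
  }
}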

We are using the configuration below for Logstash.

input {
    tcp {
        port => 24514
    }
    udp {
        port => 24514
    }
}

filter {
    grok {
        match => {
            "message" => [
                "%{NONNEGINT:syslog5425_len:int} <%{NONNEGINT:priority:int}>%{NONNEGINT:syslog5424_ver:int} +(?:%{TIMESTAMP_ISO8601:timestamp}|-) +(?:%{HOSTNAME:logsource}|-) +(?:%{NOTSPACE:program}|-) +(?:%{NOTSPACE:pid}|-) +(?:%{NOTSPACE:syslog5424_msgid}|-) +(?:%{SYSLOG5424SD:syslog5424_sd}|-) +%{GREEDYDATA:syslog_msg}",
                "<%{NONNEGINT:priority:int}>(?:%{SYSLOGTIMESTAMP:timestamp}|%{TIMESTAMP_ISO8601:timestamp}) (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}: %{GREEDYDATA:syslog_msg}"
            ]
        }
        remove_field => ["message", "port", "timestamp"]
    }
}

output {
    elasticsearch {
        hosts => [ "xx.xxx.xx.xx:9200" ]
        index => "syslog24514-%{+YYYY.MM.dd}"
    }
}

Can you please suggest the changes required to achieve what we are expecting?

The following is our syslog...

2018-11-14T12:07:54.446-05:00 [APP/PROC/WEB/0] [OUT] 2018-11-14 17:07:54.444 INFO 25 --- [nio-8080-exec-3] c.s.a.c.InventoryOrderController : {"serverName":"XXXXXXXX","eventComponent":"XXXXXXXX","eventName":"XXXXXXXX","executionTime":"XXXXXXXX","executedBy":"XXXXXXXX","eventId":"XXXXXXXX","eventType":"API/Error","serverIp":"XXXXXXXX","eventDetails":"XXXXXXXX"}

We want to make "eventType", "eventId", "serverName", "eventComponent", "eventName", and "serverIp" fields in Kibana, so that we can do aggregate operations to build the dashboard.

@sghosh001c

Here's a working solution for a Logstash configuration that I tested on Logstash 6.3.0.

The filters in this configuration perform three steps:

  1. The grok filter extracts the JSON string and puts it in a temporary field called payload_raw
  2. The json filter parses the temporary payload_raw field and puts the parsed data in a field called "payload"
  3. The mutate filter removes the temporary payload_raw field (and other fields)

The advantage of this approach is that you don't need to know the structure of the JSON. It will parse everything for you.

Logstash configuration

input {
    tcp {
        port => 24514
    }
    udp {
        port => 24514
    }
}

filter {
    
    # Step 1. Extract the JSON String, put it in a temporary field called "payload_raw"
    # Docs: https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
    grok {
        match => {
            "message" => [ "%{JSON:payload_raw}" ]
        }
        pattern_definitions => {
            "JSON" => "{.*$"
        }
    }
    
    # Step 2. Parse the temporary "payload_raw" field, put the parsed data in a field called "payload"
    # Docs: https://www.elastic.co/guide/en/logstash/current/plugins-filters-json.html
    json {
        source => "payload_raw"
        target => "payload"
    }
    
    # Step 3. Remove the temporary "payload_raw" field (and other fields)
    # Docs: https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html
    mutate {
        remove_field => [ "payload_raw", "message", "port", "timestamp" ]
    }
}

output {
    elasticsearch {
        hosts => [ "xx.xxx.xx.xx:9200" ]
        index => "syslog24514-%{+YYYY.MM.dd}"
    }
}

Example input for payload_raw

2018-11-14T12:07:54.446-05:00 [APP/PROC/WEB/0] [OUT] 2018-11-14 17:07:54.444 INFO 25 --- [nio-8080-exec-3] c.s.a.c.InventoryOrderController : {"serverName":"serverNameValue","eventComponent":"eventComponentValue","eventName":"eventNameValue","executionTime":"executionTimeValue","executedBy":"executedByValue","eventId":"eventIdValue","eventType":"eventTypeValue","serverIp":"serverIpValue","eventDetails":"eventDetailsValue"}

Example output for payload

{
  "payload" => {
    "eventComponent" => "eventComponentValue",
    "eventName" => "eventNameValue",
    "serverName" => "serverNameValue",
    "serverIp" => "serverIpValue",
    "eventId" => "eventIdValue",
    "eventDetails" => "eventDetailsValue",
    "executionTime" => "executionTimeValue",
    "eventType" => "eventTypeValue",
    "executedBy" => "executedByValue"
  }
}
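
If you'd like to verify these filters locally before pointing the syslog drain at Logstash, a quick option is a throwaway config that reads from stdin and prints to stdout (the file name test.conf below is arbitrary); run it with bin/logstash -f test.conf and paste the example input line from above:

input {
    # Paste a sample log line into the terminal to exercise the filters
    stdin { }
}

filter {
    # Same grok + json steps as in the configuration above
    grok {
        match => {
            "message" => [ "%{JSON:payload_raw}" ]
        }
        pattern_definitions => {
            "JSON" => "{.*$"
        }
    }
    json {
        source => "payload_raw"
        target => "payload"
    }
    mutate {
        remove_field => [ "payload_raw", "message" ]
    }
}

output {
    # Print the parsed event so you can inspect the "payload" field
    stdout { codec => rubydebug }
}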
