SYSLOG, convert string message to JSON format

Hello all,
Please allow me to say up front that I am a newbie in Logstash filtering (and in coding in general).
I am a systems/networks engineer trying to learn something new.
That being said, I have set up a 3-node ELK cluster that runs perfectly. I have configured a remote system to send logs to my cluster via syslog, and they are received consistently.
I am now facing the following challenge, which I cannot seem to get a grasp of, so I kindly ask for your help.

First things first:
This is my logstash configuration file:

# located in /conf.d/logstash.conf

#Config Syslog
input {
  tcp {
    port => 5514
    type => syslog
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss.SSS", "MMM dd HH:mm:ss.SSS" ]
      timezone => "Europe/Athens"
    }
  }
}

#Save Output in JSON format
filter {
  json {
    source => "syslog_message"
  }
}

#Save Output to Elastic
output {
  elasticsearch { hosts => ["http://xxxxxx@FQDN1:9200", "http://xxxxxx@FQDN2:9200", "http://xxxxxx@FQDN3:9200"] }
  stdout { codec => rubydebug }
}
# Config file end
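To see conceptually what the grok and json filters do to one raw syslog line, here is a rough Python sketch. The regex below is a simplified stand-in for the grok pattern above, not the real grok library, and the sample values are made up:

```python
import json
import re

# Simplified stand-in for the grok pattern in the config above:
# %{SYSLOGTIMESTAMP} %{SYSLOGHOST} %{DATA}(?:\[%{POSINT}\])?: %{GREEDYDATA}
SYSLOG_RE = re.compile(
    r"(?P<syslog_timestamp>[A-Z][a-z]{2}\s+\d{1,2} \d{2}:\d{2}:\d{2}) "
    r"(?P<syslog_hostname>\S+) "
    r"(?P<syslog_program>.*?)(?:\[(?P<syslog_pid>\d+)\])?: "
    r"(?P<syslog_message>.*)"
)

def parse_event(raw: str) -> dict:
    """Mimic the grok filter, then the json filter, on one raw syslog line."""
    event = {"message": raw}
    match = SYSLOG_RE.search(raw)  # grok: pull out the named captures
    if match:
        event.update({k: v for k, v in match.groupdict().items() if v is not None})
        try:
            # json filter: parse syslog_message and merge its fields into the event
            event.update(json.loads(event["syslog_message"]))
        except ValueError:
            # this is when Logstash would add the _jsonparsefailure tag
            event.setdefault("tags", []).append("_jsonparsefailure")
    return event
```

Note that, just like the real pipeline, the original `message` and `syslog_message` strings stay on the event; the parsed fields are added alongside them, not instead of them.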

The Problem:
As you can see below, the message I am receiving is formatted as a string, not as JSON, which makes it harder for me to query. I would like the message content to be parsed as JSON.

This is how it looks now:

Any help will be appreciated.

Thank you all in advance
Jake

Hi Jake,

thank you for reaching out.

First of all, I'd like to ask you to format your code before posting. Otherwise it will be really difficult for us to read your posts and replicate your case. Write your code (the Logstash .conf file, JSONs, or whatever) in any editor you like (VS Code, Atom, Sublime, ...), format and indent it properly, paste it here (leaving a blank line before and after the code block), highlight the block, and click the Preformatted Text icon.

This is the right way to post code blocks:
[screenshot omitted]

This is the wrong one:
[screenshot omitted]
That being said, I see something strange in your pipeline. Why are there 2 filter sections? You can put your filter plugins one after the other in the same filter { } section.

Anyway, looking at your output, it seems to me that the syslog_message is parsed correctly. In fact, alongside the whole message (stored in the syslog_message field), I can also see some other fields (like bytes_sent, product_title, plan_name, ...), apparently extracted from the syslog_message.

What do you want to achieve exactly? Do you see a _jsonparsefailure value in your tags field?

Also, can you post the standard output of the following two pipelines?

input {
  tcp {
    port => 5514
    type => syslog
  }
}

filter {}

output {
  stdout{}
}

And

input {
  tcp {
    port => 5514
    type => syslog
  }
}

filter {
  json {
    source => "syslog_message"
  }
}

output {
  stdout{}
}
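If you don't have a convenient syslog sender handy for feeding these test pipelines, here is a rough Python sketch of a throwaway test client. It is a sketch under assumptions: port 5514 comes from the config in this thread, while the `<134>` priority, the hostname, and the sample payload are arbitrary:

```python
import socket

def format_syslog(pri: int, timestamp: str, host: str, program: str, payload: str) -> str:
    """Build a classic BSD-style syslog line like the ones in this thread."""
    return f"<{pri}>{timestamp} {host} {program}: {payload}"

def send_line(line: str, host: str = "localhost", port: int = 5514) -> None:
    """Send one newline-terminated line to the Logstash tcp input."""
    with socket.create_connection((host, port)) as sock:
        sock.sendall(line.encode("utf-8") + b"\n")
```

For example, `send_line(format_syslog(134, "Feb 15 22:19:50", "testhost", "apimanagement[-]", '{"bytes_sent": 7}'))` while watching the rubydebug stdout of the pipeline.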

Hello Fabio, thank you for the prompt response.
The only reason there are 2 filter sections is clearly my ignorance; thank you for pointing that out.
What I want to achieve is to get the syslog_message formatted as JSON, meaning I want to extract all the fields (currently packed into one continuous string) into separate fields.

I do not get any _jsonparsefailure tags.

Here is the code:

#Config Syslog
input {
  tcp {
    port => 5514
    type => syslog
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss.SSS", "MMM dd HH:mm:ss.SSS" ]
      timezone => "Europe/Athens"
    }
  }
}

#Save Output in JSON format
filter {

    json {
      source => "syslog_message"
    }
}

#Save Output to Elastic
output {
  elasticsearch { hosts => ["http://xxxxxx@FQDN1:9200", "http://xxxxxx@FQDN2:9200", "http://xxxxxx@FQDN3:9200"] }
  stdout { codec => rubydebug }
}

You did apologize, yet kept two filter sections in your code :sweat_smile:

Anyway, what I wanted to point out is that, looking at the response in your Dev Tools, it does look correctly parsed.

If it wasn't, you would have found a _jsonparsefailure value in your tags field and, more importantly, you wouldn't have had fields like space_name, product_title, plan_name returned, which I do see here:

What does this query return?

GET /logstash/_search
{
  "query": {
    "term": {
      "bytes_sent": {
        "value": 17
      }
    }
  }
}

Also, can you post here (formatted) one whole JSON document (blurring the sensitive data, of course) returned by this query?

GET /logstash/_search
{
  "size": 1
}

Hello again,
I have 'corrected' the twin filter, so please see below both the config file and the output.
As you can see, the 'syslog_message' and 'message' fields are formatted as strings, and I would like to extract their contents into separate fields.

#Config Syslog
input {
  tcp {
    port => 5514
    type => syslog
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss.SSS", "MMM dd HH:mm:ss.SSS" ]
      timezone => "Europe/Athens"
    }
    json {
      source => "syslog_message"
    }
  }
}


#Save Output to Elastic
output {
  elasticsearch { hosts => ["http://xxxxx@FQDN1:9200", "http://xxxxx@FQDN2:9200", "http://xxxxx@FQDN3:9200"] }
  stdout { codec => rubydebug }
}




##Output
{
  "took" : 682,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 451,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "logstash",
        "_type" : "_doc",
        "_id" : "6qHvSnABi0VbevQbv57X",
        "_score" : 1.0,
        "_source" : {
          "received_from" : "_gateway",
          "@version" : "1",
          "api_id" : "5aa0df3ae4b07f0e184f3913",
          "client_id" : "",
          "api_version" : "1.0.0",
          "tags" : [
            "_geoip_lookup_failure"
          ],
          "developer_org_id" : "",
          "org_name" : "somename",
          "gateway_geoip" : { },
          "syslog_message" : """{"api_id":"5aa0df3ae4b07f0e184f3913","app_type":"PRODUCTION","bytes_received":0,"response_body":"","client_id":"","billing":{"amount":0,"provider":"none","currency":"USD","model":"free","trial_period_days":0},"datetime":"2020-02-15T22:19:50.923Z","time_to_serve_request":30,"uri_path":"/somepath","log_policy":"activity","endpoint_url":"N/A","host":"127.0.0.1","client_ip":"x.x.x.x","client_geoip":{},"request_protocol":"https","developer_org_id":"","transaction_id":"715743809","immediate_client_ip":"x.x.x.x","product_name":"N/A","product_title":"","plan_name":"N/A","tags":["_geoip_lookup_failure"],"catalog_id":"5a420f3fe4b033680ff5fb5d","space_name":[""],"api_name":"health-check-api","org_id":"5a3cb922e4b033680ff5fb28","status_code":"200 OK","request_method":"GET","developer_org_name":"","http_user_agent":"","@version":"1","response_http_headers":[],"org_name":"someorgname","latency_info":[{"task":"Start","started":0},{"task":"set-variable","started":22}],"headers":{"http__ws_haprt_wlmversion":"-1","http_via":"1.1 
AQAAAATSnnM-","http_version":"HTTP/1.1","http_connection":"Keep-Alive","request_method":"POST","http_host":"localhost:9700","request_uri":"/_bulk","http_x_forwarded_server":"someremotehost","content_type":"text/plain","http_x_global_transaction_id":"34fbf1d05e486e8624ccf0e5","http_x_forwarded_host":"x.x.x.x:zzzz","http_x_forwarded_for":"x.x.x.x","request_path":"/_bulk","http_organization":"admin","http_x_client_ip":"127.0.0.1","content_length":"1111"},"catalog_name":"uat","debug":[],"rateLimit":{"rate-limit":{"limit":"-1","count":"-1"},"rate-limit-1":{"limit":"-1","count":"-1"},"rate-limit-2":{"limit":"-1","count":"-1"},"per-minute":{"limit":"-1","count":"-1"}},"api_version":"1.0.0","bytes_sent":7,"gateway_geoip":{},"app_name":"N/A","@timestamp":"2020-02-15T22:19:50.981Z","request_body":"","request_http_headers":[],"resource_id":"health-check-api:1.0.0:get:/healthcheck","gateway_ip":"x.x.x.x","space_id":[""],"plan_id":"","developer_org_title":"","query_string":[]}""",
          "request_http_headers" : [ ],
          "billing" : {
            "model" : "free",
            "trial_period_days" : 0,
            "provider" : "none",
            "currency" : "USD",
            "amount" : 0
          },
          "request_method" : "GET",
          "bytes_sent" : 7,
          "syslog_program" : "apimanagement[-]",
          "client_ip" : "x.x.x.x",
          "query_string" : [ ],
          "org_id" : "5a3cb922e4b033680ff5fb28",
          "http_user_agent" : "",
          "port" : 55047,
          "request_body" : "",
          "transaction_id" : "715743809",
          "space_name" : [
            ""
          ],
          "immediate_client_ip" : "x.x.x.x",
          "resource_id" : "health-check-api:1.0.0:get:/healthcheck",
          "time_to_serve_request" : 30,
          "received_at" : "2020-02-15T22:19:51.019Z",
          "debug" : [ ],
          "catalog_id" : "5a420f3fe4b033680ff5fb5d",
          "space_id" : [
            ""
          ],
          "request_protocol" : "https",
          "client_geoip" : { },
          "plan_name" : "N/A",
          "api_name" : "health-check-api",
          "headers" : {
            "http_organization" : "admin",
            "request_uri" : "/_bulk",
            "http__ws_haprt_wlmversion" : "-1",
            "http_x_forwarded_server" : "someremotehost",
            "http_version" : "HTTP/1.1",
            "content_length" : "1111",
            "http_host" : "localhost:9700",
            "request_path" : "/_bulk",
            "http_connection" : "Keep-Alive",
            "http_x_client_ip" : "127.0.0.1",
            "request_method" : "POST",
            "http_via" : "1.1 AQAAAATSnnM-",
            "http_x_global_transaction_id" : "34fbf1d05e486e8624ccf0e5",
            "http_x_forwarded_host" : "x.x.x.x:zzzz",
            "http_x_forwarded_for" : "x.x.x.x",
            "content_type" : "text/plain"
          },
          "host" : "127.0.0.1",
          "message" : """<102>Feb 15 22:19:50 somefqdn apimanagement[-]: {"api_id":"5aa0df3ae4b07f0e184f3913","app_type":"PRODUCTION","bytes_received":0,"response_body":"","client_id":"","somefunction":{"amount":0,"provider":"none","currency":"USD","model":"free","trial_period_days":0},"datetime":"2020-02-15T22:19:50.923Z","time_to_serve_request":30,"uri_path":"/somepath","log_policy":"activity","endpoint_url":"N/A","host":"127.0.0.1","client_ip":"x.x.x.x","client_geoip":{},"request_protocol":"https","developer_org_id":"","transaction_id":"715743809","immediate_client_ip":"x.x.x.x","product_name":"N/A","product_title":"","plan_name":"N/A","tags":["_geoip_lookup_failure"],"catalog_id":"5a420f3fe4b033680ff5fb5d","space_name":[""],"api_name":"health-check-api","org_id":"5a3cb922e4b033680ff5fb28","status_code":"200 OK","request_method":"GET","developer_org_name":"","http_user_agent":"","@version":"1","response_http_headers":[],"org_name":"somename","latency_info":[{"task":"Start","started":0},{"task":"set-variable","started":22}],"headers":{"http__ws_haprt_wlmversion":"-1","http_via":"1.1 
AQAAAATSnnM-","http_version":"HTTP/1.1","http_connection":"Keep-Alive","request_method":"POST","http_host":"localhost:9700","request_uri":"/_bulk","http_x_forwarded_server":"somefqdn","content_type":"text/plain","http_x_global_transaction_id":"34fbf1d05e486e8624ccf0e5","http_x_forwarded_host":"10.110.165.13:9443","http_x_forwarded_for":"10.110.165.10","request_path":"/_bulk","http_organization":"admin","http_x_client_ip":"127.0.0.1","content_length":"1111"},"catalog_name":"uat","debug":[],"rateLimit":{"rate-limit":{"limit":"-1","count":"-1"},"rate-limit-1":{"limit":"-1","count":"-1"},"rate-limit-2":{"limit":"-1","count":"-1"},"per-minute":{"limit":"-1","count":"-1"}},"api_version":"1.0.0","bytes_sent":7,"gateway_geoip":{},"app_name":"N/A","@timestamp":"2020-02-15T22:19:50.981Z","request_body":"","request_http_headers":[],"resource_id":"health-check-api:1.0.0:get:/healthcheck","gateway_ip":"x.x.x.x","space_id":[""],"plan_id":"","developer_org_title":"","query_string":[]}""",
          "gateway_ip" : "x.x.x.x",
          "product_title" : "",
          "catalog_name" : "uat",
          "developer_org_name" : "",
          "app_type" : "PRODUCTION",
          "datetime" : "2020-02-15T22:19:50.923Z",
          "syslog_hostname" : "somehostname",
          "syslog_timestamp" : "Feb 15 22:19:50",
          "rateLimit" : {
            "rate-limit-2" : {
              "count" : "-1",
              "limit" : "-1"
            },
            "rate-limit-1" : {
              "count" : "-1",
              "limit" : "-1"
            },
            "rate-limit" : {
              "count" : "-1",
              "limit" : "-1"
            },
            "per-minute" : {
              "count" : "-1",
              "limit" : "-1"
            }
          },
          "response_body" : "",
          "status_code" : "200 OK",
          "plan_id" : "",
          "bytes_received" : 0,
          "developer_org_title" : "",
          "endpoint_url" : "N/A",
          "latency_info" : [
            {
              "started" : 0,
              "task" : "Start"
            },
            {
              "started" : 22,
              "task" : "set-variable"
            }
          ],
          "uri_path" : "somepath-uat/uat/healthcheck",
          "@timestamp" : "2020-02-15T22:19:50.981Z",
          "app_name" : "N/A",
          "product_name" : "N/A",
          "response_http_headers" : [ ],
          "log_policy" : "activity",
          "type" : "syslog"
        }
      },

Believe me mate, I'm struggling to understand what you want to achieve.

Leaving aside the fact that the JSON you posted as output is incomplete, what I see there is that the fields "stringified" inside syslog_message are correctly extracted.

For example, the first field of syslog_message, api_id, is correctly extracted a few lines above it.

What is your final goal? How do you want to use those data? What do you want to build?

I am sorry for not being clear.
What I wanted to achieve was to get a better understanding of how filtering works.
I believed my code would return the extracted values alone and drop the 'stringified' part.
I thought my code was mistakenly returning the 'stringified' part, but obviously the mistake here was in my understanding of the code.
Thank you for your time, Fabio, and sorry for the conundrum.

No problem at all :grin:

I'm glad we finally understood each other.

If you don't want the whole message to be returned, you can simply discard it with the mutate filter's remove_field option.

After your json filter, you can put another filter like

mutate {
  remove_field => ["syslog_message", "message"]
}
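Conceptually, remove_field just deletes keys from the event after the json filter has already copied everything out of syslog_message. In plain Python terms (a sketch of the idea, not actual Logstash internals):

```python
def remove_fields(event: dict, fields: list) -> dict:
    """Rough Python analogue of mutate { remove_field => [...] }."""
    for field in fields:
        event.pop(field, None)  # silently skip fields that are absent
    return event
```

So after parsing, something like `remove_fields(event, ["syslog_message", "message"])` would leave only the extracted fields on the event.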

Fabio,
Thank you very, very much.
You helped me to conceptually grasp the overall functionality.
Again, thank you for your time and valuable input.


This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.