Parse stringified JSON from syslog RFC 3164 events

Hi,

I'm pretty new to the Elastic Stack, so please excuse any newbie-level ignorance.

I'm shipping data from my Splunk environment to Logstash using the method described in Splunk's "Forward data to third-party systems" documentation.

I've got the two systems talking to each other, and I can see events from Splunk landing in Logstash. The challenge I'm having is that the events coming from Splunk contain stringified JSON that doesn't appear to parse correctly.

Using the file output plugin, I can see the contents of the messages:

{
  "service": {
    "type": "system"
  },
  "type": "aws_ecs",
  "@version": "1",
  "message": "<13> ip-XXX-XXX-XXX-XXX.ap-southeast-2.compute.internal {\"line\":{\"app\":\"myapp\",\"ts\":\"2024-09-02T03:33:38.344Z\",\"traceId\":\"0f5dd515a04c33f3\",\"clientId\":\"client001\",\"instance\":\"ip-XXX-XXX-XXX-XXX.ap-southeast-2.compute.internal\",\"billing_event_sent\":\"false\",\"service_access_id\":\"my_special_service_id\",\"request_method\":\"GET\",\"subjectId\":\"\",\"remote_host\":\"XXX.XXX.XXX.XXX\",\"execution_time\":\"6\",\"spanId\":\"0f5dd515a04c33f3\",\"uri_path\":\"/v1/my-endpoint\",\"uri_query\":\"itemIds=50597529\\u0026excludeFields=digitalAssets%2CobjectMetadata\",\"X-ID\":\"9ca579d6-dfdf-4469-a310-a81149529599\",\"status\":\"200\",\"msg\":\"usage_start=2024-09-02T03:33:38.337918501,client_id:client001,service_access_id:my_special_service_id\",\"env\":\"uat\",\"logger_version\":\"2\"},\"source\":\"stdout\",\"tag\":\"uat/5ab01221f3374d01af0ab76792db5e79-2470140894/default\",\"attrs\":{\"AppVersion\":\"1.1.0-1383039-6fb8d1dd\",\"CdktfVersion\":\"1.16.4\"}}\n",
  "tags": [
    "src:splunk",
    "sourcetype:aws_ecs",
    "_grokparsefailure_sysloginput"
  ],
  "log": {
    "syslog": {
      "facility": {
        "name": "kernel",
        "code": 0
      },
      "severity": {
        "name": "Emergency",
        "code": 0
      },
      "priority": 0
    }
  },
  "event": {
    "original":  "<13> ip-XXX-XXX-XXX-XXX.ap-southeast-2.compute.internal {\"line\":{\"app\":\"myapp\",\"ts\":\"2024-09-02T03:33:38.344Z\",\"traceId\":\"0f5dd515a04c33f3\",\"clientId\":\"client001\",\"instance\":\"ip-XXX-XXX-XXX-XXX.ap-southeast-2.compute.internal\",\"billing_event_sent\":\"false\",\"service_access_id\":\"my_special_service_id\",\"request_method\":\"GET\",\"subjectId\":\"\",\"remote_host\":\"XXX.XXX.XXX.XXX\",\"execution_time\":\"6\",\"spanId\":\"0f5dd515a04c33f3\",\"uri_path\":\"/v1/my-endpoint\",\"uri_query\":\"itemIds=50597529\\u0026excludeFields=digitalAssets%2CobjectMetadata\",\"X-ID\":\"9ca579d6-dfdf-4469-a310-a81149529599\",\"status\":\"200\",\"msg\":\"usage_start=2024-09-02T03:33:38.337918501,client_id:client001,service_access_id:my_special_service_id\",\"env\":\"uat\",\"logger_version\":\"2\"},\"source\":\"stdout\",\"tag\":\"uat/5ab01221f3374d01af0ab76792db5e79-2470140894/default\",\"attrs\":{\"AppVersion\":\"1.1.0-1383039-6fb8d1dd\",\"CdktfVersion\":\"1.16.4\"}}\n"
  },
  "host": {
    "ip": "XX.XX.XX.229"
  },
  "@timestamp": "2024-09-02T03:33:49.730817594Z"
}

Ideally, I'd like to throw away the syslog fields and just keep the JSON that makes up the original message, but I'm having trouble figuring out how to do this.

My config currently looks like this (I've tried a few things I found here on the forums with no success, so I figure it's better to ask than to keep failing with no real idea why):

input {
  syslog {
    id => "splunk-syslog"
    port => 5514
    type => "syslog-splunk"
  }

  syslog {
    id => "sourcetype:aws_ecs"
    port => 51414
    tags => ["src:splunk", "sourcetype:aws_ecs", "source:my-source-api"]
    type => "aws_ecs"
  }
}

filter {
  if ([type] == "aws:ecs") {
    grok {
      match => { "message" => "%{SYSLOGHOST:syslog_hostname} %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    grok {
      overwrite => ["syslog_message"]
    }
    json {
      source => "message"
    }
  }
}


output {
    stdout {}
    if [type] == "aws_ecs" {
      file {
        path => "/var/log/logstash/debug/aws_ecs.log"
        file_mode => 0660
        codec => json_lines
      }
    } else {
      file {
        path => "/var/log/logstash/debug/undefined.log"
        file_mode => 0660
        codec => json_lines
      }
    }
}

Can someone please help me figure out what I'm doing wrong here?

Welcome to the community!

Everyone started as a newbie, don't worry. Read, test, ask, learn more...

This is not OK. Your filter checks if ([type] == "aws:ecs"), but neither input sets that type. You have:

  • type => "syslog-splunk" and
  • type => "aws_ecs"
    The conditional should match one of those, e.g.:
    if [type] == "aws_ecs" {
    ...
    or
    if "src:splunk" in [tags] {
    ...
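
With the conditional fixed, a filter along these lines should strip the syslog prefix and parse the JSON (just a sketch; the syslog_message and ecs_json field names are my own choices, so adjust them to taste):

filter {
  if [type] == "aws_ecs" {
    # Pull the JSON payload out of the syslog line
    grok {
      match => { "message" => "%{SYSLOGHOST:syslog_hostname} %{GREEDYDATA:syslog_message}" }
    }
    # Parse the JSON string into structured fields under [ecs_json]
    json {
      source => "syslog_message"
      target => "ecs_json"
    }
    # Throw away the syslog fields you no longer need
    mutate {
      remove_field => ["syslog_message", "message", "log"]
    }
  }
}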

Furthermore, I'm not sure how you get the host field in:
add_field => [ "received_from", "%{host}" ]
Is it from the message, or do you want the Logstash field, which is in [host][hostname]?

For more detail, try using this in the output:
stdout { codec => rubydebug }

If you still have problems, provide the raw messages from both sources as they look inside Logstash.


Yep, this was it! Thanks for this; I just needed another set of eyes to tell me where I was going wrong.

After fixing the conditional and switching to the rubydebug codec, I was able to identify my issues and get everything sorted.

Thanks a heap!


That is the benefit of the Elastic community: helping to solve issues quickly. Sometimes it's a tip, sometimes it's more. Thank you also for the feedback; it's valuable for the community.

While on the subject, do not use rubydebug in production; it can consume a lot of resources.
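
If you just need to confirm that events are flowing, the dots codec is a much lighter option; it prints one character per event instead of the whole document:

output {
  # One dot per event: cheap throughput check, no field dump
  stdout { codec => dots }
}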