How to convert the Logstash message to fields

Hi,
I am using Logstash as a syslog server which sends data to Elasticsearch. Here is the output.

@timestampJul 7, 2023 @ 11:30:12.520@version1 hostname10.11.12.13 message {"proxyname":"test-123-abc","revision":"8","latency":20,"startTimestamp":1688751011776,"endTimestamp":1688751011776,"verb":"GET","cn":"","ip":"10.11.12.13","url":"https://mocktarget.abc.net/abc-def-proxy","msgTimestamp":1688751012505,"organizationName":"ovx-maxo-rdb","Environment":"qa-int-01"}

I am trying to convert the keys (proxyname, revision, url, etc.) inside message into fields so we can create useful dashboards and searches in Elasticsearch.

I tried mutate with add_field, but I am not sure how to bring those values out as top-level fields. Any help would be great!

Thanks

That looks like JSON to me, so use a json filter.
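For example, a minimal sketch (the source field name is taken from your output above):

filter {
  # Parse the JSON text held in [message] and expand its keys into event fields
  json {
    source => "message"
  }
}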

Thanks, but as per the documentation:

"This is a JSON parsing filter. It takes an existing field which contains JSON and expands it into an actual data structure within the Logstash event."

but inside the message I have key/value pairs, not fields. So can we convert those string keys into fields the same way?

I have tried this.


filter {
  json {
    source => "message"
  }
}

but I am getting the following error:


[2023-07-07T15:38:51,613][WARN ][logstash.filters.json    ][main][58dsdsdwed3ce66bf5fcee0c960c82af3b5e8360873624b049ba5723846c1] Error parsing json {:source=>"message", :raw=>"1106 <14>1 2023-07-07T19:38:51.369709+00:00


#<LogStash::Json::ParserError: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: (byte[])"1106 <14>1 2023-07-07T19:38:51.369709+00:00 ABC.def.ghi.k-v4-ao-uat-sdsdsdg-1212-4e7e-a34d-adasdada[APP/PROC/WEB/0] - [tags@47450 app_id="d415ce79-10b7-4e7e-a34d-f397bdsawew81a

What does an example event look like? Use output { stdout { codec => rubydebug } }.
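That is, a minimal output section like this (it can sit alongside your existing outputs):

output {
  # Dump each event, with its complete field structure, to stdout
  stdout { codec => rubydebug }
}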

What about this?

filter {
  grok {
    match => { "message" => ["%{TIMESTAMP_ISO8601:@timestamp}%{SPACE}%{DATA:@version}%{SPACE}%{HOSTNAME:hostname}%{SPACE}%{IPV4:ip}%{SPACE}%{GREEDYDATA:message}" ] }
    overwrite => [ "message" ]
  }
  
  json {
    source => "message"
    target => "log_fields"
  }
  
  mutate {
    remove_field => [ "message" ]
  }
}

Thanks for the reply.

It is showing a parse failure.

Jul 9, 2023 @ 20:44:17.862 @timestampJul 9, 2023 @ 20:44:17.862@version1host10.17.12.175 tags _grokparsefailure,_jsonparsefailure type abc-syslog _idmzmveqeqqwqws0Ou4tyWsV_index abc-syslog-8-2023.07.10_score -

Can you do what Badger said and show the output from output { stdout { codec => rubydebug } }?

If this is sent by syslog, it should be separated by commas or something similar:
@timestampJul 7, 2023 @ 11:30:12.520@version1 hostname10.11.12.13 message

Do you need only the fields inside the message (from proxyname to Environment)?
If so, you need only this:

filter {
    grok { match => { "message" => ["message\s*%{GREEDYDATA:[@metadata][msg]}" ] } }
    json { source => "[@metadata][msg]" }
}

Result:

{
      "startTimestamp" => 1688751011776,
        "msgTimestamp" => 1688751012505,
           "proxyname" => "test-123-abc",
                 "url" => "https://mocktarget.abc.net/abc-def-proxy",
         "Environment" => "qa-int-01",
    "organizationName" => "ovx-maxo-rdb",
                  "ip" => "10.11.12.13",
        "endTimestamp" => 1688751011776,
             "latency" => 20,
                "verb" => "GET",
            "revision" => "8",
          "@timestamp" => 2023-07-10T06:45:47.148823700Z,
                  "cn" => ""
}

Thanks Rios. Yes, I tried rubydebug as well, but it's still the same.

Here is my Logstash config:

input {
  udp {
    port => someport
    id => "some-syslog001"
    type => "some-syslog-msg"
    add_field => {
      "[@metadata][type]" => "some-syslog-msg"
      "[@metadata][output]" => "index"
    }
    workers => 10
    queue_size => 1000
    #ecs_compatibility => disabled
  }
}

filter {
  grok { match => { "message" => ["message\s*%{GREEDYDATA:[@metadata][msg]}" ] } }
  json { source => "[@metadata][msg]" }
}

output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    user => "logstash"
    password => "logstash"
    hosts => ['https://elk1:9200','https://elk2:9200','https://elk3:9200']
    ssl_certificate_verification => false
    manage_template => false
    data_stream => false
    index => "%{[@metadata][index_name]}-8-%{+YYYY.MM.dd}"
  }
}

and it's still not showing the message values as fields in Elasticsearch.

tail -f /var/log/logstash/logstash-plain.log shows the following:

Error parsing json {:source=>"[@metadata][msg]",


What does an event look like when it is output through the rubydebug codec?

Something like this:

[2023-07-10T13:39:12,132][WARN ][logstash.filters.json ][main][a871e8d442cc100e09c7a231768cf15707d7055b0522c9fb6fd652b249dabdb3] Error parsing json {:source=>"[@metadata][msg]", :raw=>"":"This consumer is idempotent and the file has been consumed before matching idempotentKey: amc_kafbb_def_20230621.data. Will skip this file: RemoteFile[amc_kafbb_def_20230621.data]","logger_name":"org.apache.camel.component.file.remote.SftpConsumer","thread_name":"Camel (camel-1) thread #2 - sftp:/id@server/some/files/name/abc/100/daily/node","level":"TRACE","level_value":5000,"appName":"some-app-name","applicationCode":"Marvel"}", :exception=>#<LogStash::Json::ParserError: Unrecognized token 'This': was expecting ('true', 'false' or 'null')
at [Source: (byte)"":"This consumer is idempotent and the file has been consumed before matching idempotentKey: pda_kafbb_mtl_20230621.data. Will skip this file: RemoteFile[amc_kafbb_def_20230621.data]","logger_name":"org.apache.camel.component.file.remote.SftpConsumer"

I think some of the values syslog is sending contain spaces and special characters (like @, spaces, URL forward slashes, etc.), which I believe is causing the JSON parse failure.

[msg]", :raw=>"\":\"This consumer is idempotent and the file has been consumed before matching idempotentKey: abc_def_krl_20230523.data.
:exception=>#<LogStash::Json::ParserError: Unrecognized token 'This': was expecting ('true', 'false' or 'null')

[logstash.filters.json    ][main][5nmbadada-sds4264cb43a66a1224b8e440ccf439abd5626a] Error parsing json {:source=>"[@metadata][msg]", :raw=>"\":\"Call KafkaService.doPoll\",\"logger_name\":\"com.def.events.service.kafka.KafkaService\",\"thread_name\":\"Camel-camel-1-somealerts C-0\",\"level\":\"DEBUG\",\"level_value\":10000,\"Name\":\"some-alert-for-team\",\"app":\"marvel\"}", :exception=>#<LogStash::Json::ParserError: Unrecognized token 'Call': was expecting ('true', 'false' or 'null')

Is there a way, while parsing JSON, to drop data where either the key or the value is missing?

The message that was grokked into [@metadata][msg]

This consumer is idempotent and the file has been consumed before matching idempotentKey: abc_def_krl_20230523.data.

is not JSON. A json filter is going to raise an exception if you ask it to parse something that is not JSON. If only a subset of your syslog messages contain JSON then you can either try to parse them all and accept that you will get many exceptions, or try to write conditionals that determine whether or not [@metadata][msg] looks like JSON.
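For example, a minimal sketch of such a conditional (assuming the JSON payloads always begin with an opening brace):

filter {
  # Only attempt JSON parsing when the grokked field looks like a JSON object;
  # anything else is tagged so it can be inspected separately
  if [@metadata][msg] =~ /^\s*\{/ {
    json { source => "[@metadata][msg]" }
  } else {
    mutate { add_tag => [ "not_json" ] }
  }
}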

Thanks guys for your valuable input! I think I am almost there; I just need to filter out this value: "\n \u0000"

[2023-07-11T15:46:45,917][WARN ][logstash.filters.json    ][main][1543fdc4018abd383f8789d4050a7904179800c6a9bd317f664c8ebaa5416a91] Error parsing json {:source=>"message", :raw=>"\n            {\"proxyname\":\"test-abc-def\",\"revision\":\"8\",\"latency\":31,\"startTimestamp\":1689104805142,\"endTimestamp\":1689104805142,\"verb\":\"GET\",\"cn\":\"\",\"ip\":\"10.11.12.13\",\"url\":\"https://abc.def.ghi/jkl-mno-pqr\",\"msgTimestamp\":1689104805475,\"organizationName\":\"someorg-def\",\"apigeeEnvironment\":\"dev\"}\n        \u0000", :exception=>#<LogStash::Json::ParserError: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
 at [Source: (byte[])"

I am trying the code below.


if "_jsonparsefailure" in [tags] {
    ruby { code => 'event.set("message", event.get("message").gsub(" \u0001".encode("utf-8"),  ""))' }
    json { source => "message" }

}

but it's still not removing it. Any suggestions?

Thanks!

I just mutated the field before the json parse and it fixed the issue.

Here is the code.

filter {
  # Strip the stray "\u0000" text before the json filter sees the message
  mutate {
    gsub => [
      "message", "[\\]u0000", ""
    ]
  }

  if [@metadata][type] == "yourLogName" {
    json {
      source => "message"
    }
  }

  if [@metadata][type] == "yourLogName" and [message] {
    # Record whether the parsed [message] ended up a string or an array
    ruby {
      code => 'case event.get("[message]")
        when String
          event.set("messageType", "String")
        when Array
          event.set("messageType", "Array")
        else
          event.set("messageType", "Other")
        end'
    }

    if [messageType] == "Array" {
      # Re-assemble array items into complete messages, starting a new
      # message at each item that begins with "(tag=", then split the
      # event so each message becomes its own event
      ruby {
        code => "
          message = event.get('message')
          new_message = []
          new_event = ''
          message.each { |item|
              if item.start_with?('(tag=')
                  if new_event != ''
                      new_message.push(new_event)
                  end
                  new_event = item;
              else
                  new_event += item;
              end
          }
          if new_event != ''
              new_message.push(new_event)
          end
          if (new_message.size > 0)
              event.set('message', new_message)
          end
        "
      }
      split {
        field => "message"
      }
    }

    # Retry the parse after removing characters that commonly break JSON
    if "_jsonparsefailure" in [tags] {
      ruby { code => 'event.set("message", event.get("message").gsub("\u0000".encode("utf-8"), ""))' }
      json { source => "message" }
    }
    if "_jsonparsefailure" in [tags] {
      ruby { code => 'event.set("message", event.get("message").gsub("\n".encode("utf-8"), ""))' }
      json { source => "message" }
    }

    mutate {
      remove_field => [ "message" ]
    }

    mutate {
      rename => { "host" => "hostname" }
    }
  }
}

Thanks all for your help.


This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.