Logstash parsing of statistics log file

Below is my input log file format.

19-01-01  Name             Succ  Fail  Timeout  Reject  Retransmission  Duplicate  Thrput  Response time (ms)
                                                        Detected        Answered   (/s)    Avg   Min   Max
07:52:44  DCIPDiameter     0     0     0        0       0               0          0       -     -     -
          ProviderGet      0     0     0        0       0               0          0       -     -     -
          ProviderUpdate   0     0     0        0       0               0          0       -     -     -
          ProviderCleanup  0     0     0        0       0               0          0       -     -     -
07:52:54  DCIPDiameter     0     0     0        0       0               0          0       -     -     -
          ProviderGet      0     0     0        0       0               0          0       -     -     -
          ProviderUpdate   0     0     0        0       0               0          0       -     -     -
          ProviderCleanup  0     0     0        0       0               0          0       -     -     -

I am able to parse the input with the grok pattern below (verified in the grok debugger). But how can I carry the time field from the previous line onto the following lines, until another time value appears?

%{TIME:Time}\s+%{WORD:Name}\s*+%{NUMBER:Success}\s*+%{NUMBER:Fail}\s*+%{NUMBER:Timeout}\s*+%{NUMBER:Reject}\s*+%{NUMBER:Retransmission_detected}\s*+%{NUMBER:Duplicate_answered}\s*+%{NUMBER:Thrput}\s*+%{NUMBER:Response_time_Avg}\s*+%{NUMBER:Response_time_Min}\s*+%{NUMBER:Response_time_Max}

Thanks in advance.

You could use a multiline codec to combine all the lines for one time, then mutate+split to create an array of lines, then use grok (or csv, or dissect) on each of the entries in the array.
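
A rough sketch of that idea, assuming the continuation lines start with whitespace (the file path is illustrative, and the per-line parsing is left out):

input {
  file {
    path => "/path/to/stats.log"          # illustrative path
    codec => multiline {
      pattern => "^\s"                    # indented lines belong to the previous (timestamped) line
      what => "previous"
    }
  }
}
filter {
  split { }                               # one event per line of the combined block; splits "message" on newlines by default
  # then parse each line with grok, csv, or dissect
}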

Thanks for your reply. But how will I replicate the time field onto the lines below it until I get another line with a time value in it?

I am suggesting you combine all the lines for one time into a single event.

I can use the configuration below to combine all lines that start with a space into a single event:

input {
  stdin {
    codec => multiline {
      pattern => "^\s"
      what => "previous"
    }
  }
}

How will I then split the lines so that I get the time for each line?

I would experiment with something like

# drop the header lines
if [message] =~ "Timeout Reject Retransmission Duplicate" { drop {} }
# capture the leading time from the combined block
dissect { mapping => { "message" => "%{time} %{}" } }
# strip the time prefix, then split the block into one event per line; each event keeps the "time" field
mutate { gsub => [ "message", "^[0-9]{2}:[0-9]{2}:[0-9]{2} ", "" ] }
split { }
# the first word of each line is the counter name
dissect { mapping => { "message" => "%{type} %{}" } }
mutate { gsub => [ "message", "^[^ ]+ ", "" ] }
# parse the remaining columns and nest them under the counter name
csv { target => "zzz" columns => [ "Succ", "Fail", "Timeout", "Reject", "Retransmission", "Duplicate", "Thrput" ] separator => " " }
mutate { rename => { "zzz" => "%{type}" } }

Thanks, will try it and let you know!

I tried the filter. This is what I get in my Kibana JSON output. The drop filter is not working correctly. I even tried to use exclude_lines in Filebeat, but that didn't work either.

{
  "_index": "filebeat-6.2.4-%{[fields][file_type]}-2019.05.21",
  "_type": "doc",
  "_id": "RNS722oBBcEnU1ubr1Ox",
  "_version": 1,
  "_score": null,
  "_source": {
    "beat": {
      "hostname": "server6646",
      "version": "6.2.4",
      "name": "server6646"
    },
    "offset": 8386341,
    "prospector": {
      "type": "log"
    },
    "message": " Name Succ Fail Timeout Reject Retransmission Duplicate Thrput Response time (ms)",
    "tags": [
      "beats_input_codec_plain_applied"
    ],
    "@timestamp": "2019-05-21T18:51:10.506Z",
    "time": "19-01-02",
    "@version": "1",
    "source": "/var/log/SDP/stat_cipdia/PSC-CIPDiameter_8.1_A_1_Gx.stat.1",
    "host": "server6646",
    "type": "19-01-02",
    "19-01-02": {
      "column11": "(ms)",
      "column9": "Response",
      "Fail": "Succ",
      "column8": "Thrput",
      "Succ": "Name",
      "Retransmission": "Reject",
      "Duplicate": "Retransmission",
      "Thrput": "Duplicate",
      "Reject": "Timeout",
      "Timeout": "Fail",
      "column10": "time"
    }
  },
  "fields": {
    "@timestamp": [
      "2019-05-21T18:51:10.506Z"
    ]
  },
  "sort": [
    1558464670506
  ]
}

The Logstash log is as below:

[2019-05-21T13:51:11,712][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"filebeat-6.2.4-%{[fields][file_type]}-2019.05.21", :_type=>"doc", :_routing=>nil}, #LogStash::Event:0x668e9a41], :response=>{"index"=>{"_index"=>"filebeat-6.2.4-%{[fields][file_type]}-2019.05.21", "_type"=>"doc", "_id"=>"t9S722oBBcEnU1ubr1W3", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"field name cannot be an empty string"}}}}}
[2019-05-21T13:51:11,712][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"filebeat-6.2.4-%{[fields][file_type]}-2019.05.21", :_type=>"doc", :_routing=>nil}, #LogStash::Event:0xf23f080], :response=>{"index"=>{"_index"=>"filebeat-6.2.4-%{[fields][file_type]}-2019.05.21", "_type"=>"doc", "_id"=>"uNS722oBBcEnU1ubr1W3", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"field name cannot be an empty string"}}}}}
[2019-05-21T13:51:11,712][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"filebeat-6.2.4-%{[fields][file_type]}-2019.05.21", :_type=>"doc", :_routing=>nil}, #LogStash::Event:0x4202d457], :response=>{"index"=>{"_index"=>"filebeat-6.2.4-%{[fields][file_type]}-2019.05.21", "_type"=>"doc", "_id"=>"utS722oBBcEnU1ubr1W3", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"field name cannot be an empty string"}}}}}
[2019-05-21T13:51:11,712][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"filebeat-6.2.4-%{[fields][file_type]}-2019.05.21", :_type=>"doc", :_routing=>nil}, #LogStash::Event:0x1ef21e06], :response=>{"index"=>{"_index"=>"filebeat-6.2.4-%{[fields][file_type]}-2019.05.21", "_type"=>"doc", "_id"=>"u9S722oBBcEnU1ubr1W3", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"field name cannot be an empty string"}}}}}

I suggest you replace the elasticsearch output with

output { stdout { codec => rubydebug } }

and make sure the events look OK before trying to index them.

My drop {} assumed that the words are space delimited. If there are tabs in the file you would need tabs in the pattern.

Comment out all of the lines of the filter I suggested except the first. Once that is working as you want uncomment the next, and so on.

Thanks @Badger for your reply, but output { stdout { codec => rubydebug } } isn't working for me. I get a TCP error when I try to write to stdout.

Below is the log from Filebeat:

2019-05-21T13:59:24.741-0500    ERROR   logstash/async.go:235   Failed to publish events caused by: write tcp [::1]:22970->[::1]:5044: write: connection reset by peer
2019-05-21T13:59:25.741-0500    ERROR   pipeline/output.go:92   Failed to publish events: write tcp [::1]:22970->[::1]:5044: write: connection reset by peer

Where can I find the rubydebug output? I'm running Logstash and Filebeat as services, and /var/log/logstash doesn't show any output.
Can you help me with this?

UPDATE: After updating the Logstash beats input plugin with the command below, I can see the files being processed, but I still can't see stdout in the console for Logstash.
bin/logstash-plugin update logstash-input-beats

Finally I created a file output and am able to see the rubydebug output. Thanks!
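
In case it helps anyone else, a file output with the rubydebug codec can be as simple as this (the path is just an example):

output {
  file {
    path => "/tmp/logstash-debug.log"    # illustrative path
    codec => rubydebug
  }
}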

I tried it step by step as you suggested and was able to proceed. The only place I got stuck is that I was not able to populate the time field in the records that were missing it from the previous occurrence.

I need help in getting the time populated for all events under that time!

Input is:

07:52:16  Diameter                                                 1       2        3       4              5          6       7      -      -      -
          SLR-I                                                    11       22        33       44              55          66       77      -      -      -
          SLR-U                                                    111       222        333       444              555          666       777      -      -      -
          STR                                                      1111       2222        3333       4444              5555          6666       7777      -      -      
07:52:26  Diameter                                                 0       0        0       0              0          0       0      -      -      -

Logstash filter is:

filter {
  grok {
    add_tag => [ "header" ]
    match => [ "message", "Name", "message", "Detected" ]
  }
  if "header" in [tags] { drop { } }
  mutate { remove_tag => [ "header" ] }
  dissect { mapping => { "message" => "%{time} %{}" } }
  mutate { gsub => [ "message", "^[0-9]{2}:[0-9]{2}:[0-9]{2}\s*", "" ] }
  mutate { strip => [ "message" ] }
  split { }
  dissect { mapping => { "message" => "%{event_name->} %{}" } }
  mutate { gsub => [ "message", "^[^ ]+ ", "" ] }
  csv {
    target => "zzz"
    columns => [ "Succ", "Fail", "Timeout", "Reject", "Retransmission", "Duplicate", "Thrput", "Avg_Response", "Min_Response", "Max_Response" ]
    separator => " "
  }
  mutate { rename => { "zzz" => "%{event_name->}" } }
}

The output is as below:

{
     "event_name" => "Diameter",
           "beat" => {
        "name" => "server6646",
    "hostname" => "server6646",
     "version" => "6.2.4"
},
         "source" => "/var/log/SDP/stat_cipdia/aaaPSC.txt",
         "offset" => 149,
     "prospector" => {
    "type" => "log"
},
           "time" => "07:52:16",
     "@timestamp" => 2019-05-22T21:00:47.751Z,
       "@version" => "1",
           "tags" => [
    [0] "beats_input_codec_plain_applied",
    [1] "_grokparsefailure"
],
        "message" => "                                                1       2        3       4              5          6       7      -      -      -",
"%{event_name->}" => {
            "Reject" => "4",
      "Avg_Response" => "-",
      "Max_Response" => "-",
              "Succ" => "1",
              "Fail" => "2",
           "Timeout" => "3",
         "Duplicate" => "6",
            "Thrput" => "7",
    "Retransmission" => "5",
      "Min_Response" => "-"
         }
           }
         {
            "event_name" => "Diameter",
               "beat" => {
        "hostname" => "server6646",
         "version" => "6.2.4",
            "name" => "server6646"
    },
             "source" => "/var/log/SDP/stat_cipdia/aaaPSC.txt",
             "offset" => 786,
         "prospector" => {
        "type" => "log"
    },
               "time" => "07:52:26",
         "@timestamp" => 2019-05-22T21:00:47.752Z,
           "@version" => "1",
               "tags" => [
        [0] "beats_input_codec_plain_applied",
        [1] "_grokparsefailure"
    ],
            "message" => "                                                0       0        0       0              0          0       0      -      -      -",
    "%{event_name->}" => {
                "Reject" => "0",
          "Avg_Response" => "-",
          "Max_Response" => "-",
                  "Succ" => "0",
                  "Fail" => "0",
               "Timeout" => "0",
             "Duplicate" => "0",
                "Thrput" => "0",
        "Retransmission" => "0",
          "Min_Response" => "-"
    }
}
{
         "event_name" => "STR",
               "beat" => {
        "hostname" => "server6646",
         "version" => "6.2.4",
            "name" => "server6646"
    },
             "source" => "/var/log/SDP/stat_cipdia/aaaPSC.txt",
             "offset" => 637,
         "prospector" => {
        "type" => "log"
    },
               "time" => "",
         "@timestamp" => 2019-05-22T21:00:47.752Z,
           "@version" => "1",
               "tags" => [
        [0] "beats_input_codec_plain_applied",
        [1] "_grokparsefailure"
    ],
            "message" => "                                                     1111       2222        3333       4444              5555          6666       7777      -      -",
    "%{event_name->}" => {
                "Reject" => "4444",
          "Avg_Response" => "-",
                  "Succ" => "1111",
                  "Fail" => "2222",
               "Timeout" => "3333",
             "Duplicate" => "6666",
                "Thrput" => "7777",
        "Retransmission" => "5555",
          "Min_Response" => "-"
    }
}
{
         "event_name" => "SLR-I",
               "beat" => {
            "name" => "server6646",
        "hostname" => "server6646",
         "version" => "6.2.4"
    },
             "source" => "/var/log/SDP/stat_cipdia/aaaPSC.txt",
             "offset" => 305,
         "prospector" => {
        "type" => "log"
    },
               "time" => "",
         "@timestamp" => 2019-05-22T21:00:47.752Z,
           "@version" => "1",
               "tags" => [
        [0] "beats_input_codec_plain_applied",
        [1] "_grokparsefailure"
    ],
            "message" => "                                                   11       22        33       44              55          66       77      -      -      -",
    "%{event_name->}" => {
                "Reject" => "44",
          "Avg_Response" => "-",
          "Max_Response" => "-",
                  "Succ" => "11",
                  "Fail" => "22",
               "Timeout" => "33",
             "Duplicate" => "66",
                "Thrput" => "77",
        "Retransmission" => "55",
          "Min_Response" => "-"
    }
}
{
         "event_name" => "SLR-U",
               "beat" => {
            "name" => "server6646",
        "hostname" => "server6646",
         "version" => "6.2.4"
     }
}

In my post I assumed a multiline codec like the one you posted. The filter I posted parses the timestamp, then splits the lines so that every line includes the timestamp.

I added the input in my previous post. It has \n after every record. What would I do in this case?
Should I first create a multiline codec that joins all lines starting with a space into a single event?

Thanks @Badger, the code worked after using multiline in Filebeat. Is it advisable to use multiline in Filebeat and then send the result to Logstash?

Thanks
Ankita

If it works it is fine.
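
For reference, a Filebeat 6.x multiline section along those lines might look roughly like this (the path is illustrative; the pattern mirrors the "^\s" used earlier, so indented lines are appended to the preceding timestamped line):

filebeat.prospectors:
  - type: log
    paths:
      - /var/log/SDP/stat_cipdia/*.stat*
    multiline.pattern: '^\s'
    multiline.negate: false
    multiline.match: after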
