Logstash config for transactions

Hi
I need to write a Logstash config that parses log files from this path: "/tmp/logs/*" and stores them in Elasticsearch.

Here is the log:

09:54:37:566 R[SRV1]L[477]T[0300]ID[696119]
09:54:37:566 S[SRV2]L[477]T[0300]ID[696119]
09:54:55:284 R[SRV1]L[453]T[0310]ID[696119]
09:54:55:284 S[SRV2]L[453]T[0310]ID[696119]

The keys and values in the lines are like this:

key=time, value=09:54:37:566
if a line has key=R then value=SRV1, or if a line has key=S then value=SRV2 (R means receive, S means send); two R lines and two S lines (SRV1, SRV2) with the same ID (696119) and matching T values (0300/0310) make up a complete transaction.
key=L, value=477
key=T, value=0300
key=ID, value=696119

FYI: the four log lines above make up a single transaction. Now I need to extract, from a file that contains lots of these lines:
1 - the number of transactions
2 - incomplete transactions
3 - the duration of transactions

Any idea?
Thanks

You can use something like this:

input {
  file {
    path => "/tmp/logs/*"
    start_position => "beginning"
    # any line that does not start with a timestamp is joined
    # onto the previous event
    codec => multiline {
      pattern => "\d{2}:\d{2}:\d{2}:\d{3}"
      negate => true
      what => "previous"
    }
    #sincedb_path => "NUL"
    mode => "tail"
  }
} # input

filter {

    grok {
      break_on_match => false
      match => {
        "message" => [ "^%{TIME:time}",
                       "R\[%{DATA:R}\]",
                       "S\[%{DATA:S}\]",
                       "L\[%{INT:L}\]",
                       "T\[%{INT:T}\]",
                       "ID\[%{INT:ID}\]" ]
      }
    }
    # drop noisy default fields added by the file input
    prune { blacklist_names => [ "host", "event", "log" ] }
}

output {
    stdout {
        codec => rubydebug
    }
}
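Since the original goal was to store the events in Elasticsearch, the stdout output above can be replaced (or complemented) with an elasticsearch output once the parsing looks right. A minimal sketch; the host and index name are placeholders, not from the thread:

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "transactions-%{+YYYY.MM.dd}"
  }
}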

Sometimes it is useful to keep the log and event fields; that is up to you.
Result

{
          "time" => "09:54:37:566",
             "R" => "SRV1",
       "message" => "09:54:37:566 R[SRV1]L[477]T[0300]ID[696119]\r",
             "T" => "0300",
             "L" => "477",
            "ID" => "696119",
    "@timestamp" => 2023-06-19T06:33:45.637209100Z
}
{
          "time" => "09:54:37:566",
       "message" => "09:54:37:566 S[SRV2]L[477]T[0300]ID[696119]\r",
             "T" => "0300",
             "S" => "SRV2",
             "L" => "477",
            "ID" => "696119",
    "@timestamp" => 2023-06-19T06:33:45.638203800Z
}
{
          "time" => "09:54:55:284",
             "R" => "SRV1",
       "message" => "09:54:55:284 R[SRV1]L[453]T[0310]ID[696119]\r",
             "T" => "0310",
             "L" => "453",
            "ID" => "696119",
    "@timestamp" => 2023-06-19T06:33:45.638203800Z
}

@Rios how about these:
1 - number of transactions
2 - incomplete transactions
3 - duration of transactions

You could start with something like

    grok { match => { "message" => "^%{NOTSPACE:[@metadata][ts]} (?<rs>[RS])\[%{WORD}\]L\[%{NUMBER:L:int}\]T\[%{NUMBER:T:int}\]ID\[%{NUMBER:ID:int}\]" } }

    date { match => [ "[@metadata][ts]", "HH:mm:ss:SSS" ] }
    aggregate {
        task_id => "%{ID}"
        code => '
            map["count"] ||= 0
            map["count"] += 1
            map["L"] ||= event.get("L")
            map["T"] ||= event.get("T")
            map["start"] ||= event.get("@timestamp")
            map["tags"] = [ "transaction" ]
            if map["count"] == 4
                map["tags"] << "complete"
                map["duration"] = event.get("@timestamp").to_f - map["start"].to_f
            end
            #event.cancel
        '
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "ID"
        timeout => 6
    }
    if "transaction" in [tags] {
        mutate { add_field => { "[@metadata][id]" => 1 } }
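        # a constant task_id collects every transaction event into one summary map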
        aggregate {
            task_id => "%{[@metadata][id]}"
            code => '
                map["count"] ||= 0
                map["count"] += 1

                map["complete"] ||= 0
                if event.get("tags").include? "complete"
                    map["complete"] += 1
                end

                map["duration"] ||= 0
                begin
                    map["duration"] += event.get("duration")
                rescue
                end

                map["tags"] = [ "summary" ]
            '
            push_map_as_event_on_timeout => true
            timeout => 20
        }
    }

Obviously you may need to rewrite parts of that to suit your use case. As always with aggregate, you will need pipeline.workers set to 1 and pipeline.ordered to evaluate to true.
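For reference, those settings would look like this in logstash.yml (or per pipeline in pipelines.yml):

pipeline.workers: 1
pipeline.ordered: true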


@Badger Thanks, it works perfectly, but I need to extract the timestamp like this:

extract "date" from log file name like this:

log.s1cusappab1.20230619

extract "time" from time that store in each line of log, like this:

09:54:37:566 R[SRV1]L[477]T[0300]ID[696119]

Any idea?
Thanks
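One way to combine the two (a sketch, not a tested answer from the thread: it assumes ECS compatibility is enabled so the file input records the path in [log][file][path], and that the file name always ends in an 8-digit date):

    # pull the date out of a name like log.s1cusappab1.20230619
    grok { match => { "[log][file][path]" => "\.(?<filedate>[0-9]{8})$" } }
    # pull the time from the start of each line
    grok { match => { "message" => "^(?<linetime>[0-9]{2}:[0-9]{2}:[0-9]{2}:[0-9]{3})" } }
    # combine both into one timestamp and parse it
    mutate { add_field => { "[@metadata][ts]" => "%{filedate} %{linetime}" } }
    date { match => [ "[@metadata][ts]", "yyyyMMdd HH:mm:ss:SSS" ] }
    mutate { remove_field => [ "filedate", "linetime" ] }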

@Badger two other issues:
1 - it shows a delimiter in the ID, like this: 624,325. How do I get rid of it? The correct value is 624325.
2 - it generates a negative value for duration, -0.259; the correct value is 0.259.

FYI: I tried this for item 2 but it did not fix the issue:

map["duration"] = event.get("@timestamp").to_f - map["start"].to_f

Any idea?
Thanks
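A couple of hedged suggestions, not from the thread: the comma in 624,325 is typically just numeric display formatting (e.g. in Kibana) applied to a field mapped as a number, so capturing ID as a string avoids it; and the negative duration can be prevented by tracking the earliest and latest timestamps instead of assuming the first event seen is the start. A sketch of the relevant parts:

    # ID captured without :int, so it stays a string and no
    # thousands separator can be applied when it is displayed
    grok { match => { "message" => "ID\[%{NUMBER:ID}\]" } }

    aggregate {
        task_id => "%{ID}"
        code => '
            map["count"] ||= 0
            map["count"] += 1
            # track earliest and latest timestamps so out-of-order
            # events cannot produce a negative duration
            ts = event.get("@timestamp").to_f
            map["first"] = [ map["first"] || ts, ts ].min
            map["last"]  = [ map["last"]  || ts, ts ].max
            map["duration"] = map["last"] - map["first"] if map["count"] == 4
        '
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "ID"
        timeout => 6
    }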
