Logstash filtering drives me nuts

Hi,

For some time now I have had this stack: Filebeat ships the logs of several application servers to Elasticsearch via Logstash, and the Logstash filtering is done in 4 pipelines. Recently our supplier decided to change a field in the logs, and ever since then my stack seems cursed. First I got _grokparsefailures. Then, after making slight adjustments, no logs at all appeared in Elasticsearch. So someone suggested using dissect instead of grok. So far, so good. Or not: I still get no data in Elasticsearch, and I simply do not understand why.
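To see which events fail to parse, the failure tags that grok and dissect add by default (_grokparsefailure and _dissectfailure) can be watched with a temporary extra output along these lines (untested sketch):

output {
        if "_grokparsefailure" in [tags] or "_dissectfailure" in [tags] {
                # Print unparsed events to the console for inspection.
                stdout { codec => rubydebug }
        }
}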

My Logstash config:

input {
        beats {
                port => 5047
        }
}

###
# ConcurrentStreams API Log importer
# Log events can vary, so lots of logic is required to make sense of it all.
# The following custom failure tags are used:
#       _noeventtype: The log event contains no event-type.
#       _unknowneventtype: When the event-type of the log event is not known (but exists).
###
filter {
        ### Handle Concurrent Streams API log.
        if "api-log" in [tags] and "concurrentstreams" in [tags] {
                ### Trim the message
                mutate {
                  strip => ["message"]
                }

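                # Dissect matches the literal " | " delimiters from left to right;
                # %{} is a skip field, so the thread and apiType values are discarded.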
                dissect {
                  mapping => {
                    "message" => "ts: %{ts} %{+ts} | logLevel: %{log-level} | appId: %{app-id} | %{} | SID: %{session-id} | TN: %{transaction-id} | clientIp: %{client-ip} | userId: %{user-id} | apiType: %{} | api: %{api} | platform: %{platform} | %{additional-data}"
                  }
                }

                mutate {
                  # Also strip log-level: dissect can leave a trailing space
                  # (e.g. "INFO "), which breaks exact comparisons like == "DEBUG".
                  strip => ["log-level", "app-id", "api", "session-id", "transaction-id", "additional-data"]
                }

                # Try to get the eventType field. Not all log events have it...
                grok {
                        keep_empty_captures => true
                        named_captures_only => true
                        tag_on_failure => ["_noeventtype"]
                        match => { "additional-data" => "eventType:\s*(?<event-type>[A-Z_]+)" }
                }

                if [log-level] == "DEBUG" {
                        # Drop all debug messages.
                        drop { }
                }
                else {
                        if ![event-type] {
                                # This is a log event without an event type.
                                # Example: ts: 2019-03-11 21:23:20,695 | logLevel: INFO  | appId: concurrentstream-ms | thread: (HTTP-1743) | SID: cf101ef0-7234-df6b-b923-658780cf3bfa | TN: ffc757c3-759e-043f-ff11-0e46b77b541d | clientIp: 86.84.33.51 | username: 3207382 | apiType: B2B | api: increaseConcurrentStreams | platform: smarttv | {"eventId":"9e5e5af2-0657-4940-9a04-79247af5f4ea","eventSource":"Increase Concurrent Streams","eventType":"StreamConcurrencyLimitReached","timestamp":"1552335800695","headers":{},"payload":"{\"action\":\"StreamConcurrencyLimitReached\",\"platform\":\"smarttv\",\"username\":\"20000001827463\",\"crmAccountId\":\"20000001827463\",\"sessionId\":\"GQd+lIar3VvXG6OQGnOx8Bf1\",\"contentId\":\"30\",\"contentType\":\"Channel\",\"timestamp\":1552335800694,\"ruleId\":\"a4403d7c-a43b-4c5b-b289-caf63f50c6ad\",\"ruleType\":\"Subscriber\",\"streamLimit\":\"30\"}","payloadType":"JSON","payloadEncoding":"UTF-8"}
                                # Everything after the platform is a status-message.
                                mutate {
                                        copy => { "additional-data" => "status-message" }
                                }
                        }
                        else if [event-type] == "APISTART" {
                                # ts: 2019-01-28 00:05:29,297 | logLevel: INFO  | appId: concurrentstream-ms | thread: (HTTP-1847) | SID:  | TN: 223c7343-9ed5-4615-9051-2f4fe96e10dc | clientIp: 10.31.205.92 | username:  | apiType: B2B | api: getConcurrentStreamsPoliciesByRuleType | platform:  | className: com.accenture.avs.be.concurrentstream.web.LoggingInterceptor | methodName: preHandle | eventType: APISTART
                                # Nothing left to do!
                        }
                        else if [event-type] == "REQBODY" {
                                # ts: 2019-01-28 09:32:05,817 | logLevel: INFO  | appId: concurrentstream-ms | thread: (HTTP-1069) | SID: f0d65087-dc3b-1ad9-a26d-1904b95581b1 | TN: 38c8611b-9603-e6fc-85a5-c6a02d860b27 | clientIp: 62.140.132.83 | username: 4 | apiType: B2B | api: increaseConcurrentStreams | platform: android | className: com.accenture.avs.be.concurrentstream.web.LoggingInterceptor | methodName: logRequest | eventType: REQBODY | body: {"crmAccountId":"00012314150010","ruleIds":["23606456-13f8-4f23-b854-74db05dccc51","e6452e95-28ec-4443-b280-1d752a5ad605"],"tags":[{"type":"Platform","value":"android"},{"type":"channel","value":"19"},{"type":"channelOutOfHome","value":"19"},{"type":"outOfHome"}]}

                                # For now we are dropping all REQBODY messages. They take a lot of space and don't contain data we use frequently.
                                drop { }
                                # grok {
                                #       keep_empty_captures => true
                                #       named_captures_only => true
                                #       match => { "additional-data" => "body:\s*(%{GREEDYDATA:outgoing-request-body})?" }
                                # }
                        }
                        else if [event-type] == "APIEND" {
                                ## ts: 2019-01-28 00:05:29,312 | logLevel: INFO  | appId: concurrentstream-ms | thread: (HTTP-1847) | SID:  | TN: 223c7343-9ed5-4615-9051-2f4fe96e10dc | clientIp: 10.31.205.92 | username:  | apiType: B2B | api: getConcurrentStreamsPoliciesByRuleType | platform:  | className: com.accenture.avs.be.concurrentstream.web.LoggingInterceptor | methodName: beforeBodyWrite | eventType: APIEND | userAgent: Apache-HttpClient/4.5.3 (Java/1.8.0_162) | request:  | response: result=OK, resultCode=ACN_200, resultDescription=OK | executionTime(ms): 15
                                grok {
                                        pattern_definitions => {
                                                "DELIM" => "\s*\|\s*"
                                                "STRING" => "[^|]+"
                                        }

                                        keep_empty_captures => true
                                        named_captures_only => true
                                        match => { "additional-data" => "userAgent:\s*(%{STRING:user-agent})%{DELIM}request:\s*(%{STRING:incoming-request-data})%{DELIM}response:\s*(%{STRING:outgoing-response-data})%{DELIM}executionTime\(ms\):\s*%{NUMBER:execution-time}$" }
                                }

                                mutate {
                                        strip => ["outgoing-response-data", "user-agent", "incoming-request-data"]
                                }

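                                # Split the response string ("result=OK, resultCode=ACN_200, ...")
                                # into key/value pairs under [outgoing-response-data].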
                                kv {
                                        field_split => ","
                                        trim_key => " "
                                        trim_value => " "
                                        source => "outgoing-response-data"
                                        target => "outgoing-response-data"
                                }

                                if [outgoing-response-data] {
                                        if [outgoing-response-data][result] {
                                                mutate {
                                                        rename => { "[outgoing-response-data][result]" => "outgoing-response-result" }
                                                }
                                        }

                                        if [outgoing-response-data][resultCode] {
                                                mutate {
                                                        rename => { "[outgoing-response-data][resultCode]" => "outgoing-response-code" }
                                                }
                                        }

                                        if [outgoing-response-data][resultDescription] {
                                                mutate {
                                                        rename => { "[outgoing-response-data][resultDescription]" => "outgoing-response-description" }
                                                }
                                        }

                                        mutate {
                                                remove_field => [ "outgoing-response-data" ]
                                        }
                                }
                        }
                        else if [event-type] == "SYSERROR" {
                                grok {
                                        pattern_definitions => {
                                                "DELIM" => "\s*\|\s*"
                                                "STRING" => "[^|]+"
                                        }

                                        keep_empty_captures => true
                                        named_captures_only => true
                                        match => { "additional-data" => "errorType:\s*%{STRING:error-type}%{DELIM}Exception:\s*%{STRING:error-exception}%{DELIM}message:\s*(%{STRING:error-message})%{DELIM}stacktrace:\s*(?<error-stacktrace>.*)$" }
                                }

                                mutate {
                                        strip => ["error-type", "error-message", "error-exception", "error-stacktrace"]
                                }
                        }
                        else{
                                mutate {
                                        add_tag => [ "_unknowneventtype"]
                                }
                        }

                        # General bits
                        # Make sure the timestamp of the inserted event has the timestamp of the actual log event.
                        date {
                                # UserProfile: 2018-10-16 08:13:42,897
                                match => ["ts", "yyyy-MM-dd HH:mm:ss,SSS"]
                                timezone => "CET"
                                target => "@timestamp"
                                remove_field => [ "ts" ]
                        }

                        # Add a fingerprint to prevent duplicate log events.
                        fingerprint {
                                concatenate_sources => true
                                # Bracket notation for the nested Beats field.
                                source => ["message","[agent][hostname]"]
                                target => "[@metadata][fingerprint]"
                                method => "SHA1"
                                key => "deduplication-key"
                        }
                        ### Add an indexbasename to create correct indexname.
                        mutate {
                                add_field => { "[@metadata][indexbasename]" => "concurrentstreams-api" }
                        }
                        # Clean up the parsing helper field.
                        mutate {
                                remove_field => [ "additional-data" ]
                        }

                        # Clean up some fields that are not always present.
                        if ![platform] or [platform] == "" or [platform] == "-" {
                                mutate {
                                        remove_field => [ "platform" ]
                                }
                        }

                        if ![session-id] or [session-id] == "" or [session-id] == "-" {
                                mutate {
                                        remove_field => [ "session-id" ]
                                }
                        }

                        if ![user-id] or [user-id] == "" or [user-id] == "-" {
                                mutate {
                                        remove_field => [ "user-id" ]
                                }
                        }

                        if ![api] or [api] == "" or [api] == "-" {
                                mutate {
                                        remove_field => [ "api" ]
                                }
                        }

                        if ![app-id]  or [app-id] == "" or [app-id] == "-" {
                                mutate {
                                        remove_field => [ "app-id" ]
                                }
                        }

                        if ![client-ip] or [client-ip] == "" or [client-ip] == "-" {
                                mutate {
                                        remove_field => [ "client-ip" ]
                                }
                        }

                        if ![transaction-id] or [transaction-id] == "" or [transaction-id] == "-" {
                                mutate {
                                        remove_field => [ "transaction-id" ]
                                }
                        }
                }

                # Drop the message field to no longer store the initial log-event.
                # And remove the default tag added by Filebeat.
                mutate {
                        #remove_field => [ "message" ]
                        remove_tag => ["beats_input_codec_plain_applied"]
                }
        }
}

output {
        # Send the events to Elasticsearch, with a different index based on the log file the events came from.
        if "api-log" in [tags] or "access-log" in [tags] or "blip" in [tags] or "idservice" in [tags] or "proxysql-stats" in [tags] {
                elasticsearch {
                        hosts => ["pdbs358.grn.prd.itv.local:9200","pdbs359.grn.prd.itv.local:9200"]
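                        # The fingerprint doubles as the document id; combined with
                        # action "create", a re-shipped duplicate is rejected instead
                        # of overwriting the stored document.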
                        document_id => "%{[@metadata][fingerprint]}"
                        index => "%{[@metadata][indexbasename]}-ds"
                        action => "create"
                        sniffing => false
                }
        }

        #Just some debug output. Remove for production.
        #  stdout {
        #       codec => rubydebug
        #  }

        ### More debug output, do not use in production.
        # file {
        #       path => "/tmp/logstash.output"
        # }
}

My Filebeat config:

filebeat.inputs:

- type: log
  enabled: true

  paths:
   - /var/log/proxysql-stats/*.log

  tags: ["avs6", "proxysql-stats", "concurrentstreams", "amsterdam"]

  exclude_files: ['\.gz$']
  ignore_older: 6h
  close_inactive: 15m
  close_removed: true
  clean_removed: true

  harvester_limit: 0

- type: log
  enabled: true

  paths:
   - /var/log/avs6/*.log*
   # - /logs/*.log*

  exclude_files: ['\.gz$']

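  # Any line that does not start with "ts:" is appended to the previous line,
  # so multi-line events (stack traces, wrapped JSON) stay in one log event.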
  multiline.pattern: '^ts:'
  multiline.negate: true
  multiline.match: after

  tags: ["avs6", "api-log", "concurrentstreams", "amsterdam"]

  ignore_older: 6h
  close_inactive: 5m
  # close_inactive: 1m
  close_removed: true
  clean_removed: true
  scan_frequency: 30s
  harvester_limit: 0

filebeat.config.modules:
  enabled: false

processors:
  - drop_fields:
      fields: ["host"]

fields:
  environment: "Production"
  # environment: docker

queue.mem:
  events: 4096
  flush.min_events: 256
  flush.timeout: 10s

output.logstash:
  enabled: true
  hosts: ["logstashsrv:5047"]
  loadbalance: true
  timeout: 5m
  bulk_max_size: 2048
  slow_start: true

logging:
  level: info
  to_syslog: false
  to_files: true
  files:
    path: /var/log/filebeat/
    name: filebeat
    keepfiles: 14
  permissions: 0755
  metrics:
    enabled: false

Some log lines:

ts: 2021-05-18 09:14:45,600 | logLevel: INFO  | appId: concurrentstream-ms | thread: (http-0.0.0.0:8080-86) | SID: ee6362ac-85b3-22b8-838f-d6b7585bed12 | TN: f98e1f3c-85b9-8861-0ecd-711b75e0036e | clientIp: 127.0.0.1 | userId: 9999999 | apiType: B2B | api: increaseConcurrentStreams | platform: pctv | eventType: APISTART
ts: 2021-05-18 09:14:45,600 | logLevel: INFO  | appId: concurrentstream-ms | thread: (http-0.0.0.0:8080-86) | SID: ee6362ac-85b3-22b8-838f-d6b7585bed12 | TN: f98e1f3c-85b9-8861-0ecd-711b75e0036e | clientIp: 127.0.0.1 | userId: 9999999 | apiType: B2B | api: increaseConcurrentStreams | platform: pctv | eventType: REQBODY | body: {"crmAccountId":"XXX","ruleIds":["a8c287c6-f5cf-45d5-8335-e57a9fff4aac","a7458e6d-5f1f-4d10-b627-928d455ef955","25cc6791-cfbf-41ef-bbdb-9cedd69d60c7","a4403d7c-a43b-4c5b-b289-caf63f50c6ad"],"tags":[{"type":"Platform","value":"pctv"},{"type":"channel","value":"19"}]}
ts: 2021-05-18 09:14:45,614 | logLevel: INFO  | appId: concurrentstream-ms | thread: (http-0.0.0.0:8080-86) | SID: ee6362ac-85b3-22b8-838f-d6b7585bed12 | TN: f98e1f3c-85b9-8861-0ecd-711b75e0036e | clientIp: 127.0.0.1 | userId: 9999999 | apiType: B2B | api: increaseConcurrentStreams | platform: pctv | eventType: APIEND | userAgent: Mozilla/5.0 (X11; Linux armv7l) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36 CrKey/1.49.250946 | request: channel=pctv, lang=nld | response: result=OK, resultCode=ACN_200, resultDescription=OK | executionTime(ms): 14

The culprit is the thread field (ThreadId), which has changed from its earlier form. However, I do not understand what exactly is going wrong, so if anyone could enlighten me I would be very, very happy.
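One thing I still want to try: instead of skipping the thread field with %{}, anchor on its literal "thread: " prefix and capture the value, so the mapping is explicit about what sits between appId and SID (untested sketch; the thread target field name is my own choice):

                dissect {
                  mapping => {
                    "message" => "ts: %{ts} %{+ts} | logLevel: %{log-level} | appId: %{app-id} | thread: %{thread} | SID: %{session-id} | TN: %{transaction-id} | clientIp: %{client-ip} | userId: %{user-id} | apiType: %{} | api: %{api} | platform: %{platform} | %{additional-data}"
                  }
                }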

Cheers

By the way, the dissect in the first post also produces a dissect failure. I don't understand that one either; when I tested it locally, everything seemed fine.

Output example (debug output to file):

{
                        "client-ip" => "`127.0.0.1",
            "incoming-request-data" => "channel=stb, policyIds=, lang=nld",
                          "user-id" => "9999999",
                            "agent" => {
                  "id" => "4971d2f8-81db-4aa7-ba1f-5a1fb25480b7",
                "name" => "srv2.prdl.itv.local",
                "type" => "filebeat",
            "hostname" => "srv2.prdl.itv.local",
             "version" => "7.12.0",
        "ephemeral_id" => "7b4de910-7213-4c8c-92c8-2a5f6a1ba6d5"
    },
                          "message" => "ts: 2021-05-18 11:09:01,082 | logLevel: INFO  | appId: concurrentstream-ms | thread: (http-0.0.0.0:8080-124) | SID: 0dbefbcf-f77b-0a15-06f9-9acb7d8ee4fa | TN: ecfaa8e2-75f7-4636-7541-7602fce57305 | clientIp: 127.0.0.1 | userId: 9999999 | apiType: B2B | api: policyCalculation | platform: stb | eventType: APIEND | userAgent: Mozilla/5.0 (QtEmbedded; Linux) AppleWebKit/534.34 (KHTML, like Gecko) NSN/1.0 Safari/534.34 | request: channel=stb, policyIds=, lang=nld | response: result=OK, resultCode=ACN_200, resultDescription=OK | executionTime(ms): 0",
                         "@version" => "1",
                              "api" => "policyCalculation",
                         "platform" => "stb",
    "outgoing-response-description" => "OK",
                       "user-agent" => "Mozilla/5.0 (QtEmbedded; Linux) AppleWebKit/534.34 (KHTML, like Gecko) NSN/1.0 Safari/534.34",
                              "ecs" => {
        "version" => "1.8.0"
    },
                              "log" => {
        "offset" => 418615362,
          "file" => {
            "path" => "/var/log/avs6/concurrentstream-ms.log"
        }
    },
                       "event-type" => "APIEND",
                             "tags" => [
        [0] "avs6",
        [1] "api-log",
        [2] "concurrentstreams",
        [3] "amsterdam"
    ],
                        "log-level" => "INFO ",
                   "execution-time" => "0",
                       "@timestamp" => 2021-05-18T09:09:01.082Z,
                            "input" => {
        "type" => "log"
    },
           "outgoing-response-code" => "ACN_200",
                   "transaction-id" => "ecfaa8e2-75f7-4636-7541-7602fce57305",
                           "fields" => {
        "environment" => "Production"
    },
         "outgoing-response-result" => "OK",
                       "session-id" => "0dbefbcf-f77b-0a15-06f9-9acb7d8ee4fa",
                           "app-id" => "concurrentstream-ms"
}

What I do NOT understand about the above situation is that it looks OK, yet I do NOT get any data in Elasticsearch, while other applications that go through the same pipeline DO end up in their respective indices.

Just did a test. I added a log line in which I changed the field:

thread: (http-0.0.0.0:8080-86)

to

thread: (http-0.0.0.0-8080-86)

and everything is perfectly fine for this line. It ends up in the index without errors.

Does anyone have an idea how to fix this? Can I drop this field in Filebeat already?
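What I had in mind, assuming the script processor is available in this Filebeat version, is rewriting the message before it is shipped, so Logstash never sees the new thread format (untested sketch):

processors:
  - script:
      lang: javascript
      source: |
        function process(event) {
            var msg = event.Get("message");
            if (msg) {
                // Hypothetical workaround: neutralize the colon in the new
                // thread value, e.g. (http-0.0.0.0:8080-86) -> (http-0.0.0.0_8080-86).
                event.Put("message", msg.replace("http-0.0.0.0:", "http-0.0.0.0_"));
            }
        }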

And I have to take that back. I tried adding a gsub in Logstash and it did not work out.

Changed:

                mutate {
                  strip => ["message"]
                #  add_field => { "[servername]" => "papps1476" }
                }

to

                mutate {
                  strip => ["message"]
                  gsub => ["message", "http-0.0.0.0:", "http-0.0.0.0_"]
                #  add_field => { "[servername]" => "papps1476" }
                }

and it did not work out. I'm pulling my hair out here.

After this action the output looks like this:

{
                         "platform" => "stb",
            "incoming-request-data" => "channel=stb, lang=nld",
                           "app-id" => "concurrentstream-ms | thread: (http-0.0.0.0_8080-12)",
                              "api" => "increaseConcurrentStreams",
                        "client-ip" => "127.0.0.1",
                            "input" => {
        "type" => "log"
    },
                       "event-type" => "APIEND",
                         "@version" => "1",
                          "message" => "ts: 2021-05-18 13:08:24,656 | logLevel: INFO  | appId: concurrentstream-ms | thread: (http-0.0.0.0_8080-12) | SID: 597b8805-7bd1-25eb-adc5-d46445926d89 | TN: 36db15fd-9b74-0197-e124-e2ecd156f99f | clientIp: 127.0.0.1 | userId: 9999999| apiType: B2B | api: increaseConcurrentStreams | platform: stb | eventType: APIEND | userAgent: Mozilla/5.0 (QtEmbedded; Linux) AppleWebKit/534.34 (KHTML, like Gecko) NSN/1.0 Safari/534.34 | request: channel=stb, lang=nld | response: result=OK, resultCode=ACN_200, resultDescription=OK | executionTime(ms): 5",
                           "fields" => {
        "environment" => "Production"
    },
                              "ecs" => {
        "version" => "1.8.0"
    },
                       "@timestamp" => 2021-05-18T11:08:24.656Z,
                             "tags" => [
        [0] "avs6",
        [1] "api-log",
        [2] "concurrentstreams",
        [3] "amsterdam"
    ],
                       "session-id" => "597b8805-7bd1-25eb-adc5-d46445926d89",
                   "execution-time" => "5",
                              "log" => {
        "offset" => 481649785,
          "file" => {
            "path" => "/var/log/avs6/concurrentstream-ms.log"
        }
    },
                   "transaction-id" => "36db15fd-9b74-0197-e124-e2ecd156f99f",
         "outgoing-response-result" => "OK",
                            "agent" => {
             "version" => "7.12.0",
                "name" => "srv2.prdl.itv.local",
            "hostname" => "srv2.prdl.itv.local",
        "ephemeral_id" => "b41102a0-9ca6-4007-84e9-7a387617de23",
                "type" => "filebeat",
                  "id" => "4971d2f8-81db-4aa7-ba1f-5a1fb25480b7"
    },
                          "user-id" => "8775561",
                       "user-agent" => "Mozilla/5.0 (QtEmbedded; Linux) AppleWebKit/534.34 (KHTML, like Gecko) NSN/1.0 Safari/534.34",
           "outgoing-response-code" => "ACN_200",
    "outgoing-response-description" => "OK",
                        "log-level" => "INFO "
}

Notice the apparent dissect failure with regard to the app-id field.
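(Side note: mutate applies its operations in a fixed internal order in which gsub runs before strip, and the message field above does show the underscore, so the substitution was applied before dissect ran. To make the ordering explicit anyway, the gsub could live in its own mutate ahead of the strip — untested:)

                mutate {
                  gsub => ["message", "http-0.0.0.0:", "http-0.0.0.0_"]
                }
                mutate {
                  strip => ["message"]
                }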

OK.

After an update of the applications, the logs suddenly flow fine again. I'm totally flabbergasted: apparently something did change in the logging. I still need to analyze what, since the supplier gave no indication that anything changed with regard to logging. But for now the issue is gone.
