Add grok filter for custom log data in Filebeat's NGINX module


(Mohamed Salih) #1

Hi

I'm not sure if this is the best way to go about this. If there is a better way, please advise.

I've added a new access.log entry for NGINX that tracks the following:

$remote_addr 
$ssl_protocol 
$ssl_cipher 
$request

Because it's not the default log format for NGINX, the NGINX module is failing to break down the message into its different components. This is because the grok filter doesn't recognize this log format.
Kibana shows the following error:

Provided Grok expressions do not match field value

I looked around for a while and came to the conclusion that I need to add a grok pattern for the new log format to /usr/share/filebeat/module/nginx/access/ingest/default.json.

The log and grok patterns are:

11.11.11.11 TLSv1.2 ECDHE-RSA-AES256-GCM-SHA384 GET / HTTP/1.1
%{IP:clientip} %{DATA:tlsprotocol} %{DATA:cipher} %{WORD:request}\ / %{GREEDYDATA:http.protocol}
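
To see which part of the sample line each capture is expected to take, the grok pattern above can be approximated with an ordinary regular expression. This is only a rough sketch: the regex fragments are simplified stand-ins for the grok primitives (IPv4 only for IP), and `http_protocol` replaces `http.protocol` because Python group names cannot contain dots. The function name `parse_custom_line` is made up for the example.

```python
import re

# Rough regex stand-ins for the grok primitives (assumptions, not the real
# grok definitions): IP is simplified to IPv4, DATA is a lazy match,
# GREEDYDATA is a greedy match.
CUSTOM_LINE = re.compile(
    r"^(?P<clientip>\d{1,3}(?:\.\d{1,3}){3}) "  # %{IP:clientip}
    r"(?P<tlsprotocol>.*?) "                     # %{DATA:tlsprotocol}
    r"(?P<cipher>.*?) "                          # %{DATA:cipher}
    r"(?P<request>\w+) / "                       # %{WORD:request} followed by a literal " / "
    r"(?P<http_protocol>.*)$"                    # %{GREEDYDATA:http.protocol}
)


def parse_custom_line(line):
    """Return the captured fields as a dict, or None if the line does not match."""
    match = CUSTOM_LINE.match(line)
    return match.groupdict() if match else None


sample = "11.11.11.11 TLSv1.2 ECDHE-RSA-AES256-GCM-SHA384 GET / HTTP/1.1"
fields = parse_custom_line(sample)
print(fields)
```

Note that the literal ` / ` in the pattern means only requests for the root path would match; a request for any other URL would need a capture in that position instead.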

The content of default.json

{
  "description": "Pipeline for parsing Nginx access logs. Requires the geoip and user_agent plugins.",
  "processors": [{
    "grok": {
      "field": "message",
      "patterns":[
        "\"?%{IP_LIST:nginx.access.remote_ip_list} - %{DATA:nginx.access.user_name} \\[%{HTTPDATE:nginx.access.time}\\] \"%{WORD:nginx.access.method} %{DATA:nginx.access.url} HTTP/%{NUMBER:nginx.access.http_version}\" %{NUMBER:nginx.access.response_code} %{NUMBER:nginx.access.body_sent.bytes} \"%{DATA:nginx.access.referrer}\" \"%{DATA:nginx.access.agent}\""
        ],
      "pattern_definitions": {
        "IP_LIST": "%{IP}(\"?,?\\s*%{IP})*"
      },
      "ignore_missing": true
    }
  }, {
    "split": {
      "field": "nginx.access.remote_ip_list",
      "separator": "\"?,?\\s+"
    }
  }, {
    "script": {
      "lang": "painless",
      "inline": "boolean isPrivate(def ip) { try { StringTokenizer tok = new StringTokenizer(ip, '.'); int firstByte = Integer.parseInt(tok.nextToken());       int secondByte = Integer.parseInt(tok.nextToken());       if (firstByte == 10) {         return true;       }       if (firstByte == 192 && secondByte == 168) {         return true;       }       if (firstByte == 172 && secondByte >= 16 && secondByte <= 31) {         return true;       }       if (firstByte == 127) {         return true;       }       return false;     } catch (Exception e) {       return false;     }   }   def found = false;   for (def item : ctx.nginx.access.remote_ip_list) {     if (!isPrivate(item)) {       ctx.nginx.access.remote_ip = item;       found = true;       break;     }   }   if (!found) {     ctx.nginx.access.remote_ip = ctx.nginx.access.remote_ip_list[0];   }"
      }
  }, {
    "remove":{
      "field": "message"
    }
  }, {
    "rename": {
      "field": "@timestamp",
      "target_field": "read_timestamp"
    }
  }, {
    "date": {
      "field": "nginx.access.time",
      "target_field": "@timestamp",
      "formats": ["dd/MMM/YYYY:H:m:s Z"]
    }
  }, {
    "remove": {
      "field": "nginx.access.time"
    }
  }, {
    "user_agent": {
      "field": "nginx.access.agent",
      "target_field": "nginx.access.user_agent"
    }
  }, {
    "remove": {
      "field": "nginx.access.agent"
    }
  }, {
    "geoip": {
      "field": "nginx.access.remote_ip",
      "target_field": "nginx.access.geoip"
    }
  }],
  "on_failure" : [{
    "set" : {
      "field" : "error.message",
      "value" : "{{ _ingest.on_failure_message }}"
    }
  }]
}

I tried adding the grok filter to that file in a number of ways, but each time Filebeat stops. I'm not sure how to add another pattern to that JSON file without breaking it.

Thanks for any advice.


(Mohamed Salih) #2

I managed to add the new pattern without it breaking.

{
  "description": "Pipeline for parsing Nginx access logs. Requires the geoip and user_agent plugins.",
  "processors": [{
    "grok": {
      "field": "message",
      "patterns":[
        "\"?%{IP_LIST:nginx.access.remote_ip_list} - %{DATA:nginx.access.user_name} \\[%{HTTPDATE:nginx.access.time}\\] \"%{WORD:nginx.access.method} %{DATA:nginx.access.url} HTTP/%{NUMBER:nginx.access.http_version}\" %{NUMBER:nginx.access.response_code} %{NUMBER:nginx.access.body_sent.bytes} \"%{DATA:nginx.access.referrer}\" \"%{DATA:nginx.access.agent}\""
        ],
      "pattern_definitions": {
        "IP_LIST": "%{IP}(\"?,?\\s*%{IP})*"
      },
      "ignore_missing": true
    }
  }, {
    "grok": {
      "field": "message",
      "patterns":[
          "%{IP:client.ip} %{DATA:tlsi.protocol} %{DATA:cipher} %{WORD:request} / %{GREEDYDATA:http.protocol}"
        ],
      "ignore_missing": true
    }
  }, {
    "split": {
      "field": "nginx.access.remote_ip_list",
      "separator": "\"?,?\\s+"
    }
  }, {
    "script": {
      "lang": "painless",
      "inline": "boolean isPrivate(def ip) { try { StringTokenizer tok = new StringTokenizer(ip, '.'); int firstByte = Integer.parseInt(tok.nextToken());       int secondByte = Integer.parseInt(tok.nextToken());       if (firstByte == 10) {         return true;       }       if (firstByte == 192 && secondByte == 168) {         return true;       }       if (firstByte == 172 && secondByte >= 16 && secondByte <= 31) {         return true;       }       if (firstByte == 127) {         return true;       }       return false;     } catch (Exception e) {       return false;     }   }   def found = false;   for (def item : ctx.nginx.access.remote_ip_list) {     if (!isPrivate(item)) {       ctx.nginx.access.remote_ip = item;       found = true;       break;     }   }   if (!found) {     ctx.nginx.access.remote_ip = ctx.nginx.access.remote_ip_list[0];   }"
      }
  }, {
    "remove":{
      "field": "message"
    }
  }, {
    "rename": {
      "field": "@timestamp",
      "target_field": "read_timestamp"
    }
  }, {
    "date": {
      "field": "nginx.access.time",
      "target_field": "@timestamp",
      "formats": ["dd/MMM/YYYY:H:m:s Z"]
    }
  }, {
    "remove": {
      "field": "nginx.access.time"
    }
  }, {
    "user_agent": {
      "field": "nginx.access.agent",
      "target_field": "nginx.access.user_agent"
    }
  }, {
    "remove": {
      "field": "nginx.access.agent"
    }
  }, {
    "geoip": {
      "field": "nginx.access.remote_ip",
      "target_field": "nginx.access.geoip"
    }
  }],
  "on_failure" : [{
    "set" : {
      "field" : "error.message",
      "value" : "{{ _ingest.on_failure_message }}"
    }
  }]
}
                        

I then deleted the ingest pipeline from the Dev Tools in Kibana. The new one is loading,
but Kibana still shows that the grok filter is failing.


(Steffen Siering) #3

You can try to debug your grok/pipeline definition using the Kibana grok debugger or via Console using the Ingest Node Simulate API.

The pipeline definition has more than one grok filter. Using the Ingest Node Simulate API in verbose mode you will get back a document with details of input/output/failure for every single processor in the pipeline.
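
As a rough illustration of the Simulate API suggestion, the sketch below builds a request body that pairs a pipeline definition with one sample log line. The helper name `build_simulate_request` and the cut-down pipeline are assumptions made for the example; in practice the `pipeline` value would be the content of your own default.json.

```python
import json


def build_simulate_request(pipeline, messages):
    """Build the JSON body for POST _ingest/pipeline/_simulate?verbose."""
    return {
        "pipeline": pipeline,
        "docs": [{"_source": {"message": m}} for m in messages],
    }


# Cut-down pipeline with a single grok processor (an illustrative assumption,
# not the module's full default.json).
test_pipeline = {
    "description": "simulate test",
    "processors": [{
        "grok": {
            "field": "message",
            "patterns": ["%{IP:client_ip} %{GREEDYDATA:rest}"],
        }
    }],
}

body = build_simulate_request(
    test_pipeline,
    ["11.11.11.11 TLSv1.2 ECDHE-RSA-AES256-GCM-SHA384 GET / HTTP/1.1"],
)

# Paste the printed JSON under this request line in the Kibana Console:
#   POST _ingest/pipeline/_simulate?verbose
print(json.dumps(body, indent=2))
```

With `verbose` set, the response should list the result of each processor in turn, which makes it easier to see which grok processor rejects the line.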


(Mohamed Salih) #4

Hi Steffen,
Thanks for assisting me.

I have tried using those tools to get it to work. I removed the original content in default.json and replaced it with the following as a test.

{
  "description": "Pipeline for parsing Tailored Nginx access logs used for TLS.",
  "processors": [{
    "grok": {
      "field": "message",
      "patterns":[
        "%{IP:nginx.access.remote_ip} %{DATA:nginx.access.protocol} %{DATA:nginx.access.cipher} %{WORD:nginx.access.request} / HTTP/%{NUMBER:nginx.access.http_version}"
       ],
      "ignore_missing": true
    }
  }]
}

This is the pattern for the custom log format. When I reloaded the pipeline it worked: the filter broke down the message field.

I have seen other questions where people have more than one pattern in a Filebeat module.

I just can't seem to add the custom log format pattern without it coming up with a syntax error or failing completely.
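
One way to sidestep hand-editing mistakes here might be to let a JSON library do the edit: parse the file, append the extra expression to the existing `patterns` list, and write the result back out. The sketch below is a minimal illustration under that assumption; `add_grok_pattern` is a made-up helper, and the `original` text is a shortened stand-in for default.json rather than the real file.

```python
import json


def add_grok_pattern(pipeline_text, new_pattern):
    """Append new_pattern to the first grok processor and return the new JSON text."""
    pipeline = json.loads(pipeline_text)
    for processor in pipeline["processors"]:
        if "grok" in processor:
            processor["grok"]["patterns"].append(new_pattern)
            break
    else:
        raise ValueError("pipeline has no grok processor")
    return json.dumps(pipeline, indent=2)


# Shortened stand-in for default.json (assumption: the real file has more
# processors and a longer first pattern).
original = r'''
{
  "processors": [{
    "grok": {
      "field": "message",
      "patterns": ["%{IP:nginx.access.remote_ip} - %{GREEDYDATA:rest}"],
      "ignore_missing": true
    }
  }]
}
'''

custom = ("%{IP:nginx.access.remote_ip} %{DATA:nginx.access.protocol} "
          "%{DATA:nginx.access.cipher} %{WORD:nginx.access.request} / "
          "HTTP/%{NUMBER:nginx.access.http_version}")

updated = add_grok_pattern(original, custom)
print(updated)
```

Because the grok processor tries the expressions in the list in order and stops at the first one that matches, both log formats can share a single processor this way.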


(Mohamed Salih) #5

I was finally able to solve this.

The issue turned out to be a syntax error.

All I had to do was add a comma to the end of the first grok expression in the patterns array, and then add my custom NGINX log expression. This didn't work for me before because of a syntax error, but I thought it was the Filebeat module not accepting my grok expression.
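
A missing comma between two array entries is the kind of mistake any JSON parser will point at directly. As a small sketch (the helper name `check_json` and the shortened patterns are assumptions for illustration), Python's standard `json` module can report where parsing fails:

```python
import json


def check_json(text):
    """Return None if text is valid JSON, else a short description of the first error."""
    try:
        json.loads(text)
    except json.JSONDecodeError as err:
        return f"line {err.lineno}, column {err.colno}: {err.msg}"
    return None


# Two patterns with the separating comma missing (shortened for illustration).
broken = '''{
  "patterns": [
    "%{IP:a} first"
    "%{IP:b} second"
  ]
}'''

# The same text with the comma added after the first pattern.
fixed = broken.replace('first"', 'first",')

print(check_json(broken))
print(check_json(fixed))
```

Running a check like this before reloading the pipeline separates plain JSON syntax errors from genuine grok mismatches.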

It was only once I started using a JSON validator that it became much easier.
This is my final default.json (the second entry in the patterns array is the custom pattern):

{
        "description": "Pipeline for parsing Nginx access logs. Requires the geoip and user_agent plugins.",
        "processors": [{
                "grok": {
                        "field": "message",
                        "patterns": [
                                "\"?%{IP_LIST:nginx.access.remote_ip_list} - %{DATA:nginx.access.user_name} \\[%{HTTPDATE:nginx.access.time}\\] \"%{WORD:nginx.access.method} %{DATA:nginx.access.url} HTTP/%{NUMBER:nginx.access.http_version}\" %{NUMBER:nginx.access.response_code} %{NUMBER:nginx.access.body_sent.bytes} \"%{DATA:nginx.access.referrer}\" \"%{DATA:nginx.access.agent}\"",
                                "%{IP:nginx.access.remote_ip} %{DATA:nginx.access.protocol} %{DATA:nginx.access.cipher} %{WORD:nginx.access.request} / HTTP/%{NUMBER:nginx.access.http_version}"
                        ],
                        "pattern_definitions": {
                                "IP_LIST": "%{IP}(\"?,?\\s*%{IP})*"
                        },
                        "ignore_missing": true
                }
        }, {
                "split": {
                        "field": "nginx.access.remote_ip_list",
                        "separator": "\"?,?\\s+"
                }
        }, {
                "script": {
                        "lang": "painless",
                        "inline": "boolean isPrivate(def ip) { try { StringTokenizer tok = new StringTokenizer(ip, '.'); int firstByte = Integer.parseInt(tok.nextToken());       int secondByte = Integer.parseInt(tok.nextToken());       if (firstByte == 10) {         return true;       }       if (firstByte == 192 && secondByte == 168) {         return true;       }       if (firstByte == 172 && secondByte >= 16 && secondByte <= 31) {         return true;       }       if (firstByte == 127) {         return true;       }       return false;     } catch (Exception e) {       return false;     }   }   def found = false;   for (def item : ctx.nginx.access.remote_ip_list) {     if (!isPrivate(item)) {       ctx.nginx.access.remote_ip = item;       found = true;       break;     }   }   if (!found) {     ctx.nginx.access.remote_ip = ctx.nginx.access.remote_ip_list[0];   }"
                }
        }, {
                "remove": {
                        "field": "message"
                }
        }, {
                "rename": {
                        "field": "@timestamp",
                        "target_field": "read_timestamp"
                }
        }, {
                "date": {
                        "field": "nginx.access.time",
                        "target_field": "@timestamp",
                        "formats": ["dd/MMM/YYYY:H:m:s Z"]
                }
        }, {
                "remove": {
                        "field": "nginx.access.time"
                }
        }, {
                "user_agent": {
                        "field": "nginx.access.agent",
                        "target_field": "nginx.access.user_agent"
                }
        }, {
                "remove": {
                        "field": "nginx.access.agent"
                }
        }, {
                "geoip": {
                        "field": "nginx.access.remote_ip",
                        "target_field": "nginx.access.geoip"
                }
        }],
        "on_failure": [{
                "set": {
                        "field": "error.message",
                        "value": "{{ _ingest.on_failure_message }}"
                }
        }]
}

I then removed the old ingest pipeline from Kibana, loaded this new one and restarted Filebeat. It works perfectly.

