Hi, I want to ship my ingress controller's logs directly to Elasticsearch via Filebeat.
Here is the log format I use for the ingress controller:
{\"remote_ip\": \"%{GREEDYDATA:remote_ip}\",\"method\": \"%{GREEDYDATA:method}\", \"host\": \"%{GREEDYDATA:Hostname}\", \"url\": \"%{GREEDYDATA:url}\", \"http_protocol\": \"HTTP/%{GREEDYDATA:httpversion}\", \"response_code\": \"%{GREEDYDATA:response_code}\", \"referrer\": \"%{GREEDYDATA:referrer}\", \"body\": \"%{GREEDYDATA:body}\", \"agent\": \"%{GREEDYDATA:agent}\"}, \"connection\": \"%{GREEDYDATA:connection}\", \"request_id\": \"%{GREEDYDATA:request_id}\", \"connection_requests\": \"%{GREEDYDATA:connection_requests}\", \"time\": { \"iso8601\": \"%{TIMESTAMP_ISO8601:timestamp}\", \"request\": \"%{GREEDYDATA:request_time}\" }, \"http\": { \"x_forwarded_for\": \"%{GREEDYDATA:x_forwarded_for}\", \"x_forwarded_proto\": \"%{GREEDYDATA:x_forwarded_proto}\", \"x_real_ip\": \"%{GREEDYDATA:x_real_ip}\", \"upstream\": { \"addr\": \"%{GREEDYDATA:upstream_addr}:%{GREEDYDATA:upstream_port}\", \"status\": \"%{GREEDYDATA:upstream_status}\", \"response_time\": \"%{GREEDYDATA:upstream_response_time}
And here is my Filebeat config:
# Kubernetes autodiscover: hints are enabled, and any container named
# "ingress-nginx-controller" is additionally handled by the nginx module's
# ingress_controller fileset, reading that container's log file.
filebeat.autodiscover:
  providers:
  - type: kubernetes
    hints.enabled: true
    templates:
    - condition:
        equals:
          kubernetes.container.name: 'ingress-nginx-controller'
      config:
      - module: nginx
        ingress_controller:
          enabled: true
          input:
            type: container
            paths:
            - '/var/log/containers/*-${data.kubernetes.container.id}.log'
            # Custom marker under `fields`; the output's index rule matches
            # on fields.type.
            fields:
              type: ingress
# Global processor list for every event Filebeat publishes.
# The bare key leaves the processor's configuration null, so it runs with
# whatever defaults Filebeat applies.
# NOTE(review): the kubernetes autodiscover provider configured in this file
# presumably already attaches pod metadata — confirm this is still needed.
processors:
- add_kubernetes_metadata:
# Elasticsearch output. `hosts` and `bulk_max_size` are output settings, so
# they are grouped here rather than trailing the setup.ilm keys, where no
# valid indentation could attach them to the output.
output.elasticsearch:
  hosts: ["X", "X"]
  bulk_max_size: 100
  # Events tagged fields.type == "ingress" by the autodiscover input go to
  # a dated index.
  # NOTE(review): this writes to a concrete index name rather than to the
  # rollover alias declared under setup.ilm; depending on the Filebeat
  # version, custom `indices` and ILM may override one another — confirm
  # against the docs for the version in use.
  indices:
  - index: "ingress-template-%{+yyyy.MM.dd}-000001"
    when.equals:
      fields.type: "ingress"

# Index template settings. `setup.*` is its own top-level namespace and must
# not be nested under the output.
setup.template:
  name: "ingress-template"
  pattern: "ingress-*"
  type: index

# Index lifecycle management: alias, first-index suffix pattern and policy.
setup.ilm:
  enabled: true
  rollover_alias: "ingress-template"
  pattern: "{now/d}-000001"
  policy_name: "ingress-custom"
And here is my ingest pipeline:
# Pipeline header followed by the first bookkeeping processors.
description: >-
  Pipeline for parsing Nginx ingress controller access logs. Requires the
  geoip and user_agent plugins.
processors:
# Stamp a constant marker field on every document.
- set:
    value: zahra
    field: zahra
# Record the time at which the ingest node processed the event.
- set:
    value: '{{_ingest.timestamp}}'
    field: event.ingested
# Move the raw log line to event.original; later processors parse it there.
- rename:
    target_field: event.original
    field: message
# Parse the custom JSON-like access-log line into flat fields.
#
# Each quoted value is matched with QVAL, which stops at the closing double
# quote (backslash-escaped characters are allowed inside). The earlier form
# chained ~20 GREEDYDATA (.*) captures, which forces heavy backtracking and
# lets the final, unterminated capture swallow the line's trailing `" } } }`.
# Literal braces are escaped so they can never be read as regex quantifiers.
# The pattern is a folded scalar: each line break becomes exactly one space,
# matching the single spaces in the log format.
- grok:
    field: event.original
    ignore_missing: true
    pattern_definitions:
      QVAL: '(?:[^"\\]|\\.)*'
    patterns:
    - >-
      \{"remote_ip": "%{QVAL:remote_ip}","method": "%{QVAL:method}",
      "host": "%{QVAL:Hostname}",
      "url": "%{QVAL:url}",
      "http_protocol": "HTTP/%{QVAL:httpversion}",
      "response_code": "%{QVAL:response_code}",
      "referrer": "%{QVAL:referrer}",
      "body": "%{QVAL:body}",
      "agent": "%{QVAL:agent}"\},
      "connection": "%{QVAL:connection}",
      "request_id": "%{QVAL:request_id}",
      "connection_requests": "%{QVAL:connection_requests}",
      "time": \{ "iso8601": "%{TIMESTAMP_ISO8601:timestamp}",
      "request": "%{QVAL:request_time}" \},
      "http": \{ "x_forwarded_for": "%{QVAL:x_forwarded_for}",
      "x_forwarded_proto": "%{QVAL:x_forwarded_proto}",
      "x_real_ip": "%{QVAL:x_real_ip}",
      "upstream": \{ "addr": "%{QVAL:upstream_addr}:%{QVAL:upstream_port}",
      "status": "%{QVAL:upstream_status}",
      "response_time": "%{QVAL:upstream_response_time}"
# Split an HTTP request line ("METHOD URL HTTP/x.y") held in
# nginx.ingress_controller.info; the empty second pattern lets the processor
# succeed when the value is not a request line.
- grok:
    field: nginx.ingress_controller.info
    patterns:
    - "%{WORD:http.request.method} %{DATA:url.original} HTTP/%{NUMBER:http.version}"
    - ""
    ignore_missing: true
# Break url.original into its components; failures are ignored.
- uri_parts:
    field: url.original
    ignore_failure: true
# Fall back to destination.domain when the URL carried no domain.
- set:
    field: url.domain
    value: "{{destination.domain}}"
    if: 'ctx.url?.domain == null && ctx.destination?.domain != null'
    ignore_failure: true
# Drop the intermediate field. ignore_missing is required: nothing earlier in
# this pipeline is guaranteed to create it, and without the flag a missing
# field makes `remove` throw, sending the whole document to the pipeline-level
# on_failure handler and skipping every processor below.
- remove:
    field: nginx.ingress_controller.info
    ignore_missing: true
# Turn each comma/whitespace separated nginx list value into an array.
# Every processor uses the same separator regex and skips absent fields.
- split:
    ignore_missing: true
    separator: '"?,?\s+'
    field: nginx.ingress_controller.remote_ip_list
- split:
    ignore_missing: true
    separator: '"?,?\s+'
    field: nginx.ingress_controller.upstream_address_list
- split:
    ignore_missing: true
    separator: '"?,?\s+'
    field: nginx.ingress_controller.upstream.response.length_list
- split:
    ignore_missing: true
    separator: '"?,?\s+'
    field: nginx.ingress_controller.upstream.response.time_list
- split:
    ignore_missing: true
    separator: '"?,?\s+'
    field: nginx.ingress_controller.upstream.response.status_code_list
- split:
    ignore_missing: true
    separator: '"?,?\s+'
    field: nginx.ingress_controller.origin
# Make sure source.address exists (as an empty string) so that later
# processors can dereference ctx.source safely.
- set:
    if: ctx.source?.address == null
    field: source.address
    value: ''
# Copy the nginx request id into http.request.id; empty values and failures
# are skipped silently.
- set:
    field: http.request.id
    value: '{{{nginx.ingress_controller.http.request.id}}}'
    ignore_failure: true
    ignore_empty_value: true
# Parse every entry of upstream.response.length_list as an integer and store
# the last one in upstream.response.length. Any parse error nulls the field.
- script:
    lang: painless
    if: 'ctx.nginx?.ingress_controller?.upstream?.response?.length_list != null && ctx.nginx.ingress_controller.upstream.response.length_list.length > 0'
    source: |-
      try {
        def lengths = ctx.nginx.ingress_controller.upstream.response.length_list;
        if (lengths.length == null) {
          return;
        }
        int parsed = 0;
        for (def raw : lengths) {
          parsed = Integer.parseInt(raw);
        }
        ctx.nginx.ingress_controller.upstream.response.length = parsed;
      } catch (Exception e) {
        ctx.nginx.ingress_controller.upstream.response.length = null;
      }
# Sum every entry of upstream.response.time_list as a float and store the
# total in upstream.response.time. Any parse error nulls the field.
- script:
    lang: painless
    if: 'ctx.nginx?.ingress_controller?.upstream?.response?.time_list != null && ctx.nginx.ingress_controller.upstream.response.time_list.length > 0'
    source: |-
      try {
        def times = ctx.nginx.ingress_controller.upstream.response.time_list;
        if (times.length == null) {
          return;
        }
        float total = 0;
        for (def raw : times) {
          total += Float.parseFloat(raw);
        }
        ctx.nginx.ingress_controller.upstream.response.time = total;
      } catch (Exception e) {
        ctx.nginx.ingress_controller.upstream.response.time = null;
      }
# Parse every entry of upstream.response.status_code_list as an integer and
# store the last one in upstream.response.status_code.
# On a parse error the status_code field itself is nulled; the previous
# version cleared upstream.response.time here (copy-paste slip), wiping a
# valid response time and leaving status_code untouched.
- script:
    lang: painless
    if: 'ctx.nginx?.ingress_controller?.upstream?.response?.status_code_list != null && ctx.nginx.ingress_controller.upstream.response.status_code_list.length > 0'
    source: |-
      try {
        if (ctx.nginx.ingress_controller.upstream.response.status_code_list.length == null) {
          return;
        }
        int last_status_code = 0;
        for (def item : ctx.nginx.ingress_controller.upstream.response.status_code_list) {
          last_status_code = Integer.parseInt(item);
        }
        ctx.nginx.ingress_controller.upstream.response.status_code = last_status_code;
      }
      catch (Exception e) {
        ctx.nginx.ingress_controller.upstream.response.status_code = null;
      }
# Take the last entry of upstream_address_list and split it on ":" into
# upstream.ip and (when a second token exists) an integer upstream.port.
# Any error nulls both fields.
- script:
    lang: painless
    if: 'ctx.nginx?.ingress_controller?.upstream_address_list != null && ctx.nginx.ingress_controller.upstream_address_list.length > 0'
    source: |-
      try {
        def upstreams = ctx.nginx.ingress_controller.upstream_address_list;
        if (upstreams.length == null) {
          return;
        }
        def lastAddr = "";
        for (def candidate : upstreams) {
          lastAddr = candidate;
        }
        StringTokenizer parts = new StringTokenizer(lastAddr, ":");
        if (parts.countTokens() <= 1) {
          ctx.nginx.ingress_controller.upstream.ip = lastAddr;
        } else {
          ctx.nginx.ingress_controller.upstream.ip = parts.nextToken();
          ctx.nginx.ingress_controller.upstream.port = Integer.parseInt(parts.nextToken());
        }
      } catch (Exception e) {
        ctx.nginx.ingress_controller.upstream.ip = null;
        ctx.nginx.ingress_controller.upstream.port = null;
      }
# Choose source.address from remote_ip_list: the first entry whose leading two
# dotted octets are not in 10.x, 192.168.x, 172.16-31.x or 127.x; when every
# entry is in those ranges, the first entry is used. Entries that cannot be
# parsed as dotted integers count as "not private". Any error nulls the field.
- script:
    lang: painless
    if: 'ctx.nginx?.ingress_controller?.remote_ip_list != null && ctx.nginx.ingress_controller.remote_ip_list.length > 0'
    params:
      dot: '.'
    source: |-
      boolean isPrivate(def dot, def ip) {
        try {
          StringTokenizer octets = new StringTokenizer(ip, dot);
          int first = Integer.parseInt(octets.nextToken());
          int second = Integer.parseInt(octets.nextToken());
          return first == 10
            || (first == 192 && second == 168)
            || (first == 172 && second >= 16 && second <= 31)
            || first == 127;
        } catch (Exception e) {
          return false;
        }
      }
      try {
        ctx.source.address = null;
        def candidates = ctx.nginx.ingress_controller.remote_ip_list;
        if (candidates == null) {
          return;
        }
        boolean publicSeen = false;
        for (def candidate : candidates) {
          if (isPrivate(params.dot, candidate)) {
            continue;
          }
          ctx.source.address = candidate;
          publicSeen = true;
          break;
        }
        if (!publicSeen) {
          ctx.source.address = candidates[0];
        }
      } catch (Exception e) {
        ctx.source.address = null;
      }
# Drop source.address when it ended up null. The condition uses null-safe
# navigation and the processor tolerates a missing field, so a document
# without any `source` object no longer raises inside the condition or the
# remove itself.
- remove:
    field: source.address
    if: ctx.source?.address == null
    ignore_missing: true
# Promote source.address to source.ip only when it is a valid IP literal;
# anything else (including a missing field) is ignored.
- grok:
    field: source.address
    patterns:
    - '^%{IP:source.ip}$'
    ignore_failure: true
# Timestamp handling.
# 1. Keep the original @timestamp as event.created.
- rename:
    field: '@timestamp'
    target_field: event.created
# 2. Stock-format time (dd/MMM/yyyy:H:m:s Z), only when that field exists, so
#    a document without it no longer gets a spurious error.message.
- date:
    if: ctx.nginx?.ingress_controller?.time != null
    field: nginx.ingress_controller.time
    target_field: '@timestamp'
    formats:
    - 'dd/MMM/yyyy:H:m:s Z'
    on_failure:
    - append:
        field: error.message
        value: '{{ _ingest.on_failure_message }}'
# 3. ISO 8601 time captured into `timestamp` by the grok on event.original
#    (the custom log format's "iso8601" value).
- date:
    if: ctx.timestamp != null
    field: timestamp
    target_field: '@timestamp'
    formats:
    - ISO8601
    on_failure:
    - append:
        field: error.message
        value: '{{ _ingest.on_failure_message }}'
# 4. If neither date processor produced a value, restore @timestamp from
#    event.created so the document is never indexed without a timestamp.
- set:
    if: "ctx['@timestamp'] == null && ctx.event?.created != null"
    field: '@timestamp'
    value: '{{event.created}}'
# 5. Drop the stock-format time field. ignore_missing keeps an absent field
#    from aborting the pipeline.
- remove:
    field: nginx.ingress_controller.time
    ignore_missing: true
# Enrichment: user-agent parsing plus geo and autonomous-system lookups on
# source.ip. Every step skips documents that lack its input field.
- user_agent:
    ignore_missing: true
    field: user_agent.original
- geoip:
    ignore_missing: true
    field: source.ip
    target_field: source.geo
- geoip:
    ignore_missing: true
    field: source.ip
    target_field: source.as
    database_file: GeoLite2-ASN.mmdb
    properties:
    - asn
    - organization_name
# Rename the ASN lookup results to source.as.number and
# source.as.organization.name.
- rename:
    ignore_missing: true
    field: source.as.asn
    target_field: source.as.number
- rename:
    ignore_missing: true
    field: source.as.organization_name
    target_field: source.as.organization.name
# Event categorization fields.
- set:
    value: event
    field: event.kind
- append:
    value: web
    field: event.category
- append:
    value: info
    field: event.type
# Outcome from the HTTP status: below 400 is success, 400 and above failure.
- set:
    if: 'ctx?.http?.response?.status_code != null && ctx.http.response.status_code < 400'
    field: event.outcome
    value: success
- set:
    if: 'ctx?.http?.response?.status_code != null && ctx.http.response.status_code >= 400'
    field: event.outcome
    value: failure
# Collect the IPs and user names seen in the event under related.*.
- append:
    if: 'ctx?.source?.ip != null'
    field: related.ip
    value: '{{source.ip}}'
- append:
    if: 'ctx?.destination?.ip != null'
    field: related.ip
    value: '{{destination.ip}}'
- append:
    if: 'ctx?.user?.name != null'
    field: related.user
    value: '{{user.name}}'
# Final clean-up: walk the whole document, descending into nested maps and
# lists, and delete every map entry whose value is null. List elements are
# only traversed, never removed.
- script:
    description: This script processor iterates over the whole document to remove fields with null values.
    lang: painless
    source: |
      void handleMap(Map node) {
        for (def child : node.values()) {
          if (child instanceof Map) {
            handleMap(child);
          } else if (child instanceof List) {
            handleList(child);
          }
        }
        node.values().removeIf(value -> value == null);
      }
      void handleList(List items) {
        for (def element : items) {
          if (element instanceof Map) {
            handleMap(element);
          } else if (element instanceof List) {
            handleList(element);
          }
        }
      }
      handleMap(ctx);
# Pipeline-level failure handler: store the failure message in error.message.
on_failure:
- set:
    value: '{{ _ingest.on_failure_message }}'
    field: error.message
In Discover, some fields are not indexed, as you can see in the image below:
My question is: how can I index all of these fields so that I can filter on them in my searches?