Ingress controller logs directly from filebeat to elasticsearch

Hi, I want to ship my Ingress controller logs directly to Elasticsearch via Filebeat.
Here is my log format for ingress

{\"remote_ip\": \"%{GREEDYDATA:remote_ip}\",\"method\": \"%{GREEDYDATA:method}\", \"host\": \"%{GREEDYDATA:Hostname}\", \"url\": \"%{GREEDYDATA:url}\", \"http_protocol\": \"HTTP/%{GREEDYDATA:httpversion}\", \"response_code\": \"%{GREEDYDATA:response_code}\", \"referrer\": \"%{GREEDYDATA:referrer}\", \"body\": \"%{GREEDYDATA:body}\", \"agent\": \"%{GREEDYDATA:agent}\"}, \"connection\": \"%{GREEDYDATA:connection}\", \"request_id\": \"%{GREEDYDATA:request_id}\", \"connection_requests\": \"%{GREEDYDATA:connection_requests}\", \"time\": { \"iso8601\": \"%{TIMESTAMP_ISO8601:timestamp}\", \"request\": \"%{GREEDYDATA:request_time}\" }, \"http\": { \"x_forwarded_for\": \"%{GREEDYDATA:x_forwarded_for}\", \"x_forwarded_proto\": \"%{GREEDYDATA:x_forwarded_proto}\", \"x_real_ip\": \"%{GREEDYDATA:x_real_ip}\", \"upstream\": { \"addr\": \"%{GREEDYDATA:upstream_addr}:%{GREEDYDATA:upstream_port}\", \"status\": \"%{GREEDYDATA:upstream_status}\", \"response_time\": \"%{GREEDYDATA:upstream_response_time}

and here is my filebeat config:

      filebeat.autodiscover:
        providers:
          - type: kubernetes
            hints.enabled: true
            templates:
              # Only attach the nginx ingress_controller input to the
              # ingress-nginx controller pods.
              - condition:
                  equals:
                    kubernetes.container.name: "ingress-nginx-controller"
                config:
                  - module: nginx
                    ingress_controller:
                      enabled: true
                      input:
                        type: container
                        # fields.type is used below to route events to the
                        # ingress index.
                        fields:
                          type: ingress
                        paths:
                          - /var/log/containers/*-${data.kubernetes.container.id}.log
      processors:
        # Enrich events with pod/namespace/node metadata (default matchers).
        - add_kubernetes_metadata: {}
      # FIX: setup.template and setup.ilm are ROOT-LEVEL Filebeat settings.
      # Nested inside an output.elasticsearch.indices entry (as before) they
      # are silently ignored, so the index template and ILM policy were never
      # applied. (The invalid "type: index" key was dropped — it is not a
      # setup.template option.)
      setup.template:
        name: "ingress-template"
        pattern: "ingress-*"
      setup.ilm:
        enabled: true
        rollover_alias: "ingress-template"
        pattern: "{now/d}-000001"
        policy_name: "ingress-custom"
      output.elasticsearch:
        hosts: ["X", "X"]
        bulk_max_size: 100
        # NOTE(review): with setup.ilm.enabled: true, Filebeat writes to the
        # ILM rollover alias and this indices setting is ignored — keep it
        # only for the case where ILM is later disabled.
        indices:
          - index: "ingress-template-%{+yyyy.MM.dd}-000001"
            when.equals:
              fields.type: "ingress"

and here is my ingest pipeline:

      # Ingest pipeline for Nginx ingress controller access logs (adapted from
      # the Filebeat nginx "ingress_controller" fileset pipeline).
      description:
        Pipeline for parsing Nginx ingress controller access logs. Requires the
        geoip and user_agent plugins.
      processors:

        # NOTE(review): debug/marker field — every document is tagged
        # zahra="zahra"; remove once the pipeline is confirmed to run.
        - set:
            field: zahra
            value: "zahra"
        # Timestamp of ingestion into Elasticsearch.
        - set:
            field: event.ingested
            value: "{{_ingest.timestamp}}"
        # Preserve the raw log line in event.original before any parsing.
        - rename:
            field: message
            target_field: event.original
        # NOTE(review): this pattern looks unbalanced/truncated (stray '}' after
        # "agent", no closing braces/quote at the end) and it captures into
        # TOP-LEVEL fields (remote_ip, method, ...), while the processors below
        # read nginx.ingress_controller.* — if it never matches, or matches but
        # fills different field names, all the later splits/scripts are no-ops.
        # Confirm against a real log line with the Grok debugger.
        - grok:
            field: event.original
            patterns:
              - "{\"remote_ip\": \"%{GREEDYDATA:remote_ip}\",\"method\": \"%{GREEDYDATA:method}\", \"host\": \"%{GREEDYDATA:Hostname}\", \"url\": \"%{GREEDYDATA:url}\", \"http_protocol\": \"HTTP/%{GREEDYDATA:httpversion}\", \"response_code\": \"%{GREEDYDATA:response_code}\", \"referrer\": \"%{GREEDYDATA:referrer}\", \"body\": \"%{GREEDYDATA:body}\", \"agent\": \"%{GREEDYDATA:agent}\"}, \"connection\": \"%{GREEDYDATA:connection}\", \"request_id\": \"%{GREEDYDATA:request_id}\", \"connection_requests\": \"%{GREEDYDATA:connection_requests}\", \"time\": { \"iso8601\": \"%{TIMESTAMP_ISO8601:timestamp}\", \"request\": \"%{GREEDYDATA:request_time}\" }, \"http\": { \"x_forwarded_for\": \"%{GREEDYDATA:x_forwarded_for}\", \"x_forwarded_proto\": \"%{GREEDYDATA:x_forwarded_proto}\", \"x_real_ip\": \"%{GREEDYDATA:x_real_ip}\", \"upstream\": { \"addr\": \"%{GREEDYDATA:upstream_addr}:%{GREEDYDATA:upstream_port}\", \"status\": \"%{GREEDYDATA:upstream_status}\", \"response_time\": \"%{GREEDYDATA:upstream_response_time}" 
            ignore_missing: true
        # Parse "METHOD url HTTP/x.y" out of .info; the empty pattern ""
        # makes this grok succeed even when .info is empty.
        - grok:
            field: nginx.ingress_controller.info
            patterns:
              - "%{WORD:http.request.method} %{DATA:url.original} HTTP/%{NUMBER:http.version}"
              - ""
            ignore_missing: true
        # Break url.original into url.path, url.query, etc.
        - uri_parts:
            field: url.original
            ignore_failure: true
        # Fall back to destination.domain when the URL itself had no host.
        - set:
            field: url.domain
            value: "{{destination.domain}}"
            if: ctx.url?.domain == null && ctx.destination?.domain != null
            ignore_failure: true
        # NOTE(review): no ignore_missing here — if the grok above never
        # created nginx.ingress_controller.info this remove fails and the
        # whole pipeline falls through to on_failure.
        - remove:
            field: nginx.ingress_controller.info
        # The *_list fields are comma/space-separated strings; split them into
        # arrays for the scripts below.
        - split:
            field: nginx.ingress_controller.remote_ip_list
            separator: '"?,?\s+'
            ignore_missing: true
        - split:
            field: nginx.ingress_controller.upstream_address_list
            separator: '"?,?\s+'
            ignore_missing: true
        - split:
            field: nginx.ingress_controller.upstream.response.length_list
            separator: '"?,?\s+'
            ignore_missing: true
        - split:
            field: nginx.ingress_controller.upstream.response.time_list
            separator: '"?,?\s+'
            ignore_missing: true
        - split:
            field: nginx.ingress_controller.upstream.response.status_code_list
            separator: '"?,?\s+'
            ignore_missing: true
        - split:
            field: nginx.ingress_controller.origin
            separator: '"?,?\s+'
            ignore_missing: true
        # Ensure source.address exists so the scripts below can assign into it.
        - set:
            field: source.address
            if: ctx.source?.address == null
            value: ""
        # Copy the nginx request id into ECS http.request.id (triple braces =
        # no HTML escaping of the value).
        - set:
            field: http.request.id
            value: '{{{nginx.ingress_controller.http.request.id}}}'
            ignore_empty_value: true
            ignore_failure: true
        # The three scripts below collapse the *_list arrays (produced by the
        # split processors above) into scalar upstream.response.* fields. Each
        # is guarded so it only runs on a non-empty list, and each try/catch
        # nulls its own target on parse failure instead of failing the
        # pipeline.

        # upstream.response.length = value of the LAST entry in length_list
        # (the upstream that finally answered).
        - script:
            if: ctx.nginx?.ingress_controller?.upstream?.response?.length_list != null && ctx.nginx.ingress_controller.upstream.response.length_list.length > 0
            lang: painless
            source: >-
              try {
                if (ctx.nginx.ingress_controller.upstream.response.length_list.length == null) {
                  return;
                }
                int last_length = 0;
                for (def item : ctx.nginx.ingress_controller.upstream.response.length_list) {
                  last_length = Integer.parseInt(item);
                }
                ctx.nginx.ingress_controller.upstream.response.length = last_length;
              }
              catch (Exception e) {
                ctx.nginx.ingress_controller.upstream.response.length = null;
              }
        # upstream.response.time = SUM of all entries in time_list (total time
        # spent across every upstream attempt).
        - script:
            if: ctx.nginx?.ingress_controller?.upstream?.response?.time_list != null && ctx.nginx.ingress_controller.upstream.response.time_list.length > 0
            lang: painless
            source: >-
              try {
                if (ctx.nginx.ingress_controller.upstream.response.time_list.length == null) {
                  return;
                }
                float res_time = 0;
                for (def item : ctx.nginx.ingress_controller.upstream.response.time_list) {
                  res_time = res_time + Float.parseFloat(item);
                }
                ctx.nginx.ingress_controller.upstream.response.time = res_time;
              }
              catch (Exception e) {
                ctx.nginx.ingress_controller.upstream.response.time = null;
              }
        # upstream.response.status_code = value of the LAST entry in
        # status_code_list.
        # FIX: the catch block previously nulled upstream.response.time
        # (copy/paste from the script above); it must null status_code, and
        # must not clobber the time computed by the previous script.
        - script:
            if: ctx.nginx?.ingress_controller?.upstream?.response?.status_code_list != null && ctx.nginx.ingress_controller.upstream.response.status_code_list.length > 0
            lang: painless
            source: >-
              try {
                if (ctx.nginx.ingress_controller.upstream.response.status_code_list.length == null) {
                  return;
                }
                int last_status_code = 0;
                for (def item : ctx.nginx.ingress_controller.upstream.response.status_code_list) {
                  last_status_code = Integer.parseInt(item);
                }
                ctx.nginx.ingress_controller.upstream.response.status_code = last_status_code;
              }
              catch (Exception e) {
                ctx.nginx.ingress_controller.upstream.response.status_code = null;
              }
        # Reduce upstream_address_list to the LAST upstream tried and split it
        # into upstream.ip / upstream.port on "host:port"; on any parse error
        # both fields are nulled.
        - script:
            if: ctx.nginx?.ingress_controller?.upstream_address_list != null && ctx.nginx.ingress_controller.upstream_address_list.length > 0
            lang: painless
            source: >-
              try {
                if (ctx.nginx.ingress_controller.upstream_address_list.length == null) {
                  return;
                }
                def last_upstream = "";
                for (def item : ctx.nginx.ingress_controller.upstream_address_list) {
                  last_upstream = item;
                }
                StringTokenizer tok = new StringTokenizer(last_upstream, ":");
                if (tok.countTokens()>1) {
                  ctx.nginx.ingress_controller.upstream.ip = tok.nextToken();
                  ctx.nginx.ingress_controller.upstream.port = Integer.parseInt(tok.nextToken());
                } else {
                  ctx.nginx.ingress_controller.upstream.ip = last_upstream;
                }
              }
              catch (Exception e) {
                ctx.nginx.ingress_controller.upstream.ip = null;
                ctx.nginx.ingress_controller.upstream.port = null;
              }
        # Pick the real client IP: the first non-private address in
        # remote_ip_list, falling back to the first entry when every address
        # is private. isPrivate() checks the first two octets against
        # 10/8, 192.168/16, 172.16-31/16 and 127/8 (IPv4 only —
        # NOTE(review): IPv6 clients will never be classed as private).
        - script:
            if: ctx.nginx?.ingress_controller?.remote_ip_list != null && ctx.nginx.ingress_controller.remote_ip_list.length > 0
            lang: painless
            source: >-
              boolean isPrivate(def dot, def ip) {
                try {
                  StringTokenizer tok = new StringTokenizer(ip, dot);
                  int firstByte = Integer.parseInt(tok.nextToken());
                  int secondByte = Integer.parseInt(tok.nextToken());
                  if (firstByte == 10) {
                    return true;
                  }
                  if (firstByte == 192 && secondByte == 168) {
                    return true;
                  }
                  if (firstByte == 172 && secondByte >= 16 && secondByte <= 31) {
                    return true;
                  }
                  if (firstByte == 127) {
                    return true;
                  }
                  return false;
                }
                catch (Exception e) {
                  return false;
                }
              }
              try {
                ctx.source.address = null;
                if (ctx.nginx.ingress_controller.remote_ip_list == null) {
                  return;
                }
                def found = false;
                for (def item : ctx.nginx.ingress_controller.remote_ip_list) {
                  if (!isPrivate(params.dot, item)) {
                    ctx.source.address = item;
                    found = true;
                    break;
                  }
                }
                if (!found) {
                  ctx.source.address = ctx.nginx.ingress_controller.remote_ip_list[0];
                }
              }
              catch (Exception e) {
                ctx.source.address = null;
              }
            # The octet separator is passed as a param so the script source
            # needs no escaped literal dot.
            params:
              dot: .
        # Drop the placeholder source.address when no client IP was found.
        # FIX: null-safe access (ctx.source?.address) and ignore_missing so
        # this processor cannot itself fail when ctx.source is absent.
        - remove:
            field: source.address
            if: ctx.source?.address == null
            ignore_missing: true
        # Promote source.address to source.ip when it is a bare IP literal.
        - grok:
            field: source.address
            patterns:
              - ^%{IP:source.ip}$
            ignore_failure: true
        # Keep Filebeat's read time in event.created; @timestamp is replaced
        # below with the time parsed from the log line.
        - rename:
            field: "@timestamp"
            target_field: event.created
        # NOTE(review): the first grok captures TIMESTAMP_ISO8601 into a field
        # named "timestamp", not nginx.ingress_controller.time, and this
        # format is the combined-log style (dd/MMM/yyyy:H:m:s Z), not ISO8601
        # — confirm which field/format your logs actually produce.
        - date:
            field: nginx.ingress_controller.time
            target_field: "@timestamp"
            formats:
              - dd/MMM/yyyy:H:m:s Z
            on_failure:
              - append:
                  field: error.message
                  value: "{{ _ingest.on_failure_message }}"
        # FIX: ignore_missing so the remove cannot fail the pipeline when the
        # field was never created by the grok above.
        - remove:
            field: nginx.ingress_controller.time
            ignore_missing: true
        # Parse the user agent string into user_agent.* fields.
        # NOTE(review): user_agent.original is never populated by this
        # pipeline (the first grok captures a top-level "agent" field), so
        # this is likely a no-op — verify the field name.
        - user_agent:
            field: user_agent.original
            ignore_missing: true
        # GeoIP city/country lookup for the client IP.
        - geoip:
            field: source.ip
            target_field: source.geo
            ignore_missing: true
        # ASN lookup for the client IP.
        - geoip:
            database_file: GeoLite2-ASN.mmdb
            field: source.ip
            target_field: source.as
            properties:
              - asn
              - organization_name
            ignore_missing: true
        # Rename the ASN fields to their ECS names.
        - rename:
            field: source.as.asn
            target_field: source.as.number
            ignore_missing: true
        - rename:
            field: source.as.organization_name
            target_field: source.as.organization.name
            ignore_missing: true
        # ECS event categorization: kind/category/type.
        - set:
            field: event.kind
            value: event
        - append:
            field: event.category
            value: web
        - append:
            field: event.type
            value: info
        # Derive event.outcome from the HTTP status code.
        # NOTE(review): http.response.status_code is never populated by this
        # pipeline's groks, so these conditions may never fire — verify.
        - set:
            field: event.outcome
            value: success
            if: "ctx?.http?.response?.status_code != null && ctx.http.response.status_code < 400"
        - set:
            field: event.outcome
            value: failure
            if: "ctx?.http?.response?.status_code != null && ctx.http.response.status_code >= 400"
        # Populate ECS related.* fields for cross-source correlation searches.
        - append:
            field: related.ip
            value: "{{source.ip}}"
            if: "ctx?.source?.ip != null"
        - append:
            field: related.ip
            value: "{{destination.ip}}"
            if: "ctx?.destination?.ip != null"
        - append:
            field: related.user
            value: "{{user.name}}"
            if: "ctx?.user?.name != null"
        # Recursively strip null-valued fields from the final document.
        - script:
            lang: painless
            description: This script processor iterates over the whole document to remove fields with null values.
            source: |
              void handleMap(Map map) {
                for (def x : map.values()) {
                  if (x instanceof Map) {
                      handleMap(x);
                  } else if (x instanceof List) {
                      handleList(x);
                  }
                }
                map.values().removeIf(v -> v == null);
              }
              void handleList(List list) {
                for (def x : list) {
                    if (x instanceof Map) {
                        handleMap(x);
                    } else if (x instanceof List) {
                        handleList(x);
                    }
                }
              }
              handleMap(ctx);
      # Any processor failure is recorded on the document in error.message
      # instead of dropping the event.
      on_failure:
        - set:
            field: error.message
            value: "{{ _ingest.on_failure_message }}" 

In Kibana Discover, some fields are not indexed, as you can see in the image below:


My question is: how can I index all of these fields so that I can apply filtering in my searches?

Welcome to our community! :smiley:

What makes you say they are not indexed?

Thanks for your response. I deleted the ILM policy and index template and created them again, and my issue has been resolved. Thanks!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.