Integrate APM log format into ELK

I am using the ELK Stack in my system.
My application logs are formatted like this:

{
  "@timestamp": "2023-11-03T08:47:32.547Z",
  "log.level": "INFO",
  "message": "Schedule messages: size=1, markerTime=2023-11-03T15:47:17.544131492",
  "ecs.version": "1.2.0",
  "service.name": "example-service",
  "service.version": "4.3.0",
  "service.environment": "stg",
  "event.dataset": "example-service",
  "process.thread.name": "scheduling-1",
  "log.logger": "example.module.inbox.scheduler.FirebaseSenderScheduler",
  "transaction.id": "afb2b520605fb4b5",
  "trace.id": "cf62981eafc6e9bc8656a13d1409c562",
}

But when the logs are sent to Elasticsearch, the JSON keys are not mapped as separate fields in Kibana, so I cannot search on each of them.

Can you help me find a solution?
I would like each key in this JSON, such as log.level or message, to show up as its own searchable field.

You need to parse the message field, I guess.
What tool are you using to collect the logs and send them to Elasticsearch?

I am using Filebeat and Logstash.
Filebeat collects the application logs and sends them to Logstash,
and Logstash sends them to Elasticsearch.

What are you using Logstash for?
What kind of logs are you collecting?

I am using Logstash to collect logs from Filebeat and send them to Elasticsearch.
In Filebeat, I collect the console output of my application.
My application reformats its logs to ECS with pino.
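
Since the application already emits ECS JSON, one Filebeat-side option is to decode the JSON before it is shipped, for example with the decode_json_fields processor. This is only a minimal sketch, assuming the raw JSON line arrives in the message field of the container input:

processors:
  - decode_json_fields:
      fields: ["message"]      # the raw ECS JSON line from the container log
      target: ""               # put the decoded keys at the event root
      overwrite_keys: true     # let keys from the log line replace existing ones
      add_error_key: true      # tag events whose message is not valid JSON

The processor would sit under the processors of whichever input or autodiscover template collects these container logs.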

No transformation in Logstash?
Then you should send directly from Filebeat to Elasticsearch.
In Elasticsearch, create an ingest pipeline if needed to transform the data.
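
For this kind of ECS JSON, a minimal ingest pipeline could look like the following (a sketch; the pipeline name parse-ecs-json is illustrative, and it assumes the whole JSON line arrives in the message field):

PUT _ingest/pipeline/parse-ecs-json
{
  "description": "Parse ECS-formatted JSON log lines into top-level fields",
  "processors": [
    {
      "json": {
        "field": "message",
        "add_to_root": true
      }
    }
  ]
}

You would then reference it via the pipeline option of the Elasticsearch output, or set it as the index's default_pipeline.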

Could you share your filebeat configuration file?

I am using Logstash because I also need to send the logs to S3 (Logstash takes Filebeat as input and outputs to both S3 and Elasticsearch), and I use ILM policies to manage the indices.
I am on ELK version 8.7.1.

That makes sense.

Please share:

  • A sample log line
  • The filebeat config
  • Logstash pipeline config

Log line:

{"@timestamp": "2023-11-03T08:47:32.547Z","log.level": "INFO","message": "Schedule messages: size=1, markerTime=2023-11-03T15:47:17.544131492","ecs.version": "1.2.0","service.name": "example-service","service.version": "4.3.0","service.environment": "stg","event.dataset": "example-service","process.thread.name": "scheduling-1","log.logger": "example.module.inbox.scheduler.FirebaseSenderScheduler","transaction.id": "afb2b520605fb4b5","trace.id": "cf62981eafc6e9bc8656a13d1409c562",}

Filebeat config:

filebeat.yml: |-
    filebeat.autodiscover:
      providers:
        - type: kubernetes
          node: ${NODE_NAME}
          hints.enabled: true
          hints.default_config.enabled: false
          templates:
          - condition:
              or:
                - equals:
                    kubernetes.container.name: "example-service"
            config:
            - type: container
              encoding: plain
              paths: ["/var/log/containers/*${data.kubernetes.container.id}.log"]
              multiline.type: pattern
              multiline.pattern: '^{'
              multiline.negate: true
              multiline.match: after
              processors:
                - drop_fields:
                    fields:
                    - kubernetes.node
                    - kubernetes.replicaset.name
                    - kubernetes.labels
                    - kubernetes.namespace
                    - kubernetes.namespace_uid
                    - kubernetes.namespace_labels.name
                    - kubernetes.namespace_labels
                    - kubernetes.container.image
                    - kubernetes.pod.name
                    - kubernetes.pod.ip
                    - kubernetes.deployment.name
                    - container.runtime
                    - container.image.name
                    ignore_missing: true
                - if:
                    equals:
                      kubernetes.container.name: "example-service"
                  then:
                    - add_fields:
                        fields:
                          log_type: example

Logstash config:

logstash.yml: |-
    http.host: "0.0.0.0"
    path.config: /usr/share/logstash/pipeline
    pipeline.workers: 3
    pipeline.batch.size: 100
    pipeline.ecs_compatibility: v8

Logstash pipeline:

logstash.conf: |-
    input {
      beats {
        port => 5044
      }
    }
    filter {
      mutate { remove_field => ["input_type", "[event][original]", "[tags]", "[fields][fields_under_root]" ] }
      if [kubernetes][labels][logstyle] == "nginx" {
        #Nginx
        grok {
          match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]}( \"%{DATA:[nginx][access][referrer]}\")?( \"%{DATA:[nginx][access][agent]}\")?",
          "%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \\[%{HTTPDATE:[nginx][access][time]}\\] \"-\" %{NUMBER:[nginx][access][response_code]} -" ] }
        }

        # date {
        #  match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
        #  remove_field => "[nginx][access][time]"
        # }

        useragent {
          source => "[nginx][access][agent]"
          target => "[nginx][access][user_agent]"
          remove_field => "[nginx][access][agent]"
        }

        geoip {
          source => "[nginx][access][remote_ip]"
          target => "[nginx][access][geoip]"
        }
      }
      else if [kubernetes][pod][labels][app] == "filebeat" {
        #filebeat
        grok {
          match => [ "message", "(?<timestamp>%{TIMESTAMP_ISO8601})\\s+%{LOGLEVEL:level}\\s+%{DATA}\\s+%{GREEDYDATA:logmessage}" ]
        }
      }
      else {
        #HTD java
        grok {
          match => [ "message", "(?<timestamp>%{TIMESTAMP_ISO8601}) - \[(?<thread>[A-Za-z0-9-]+)\] %{LOGLEVEL:level}\\s+(?<class>[A-Za-z0-9.]*\.[A-Za-z0-9#_]+)\\s* - %{GREEDYDATA:logmessage}" ]
        }        
      }
    }
    output {
      if [event][module] == "nginx" {
        elasticsearch {
          hosts => [""]
          user => 'logstash-user'
          password => 'logstash-pwd'
          ilm_enabled => true
          ilm_rollover_alias => "prodenv-nginx"
          ilm_pattern => "{now/d}-000001"
          ilm_policy => "prodenv-nginx-policy"
          timeout => 5
          retry_on_conflict => 3
          retry_max_interval => 15
          retry_initial_interval => 5
          resurrect_delay => 3
        }
        s3 {
          access_key_id => ""
          secret_access_key => ""
          region => ""
          bucket => ""
          size_file => 10485760
          time_file => 30
          codec => "line"
          canned_acl => "private"
          prefix => "prodenv-%{[event][module]}/%{+YYYY}/%{+MM}/%{+dd}"
        }
      }
      else {
        elasticsearch {
          hosts => [""]
          user => 'logstash-user'
          password => 'logstash-pwd'
          ilm_enabled => true
          ilm_rollover_alias => "prodenv-example"
          ilm_pattern => "{now/d}-000001"
          ilm_policy => "prodenv-example-policy"
          timeout => 5
          retry_on_conflict => 3
          retry_max_interval => 15
          retry_initial_interval => 5
          resurrect_delay => 3
        }
        s3 {
          access_key_id => ""
          secret_access_key => ""
          region => ""
          bucket => ""
          size_file => 10485760
          time_file => 30
          codec => "line"
          canned_acl => "private"
          prefix => "prodenv-example/%{+YYYY}/%{+MM}/%{+dd}"
        }
      }
    }

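Looking at the pipeline above, the example-service logs fall into the final else branch, whose grok pattern expects a plain-text Java layout, so the ECS JSON line never matches and the fields are never extracted. One option is to parse those events with a json filter instead. A minimal sketch, assuming the add_fields processor in the Filebeat config above lands the flag under [fields][log_type] (the processor's default target):

filter {
  if [fields][log_type] == "example" {
    json {
      # parse the ECS JSON line emitted by the application
      source => "message"
    }
  }
}

Events whose message turns out not to be valid JSON are tagged _jsonparsefailure by default, so they are kept and remain searchable.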