Logstash parsing data incorrectly


I am using Logstash version 6.5.4. Data is read and parsed using Filebeat -> Logstash -> Elasticsearch.

But as I can see in Kibana, for a few events I am getting duplicate field data, i.e. each field is converted into an array containing the same value repeated several times (shown comma-separated).

The message sent from Filebeat was:

2024-03-12 13:27:00.126,d85b4ecb-c4b7-4168-bcc1-9b6a3508ce4a,System,NotificationsProcessor,MQ_TO_JOBSERVICE,Tue Mar 12 13:27:00 IST 2024,TENANT,,null,SUCCESSFUL,94,126,'Task completed successfully',TESTING,null,'{"notificationsCount":31}'

and the resulting document in Elasticsearch is:
{
  "_version": 3,
  "_source": {
    "jobSpecificMetaData": [
      "{\"notificationsCount\":31}",
      "{\"notificationsCount\":31}",
      "{\"notificationsCount\":31}"
    ],
    "current_step_time_ms": 94,
    "request_id": [
      "d85b4ecb-c4b7-4168-bcc1-9b6a3508ce4a",
      "d85b4ecb-c4b7-4168-bcc1-9b6a3508ce4a",
      "d85b4ecb-c4b7-4168-bcc1-9b6a3508ce4a"
    ],
    "tenant_mode": [
      "TESTING",
      "TESTING",
      "TESTING"
    ],
    "total_time_ms": 126,
    "created_timestamp": [
      "2024-03-12 13:27:00.126",
      "2024-03-12 13:27:00.126",
      "2024-03-12 13:27:00.126"
    ],
    "@timestamp": "2024-03-12T07:57:02.887Z",
    "execution_level": [
      "TENANT",
      "TENANT",
      "TENANT"
    ],
    "status": [
      "SUCCESSFUL",
      "SUCCESSFUL",
      "SUCCESSFUL"
    ]
  }
}
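
For comparison, this is roughly what I would expect the document to look like, with each field holding a single value (the same event as above, just with the duplicated arrays collapsed):

{
  "_source": {
    "jobSpecificMetaData": "{\"notificationsCount\":31}",
    "current_step_time_ms": 94,
    "request_id": "d85b4ecb-c4b7-4168-bcc1-9b6a3508ce4a",
    "tenant_mode": "TESTING",
    "total_time_ms": 126,
    "created_timestamp": "2024-03-12 13:27:00.126",
    "@timestamp": "2024-03-12T07:57:02.887Z",
    "execution_level": "TENANT",
    "status": "SUCCESSFUL"
  }
}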

I am unable to debug this issue. Kindly help, as this is breaking my dashboards and reporting.

You need to share both your filebeat.yml and your logstash configuration.

Filebeat:

filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

- type: log

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /usr/local/apache-tomcat/logs/access_log*.txt
  fields:
    release_version: 1619

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["host1","host2"]
  loadbalance: true

Logstash:

input {
 beats {
   port => 5044
  }
}

filter {

  mutate {
      remove_field => ["offset","prospector","input","version","tags"]
      add_field => { "host" => "%{[beat][name]}" }
  }
    
    grok {
      match => { "message" => "\[%{USER:tenant} (?<email>-|[a-zA-Z0-9_.+=:-]+@[0-9A-Za-z][0-9A-Za-z-]{0,62}(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*)(?:\s+%{DATA:api_user_name})?\] %{IP:client_ip} \[%{HTTPDATE:apache_timestamp}\] \"%{WORD:method} %{NOTSPACE:url} HTTP/%{NUMBER:http_version}\" \[%{GREEDYDATA:OperationName}\] \[%{GREEDYDATA:useragent}\] %{NUMBER:server_response} %{NUMBER:bytes_sent} %{NUMBER:req_process_ms} \[%{GREEDYDATA:androidAppReleaseVersion}\] \[%{GREEDYDATA:androidDeviceUUID}\] \[%{GREEDYDATA:egress_time} %{GREEDYDATA:script_excl_egress_time} %{GREEDYDATA:channel_src_code} %{GREEDYDATA:courier_src_code}\] \[%{DATA:api_status} %{DATA:request_identifier} %{DATA:error_response_code} %{GREEDYDATA:error_response_message}\]" }      

      match => { "message" => "\[%{USER:tenant} (?<email>-|[a-zA-Z0-9_.+=:-]+@[0-9A-Za-z][0-9A-Za-z-]{0,62}(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*)(?:\s+%{DATA:api_user_name})?\] %{IP:client_ip} \[%{HTTPDATE:apache_timestamp}\] \"%{WORD:method} %{NOTSPACE:url} HTTP/%{NUMBER:http_version}\" \[%{GREEDYDATA:OperationName}\] \[%{GREEDYDATA:useragent}\] %{NUMBER:server_response} %{NUMBER:bytes_sent} %{NUMBER:req_process_ms} \[%{GREEDYDATA:androidAppReleaseVersion}\] \[%{GREEDYDATA:androidDeviceUUID}\] \[%{GREEDYDATA:egress_time} %{GREEDYDATA:script_excl_egress_time} %{GREEDYDATA:channel_src_code} %{GREEDYDATA:courier_src_code}\]"}
      
      match => { "message" => "\[%{USER:tenant} (?<email>-|[a-zA-Z0-9_.+=:-]+@[0-9A-Za-z][0-9A-Za-z-]{0,62}(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*)\] %{IP:client_ip} \[%{HTTPDATE:apache_timestamp}\] \"%{WORD:method} %{NOTSPACE:url} HTTP/%{NUMBER:http_version}\" \[%{GREEDYDATA:OperationName}\] \[%{GREEDYDATA:useragent}\] %{NUMBER:server_response} %{NUMBER:bytes_sent} %{NUMBER:req_process_ms} \[%{GREEDYDATA:androidAppReleaseVersion}\] \[%{GREEDYDATA:androidDeviceUUID}\]" }
    }


    if [useragent] != "-" and [useragent] != "" {
      useragent {
        add_tag => [ "UA" ]
        source => "useragent"
      }
    }

    if [tenant] == '-'{
      drop { }
    }
     
    if "_grokparsefailure" in [tags] {
      drop { }
    }  
 
    ruby {
      code => "
        # Derive the index name from the source log file name
        # (the text between '_log.' and the next '.').
        event.set('index-name', event.get('source').split('_log.').last.split('.').first)

        # For SOAP requests, rebuild the url so that it includes the operation name.
        if event.get('OperationName') != '-' && event.get('url') != nil
          if event.get('url').include?('/services/soap/?')
            event.set('url', '/services/soap/' + event.get('OperationName') + event.get('url').split('/services/soap/').last)
          elsif event.get('url').include?('/services/soap/')
            event.set('url', '/services/soap/' + event.get('OperationName') + '/' + event.get('url').split('/services/soap/').last)
          else
            event.set('url', '/services/soap/' + event.get('OperationName') + event.get('url').split('/services/soap').last)
          end
        end

        # Split the url into base_url and query_params.
        if event.get('url').include?('?')
          event.set('base_url', event.get('url').split('?').first)
          event.set('query_params', event.get('url').split('?').last)
        else
          event.set('base_url', event.get('url'))
        end

        # Derive the cloud name from the host name and classify the client type.
        event.set('cloud', event.get('host').split('.', 2).last.split('.').first)
        if event.get('cloud').start_with?('cloud')
          event.set('client_type', 'seller')
        else
          event.set('client_type', 'enterprise')
        end

        # Classify the API as soap or rest based on the base_url.
        if event.get('base_url').include?('soap')
          event.set('api_group', 'soap')
        else
          event.set('api_group', 'rest')
        end

        # Replace '-' placeholders with '0' so the fields can be converted to integers below.
        if event.get('egress_time') == '-'
          event.set('egress_time', '0')
        end
        if event.get('script_excl_egress_time') == '-'
          event.set('script_excl_egress_time', '0')
        end
        if event.get('error_response_code') == '-'
          event.set('error_response_code', '0')
        end
      "
    }

   mutate {
            convert => {
                "req_process_ms" => "integer"
                "egress_time" => "integer"
                "script_excl_egress_time" => "integer"
            }
        }

    ruby {
      code => "
        # Processing time excluding egress and script time (missing values are treated as 0).
        event.set('req_process_excl_script_time', event.get('req_process_ms').to_i - event.get('egress_time').to_i - event.get('script_excl_egress_time').to_i)
      "
    }
}

output {
    elasticsearch {
        hosts => ["elastic1","elastic2","elastic3"]
        index => "access-%{index-name}"
    }
}
