How to send logs to logstash from filebeat in the order they are printed in the log file


(Manoj Hettiarachchi) #1

Description:

In my system, I am using Filebeat, Logstash, Elasticsearch, and Kibana.

For every transaction in the system, a log is printed in the log file and it is saved in the Elasticsearch db.

There are around 200 logs are printed in each second.

Each transaction is represented by two log lines as request and response. Always request log is printed before the response log. There is a unique filled called "uuid" is available in both log lines. We use the "uuid" to aggregate the request log and response log in logstash. The "aggregate" filter is used to aggregate the request and response and saved as a single doc in the elasticseach db. The timeout is set to 20s for request to wait for a matching response with same uuid.

Problem:

In this trffic the aggregation function is not applied correctly and request and response are ented to db as separate entries.

I suspect this is due to filebeat sending the response log before the request log to the logstash.

If that is the issue please advise on how to avoid this behavior and config filebeat to send logs to logstash in order.

Or if this is due to some other cause please help to resolve this issue.

Thanks


(Noémi Ványi) #2

Filebeat keeps the order of incoming messages when they are forwarded. It is possible that Filebeat sends events over multiple connections and the request-response pairs might be sent over different connections in different bulks. But I would expect 20 seconds to be enough for Logstash to receive both elements of the pairs. So I think there is a problem with the configuration.

Could you please share your Filebeat and Logstash configurations formatted using </>? Could you also share debug logs of Filebeat?


(Manoj Hettiarachchi) #3

hi @kvch,
Thanks for the quick response

Please find the configuration files below

# Filebeat Configuration#
filebeat.prospectors:
- type: log

  # Change to true to enable this prospector configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:      
    - /home/manoj/Documents/repository/logs/request-response-logger.log

  
# Filebeat modules ====

filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml

  
#=== Elasticsearch template setting ======

setup.template.settings:
  index.number_of_shards: 3
  #index.codec: best_compression
  #_source.enabled: false

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["localhost:5044"]

Logstash configuration

input {
    beats {
      host => "localhost"
      port => 5044
    }
}

filter {
    if [source] =~ "request-response-logger" {
        mutate { replace => { "type" => "request-response" } }

        if [message] =~ "TRANSACTION:request" {
            grok {
                match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}]  %{LOGLEVEL:level} \{%{DATA:logtype}} -  TRANSACTION:%{WORD:transaction},API_REQUEST_ID:%{GREEDYDATA:tid},API_NAME:%{WORD:api},SP_NAME:%{GREEDYDATA:user},API_PUBLISHER:%{WORD:publisher},API_VERSION:%{WORD:api_v},API_CONTEXT:%{GREEDYDATA:context},APPLICATION_NAME:%{WORD:app},APPLICATION_ID:%{NUMBER:app_id},CONSUMER_KEY:%{GREEDYDATA:consumerKey},API_RESOURCE_PATH:%{GREEDYDATA:resourcePath},METHOD:%{WORD:method},BODY:%{GREEDYDATA:body}" }
            }
            aggregate { #store required data in a map
                task_id => "%{tid}"
                code => "
                    map['reqBody'] = event.get('body')
                    map['user'] = event.get('user')                    
                    map['application'] = event.get('app')
                    map['api'] = event.get('api')
                    map['request-time'] = event.get('timestamp')
                    map['response-time'] = event.get('timestamp')#set dummy value for response time
                    map['status'] = 0
                "
                map_action => "create"
            }
            drop {}#drop the request before persisting, to save indexing space in elasticsearch server
        }

        if [message] =~ "TRANSACTION:response" {
            grok {
                match => { "message" => "\[%{TIMESTAMP_ISO8601:response-time}]  %{LOGLEVEL:level} \{%{DATA:logtype}} -  TRANSACTION:%{WORD:transaction},API_REQUEST_ID:%{GREEDYDATA:tid},HTTP_STATUS:%{NUMBER:status},RESPONSE_TIME:%{NUMBER:original_response_time:float},BODY:%{GREEDYDATA:response}" }
                remove_field => ["message"] 
                add_field => { 
                    "request" => "0"
                    "user" => "0"
                    "application" => "0"
                    "api" => "no-api-info" 
                }  
            }

            aggregate {                
                task_id => "%{tid}"

                code => "
                    event.set('request', map['reqBody'])
                    event.set('user', map['user'])    
                    event.set('application', map['application'])
                    event.set('api', map['api'])
                    event.set('request-time', map['request-time'])
                    event.set('transaction', 'full')     
                "
                map_action => "update"
                end_of_task => true
                push_map_as_event_on_timeout => true
                timeout => 20
                timeout_task_id_field => "tid"
                timeout_code => "
                    event.set('response','Response-timeout')
                    event.set('type','request-response')
                    event.set('transaction', 'request')
                "
            }
            if [transaction] =~ "full" {
                ruby {
                    init => "require 'time'"
                    code => "duration = (DateTime.parse(event.get('response-time')).to_time.to_f*1000 - DateTime.parse(event.get('request-time')).to_time.to_f*1000) rescue nil; event.set('service-time', duration); "
                }
            } else {
                mutate {
                    add_field => { "service-time" => "0.0" }
                }    
                if [transaction] =~ "response" {
                    mutate {
                        copy => { "response-time" => "request-time" }
                        #set request time default value from response
                        
                    }
                }
            }

            #ncell reporting requirement, to remove "," from timestamp and replace with a "."
            mutate {
                gsub => [
                    "request-time", ",", ".",
                    "response-time", ",", "."                    
                ]
            }
        }
    }
}

output {
    if [type] == "request-response" and [tid] != "null" {
        elasticsearch {
            hosts => [ "localhost:9200" ]
            index => "transactions-%{+YYYY.MM.dd}"
        }    
    }
}

In my logstash configuration, I have assumed that filebeat is sending the logs in order. So my logic is when the request log cones it is mapped with uuid (tid) using aggregation and wait for the response. if the response with same id received before 20s the logs will be aggregated and saved. If not request log is saved alone after 20s. In my case since the resonce comes first, it is saved without any witing time. So the request comes next and wait for 20s and saved after timeout.


(Noémi Ványi) #4

Could you also add an example request-response log pair?


(Manoj Hettiarachchi) #5

hello @kvch,

please find the request and response log below.

request

[2018-10-18 15:10:24,208] INFO {REQUEST_RESPONSE_LOGGER} - TRANSACTION:request,API_REQUEST_ID:urn:uuid:b4732cef-e655-47c3-affa-ab56d3080df2,API_NAME:elkapi,SP_NAME:manoj@carbon.super,API_PUBLISHER:publisher1,API_VERSION:v1,API_CONTEXT:/read/v1,APPLICATION_NAME:Int1,APPLICATION_ID:15,CONSUMER_KEY:UlsSZIbUfGoD8l8dkpYtF3XnqJsa,API_RESOURCE_PATH:?resourcepath=mypath,METHOD:POST,BODY:<soapenv:Body xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"><jsonObject><wso2telcoAnalyticELK-0.1><id>TEST_1.0.1</id><topic>EXLB_Bearer</topic><description>All Fixed.</description></wso2telcoAnalyticELK-0.1></jsonObject></soapenv:Body>

response

[2018-10-18 15:10:24,228] INFO {REQUEST_RESPONSE_LOGGER} - TRANSACTION:response,API_REQUEST_ID:urn:uuid:b4732cef-e655-47c3-affa-ab56d3080df2,HTTP_STATUS:200,RESPONSE_TIME:16.0,BODY:<soapenv:Body xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"><jsonObject><success>true</success><message>Successs</message></jsonObject></soapenv:Body>


(Manoj Hettiarachchi) #6

please can somebody help me. I'm could not find any solution to this issue yet.


(Manoj Hettiarachchi) #7

I think I have found the cause of the issue. It is mentioned in the documentation that, if we use the "aggregate" filter in logstash the number of worker threads should be set to one. Else there can be mismatches because each worker is independent of each other.

" You should be very careful to set Logstash filter workers to 1 ( -w 1 flag) for this filter to work correctly otherwise events may be processed out of sequence and unexpected results will occur"

But since I have a considerable traffic in my system (250 per second ) it is not practical to set the number of workers to 1.

Does anybody know an alternative to fix this issue?