hi @kvch,
Thanks for the quick response
Please find the configuration files below
# Filebeat Configuration#
filebeat.prospectors:
- type: log
  # Change to true to enable this prospector configuration.
  enabled: true
  # Paths that should be crawled and fetched. Glob based paths.
  paths:      
    - /home/manoj/Documents/repository/logs/request-response-logger.log
  
# Filebeat modules ====
filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml
  
#=== Elasticsearch template setting ======
setup.template.settings:
  index.number_of_shards: 3
  #index.codec: best_compression
  #_source.enabled: false
#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["localhost:5044"]
Logstash configuration
input {
    beats {
      host => "localhost"
      port => 5044
    }
}
filter {
    if [source] =~ "request-response-logger" {
        mutate { replace => { "type" => "request-response" } }
        if [message] =~ "TRANSACTION:request" {
            grok {
                match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}]  %{LOGLEVEL:level} \{%{DATA:logtype}} -  TRANSACTION:%{WORD:transaction},API_REQUEST_ID:%{GREEDYDATA:tid},API_NAME:%{WORD:api},SP_NAME:%{GREEDYDATA:user},API_PUBLISHER:%{WORD:publisher},API_VERSION:%{WORD:api_v},API_CONTEXT:%{GREEDYDATA:context},APPLICATION_NAME:%{WORD:app},APPLICATION_ID:%{NUMBER:app_id},CONSUMER_KEY:%{GREEDYDATA:consumerKey},API_RESOURCE_PATH:%{GREEDYDATA:resourcePath},METHOD:%{WORD:method},BODY:%{GREEDYDATA:body}" }
            }
            aggregate { #store required data in a map
                task_id => "%{tid}"
                code => "
                    map['reqBody'] = event.get('body')
                    map['user'] = event.get('user')                    
                    map['application'] = event.get('app')
                    map['api'] = event.get('api')
                    map['request-time'] = event.get('timestamp')
                    map['response-time'] = event.get('timestamp')#set dummy value for response time
                    map['status'] = 0
                "
                map_action => "create"
            }
            drop {}#drop the request before persisting, to save indexing space in elasticsearch server
        }
        if [message] =~ "TRANSACTION:response" {
            grok {
                match => { "message" => "\[%{TIMESTAMP_ISO8601:response-time}]  %{LOGLEVEL:level} \{%{DATA:logtype}} -  TRANSACTION:%{WORD:transaction},API_REQUEST_ID:%{GREEDYDATA:tid},HTTP_STATUS:%{NUMBER:status},RESPONSE_TIME:%{NUMBER:original_response_time:float},BODY:%{GREEDYDATA:response}" }
                remove_field => ["message"] 
                add_field => { 
                    "request" => "0"
                    "user" => "0"
                    "application" => "0"
                    "api" => "no-api-info" 
                }  
            }
            aggregate {                
                task_id => "%{tid}"
                code => "
                    event.set('request', map['reqBody'])
                    event.set('user', map['user'])    
                    event.set('application', map['application'])
                    event.set('api', map['api'])
                    event.set('request-time', map['request-time'])
                    event.set('transaction', 'full')     
                "
                map_action => "update"
                end_of_task => true
                push_map_as_event_on_timeout => true
                timeout => 20
                timeout_task_id_field => "tid"
                timeout_code => "
                    event.set('response','Response-timeout')
                    event.set('type','request-response')
                    event.set('transaction', 'request')
                "
            }
            if [transaction] =~ "full" {
                ruby {
                    init => "require 'time'"
                    code => "duration = (DateTime.parse(event.get('response-time')).to_time.to_f*1000 - DateTime.parse(event.get('request-time')).to_time.to_f*1000) rescue nil; event.set('service-time', duration); "
                }
            } else {
                mutate {
                    add_field => { "service-time" => "0.0" }
                }    
                if [transaction] =~ "response" {
                    mutate {
                        copy => { "response-time" => "request-time" }
                        #set request time default value from response
                        
                    }
                }
            }
            #ncell reporting requirement, to remove "," from timestamp and replace with a "."
            mutate {
                gsub => [
                    "request-time", ",", ".",
                    "response-time", ",", "."                    
                ]
            }
        }
    }
}
output {
    if [type] == "request-response" and [tid] != "null" {
        elasticsearch {
            hosts => [ "localhost:9200" ]
            index => "transactions-%{+YYYY.MM.dd}"
        }    
    }
}
In my logstash configuration, I have assumed that filebeat is sending the logs in order. So my logic is when the request log cones it is mapped with uuid (tid) using aggregation and wait for the response. if the response with same id received before 20s the logs will be aggregated and saved. If not request log is saved alone after 20s. In my case since the resonce comes first, it is saved without any witing time. So the request comes next and wait for 20s and saved after timeout.