Hi @kvch,
Thanks for the quick response.
Please find the configuration files below.
# Filebeat Configuration
filebeat.prospectors:
- type: log
  # Change to true to enable this prospector configuration.
  enabled: true
  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /home/manoj/Documents/repository/logs/request-response-logger.log

#============================= Filebeat modules ===============================
filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml

#==================== Elasticsearch template setting ==========================
setup.template.settings:
  index.number_of_shards: 3
  #index.codec: best_compression
  #_source.enabled: false

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["localhost:5044"]
Logstash configuration
input {
  beats {
    host => "localhost"
    port => 5044
  }
}
filter {
  if [source] =~ "request-response-logger" {
    mutate { replace => { "type" => "request-response" } }

    if [message] =~ "TRANSACTION:request" {
      grok {
        match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}] %{LOGLEVEL:level} \{%{DATA:logtype}} - TRANSACTION:%{WORD:transaction},API_REQUEST_ID:%{GREEDYDATA:tid},API_NAME:%{WORD:api},SP_NAME:%{GREEDYDATA:user},API_PUBLISHER:%{WORD:publisher},API_VERSION:%{WORD:api_v},API_CONTEXT:%{GREEDYDATA:context},APPLICATION_NAME:%{WORD:app},APPLICATION_ID:%{NUMBER:app_id},CONSUMER_KEY:%{GREEDYDATA:consumerKey},API_RESOURCE_PATH:%{GREEDYDATA:resourcePath},METHOD:%{WORD:method},BODY:%{GREEDYDATA:body}" }
      }
      aggregate {
        # store the required request data in a map
        task_id => "%{tid}"
        code => "
          map['reqBody'] = event.get('body')
          map['user'] = event.get('user')
          map['application'] = event.get('app')
          map['api'] = event.get('api')
          map['request-time'] = event.get('timestamp')
          map['response-time'] = event.get('timestamp') # set dummy value for response time
          map['status'] = 0
        "
        map_action => "create"
      }
      # drop the request event before persisting, to save indexing space in the Elasticsearch server
      drop {}
    }
if [message] =~ "TRANSACTION:response" {
grok {
match => { "message" => "\[%{TIMESTAMP_ISO8601:response-time}] %{LOGLEVEL:level} \{%{DATA:logtype}} - TRANSACTION:%{WORD:transaction},API_REQUEST_ID:%{GREEDYDATA:tid},HTTP_STATUS:%{NUMBER:status},RESPONSE_TIME:%{NUMBER:original_response_time:float},BODY:%{GREEDYDATA:response}" }
remove_field => ["message"]
add_field => {
"request" => "0"
"user" => "0"
"application" => "0"
"api" => "no-api-info"
}
}
      aggregate {
        task_id => "%{tid}"
        code => "
          event.set('request', map['reqBody'])
          event.set('user', map['user'])
          event.set('application', map['application'])
          event.set('api', map['api'])
          event.set('request-time', map['request-time'])
          event.set('transaction', 'full')
        "
        map_action => "update"
        end_of_task => true
        push_map_as_event_on_timeout => true
        timeout => 20
        timeout_task_id_field => "tid"
        timeout_code => "
          event.set('response', 'Response-timeout')
          event.set('type', 'request-response')
          event.set('transaction', 'request')
        "
      }
if [transaction] =~ "full" {
ruby {
init => "require 'time'"
code => "duration = (DateTime.parse(event.get('response-time')).to_time.to_f*1000 - DateTime.parse(event.get('request-time')).to_time.to_f*1000) rescue nil; event.set('service-time', duration); "
}
} else {
mutate {
add_field => { "service-time" => "0.0" }
}
if [transaction] =~ "response" {
mutate {
copy => { "response-time" => "request-time" }
#set request time default value from response
}
}
}
      # ncell reporting requirement: remove the "," from the timestamp and replace it with a "."
      mutate {
        gsub => [
          "request-time", ",", ".",
          "response-time", ",", "."
        ]
      }
    }
  }
}
output {
  if [type] == "request-response" and [tid] != "null" {
    elasticsearch {
      hosts => [ "localhost:9200" ]
      index => "transactions-%{+YYYY.MM.dd}"
    }
  }
}
In my Logstash configuration, I have assumed that Filebeat is sending the logs in order. So my logic is: when the request log comes, it is mapped by its UUID (tid) using the aggregate filter and waits for the response. If the response with the same id is received within 20s, the two logs are aggregated and saved; if not, the request log is saved alone after 20s. In my case, since the response comes first, it is saved without any waiting time. The request then comes next, waits for 20s, and is saved only after the timeout.
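For reference, here are two made-up log lines in the shape the grok patterns above expect (every value, including the logger name, is invented purely for illustration). The request and response share the same API_REQUEST_ID, which grok captures as tid and the aggregate filters use as the task_id:

[2019-01-10 10:15:30,123] INFO {RequestResponseLogger} - TRANSACTION:request,API_REQUEST_ID:abc-123,API_NAME:sms,SP_NAME:spUser,API_PUBLISHER:admin,API_VERSION:v1,API_CONTEXT:/sms/v1,APPLICATION_NAME:testApp,APPLICATION_ID:1,CONSUMER_KEY:xxxx,API_RESOURCE_PATH:/send,METHOD:POST,BODY:{"msg":"hi"}
[2019-01-10 10:15:30,456] INFO {RequestResponseLogger} - TRANSACTION:response,API_REQUEST_ID:abc-123,HTTP_STATUS:200,RESPONSE_TIME:333,BODY:{"status":"ok"}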