Hello, I have a question about using Logstash. Everything looks OK, but the data in Kibana lags further and further behind. There is now a gap of almost an hour: for example, the current time is 8:20, but Kibana only shows data up to 7:32. At the same time, the following error appears in my Logstash logs:
[ERROR][logstash.outputs.elasticsearch] Got a bad response code from server, but this code is not considered retryable. Request will be dropped {:code=>504, :response_body=>""}
Please help me, thank you very much!

My logstash.conf is as follows:
input {
  kafka {
    bootstrap_servers => "xx.xx.xx.xx:9092,xx.xx.xx.xx:9092,xx.xx.xx.xx:9092"
    group_id => "logstash_server"
    topics => ["nginx_slb_log"]
    consumer_threads => 10
    decorate_events => true
    codec => json {
      charset => "UTF-8"
    }
  }
}
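# For context: each Kafka record is JSON, and its "message" field carries one
# pipe-delimited nginx access-log line that the filter below splits apart.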
filter {
  if [type] == "nginx_slb_log" {
    ruby {
      init => "@kname = ['time_local', 'request_id', 'request_time', 'remote_addr_port', 'status', 'request_method', 'request_url', 'server_protocol', 'request_length', 'bytes_sent', 'upstream_addr_all', 'sent_http_content_type', 'http_referer', 'http_user_agent', 'upstream_response_time', 'upstream_status', 'ssl_cipher', 'ssl_protocol', 'ssl_session_id', 'ssl_session_reused', 'ssl_server_name', 'ssl_client_verify', 'tcpinfo_rtt', 'tcpinfo_rttvar', 'tcpinfo_snd_cwnd', 'tcpinfo_rcv_space', 'request_completion']"
      code => "
        new_event = LogStash::Event.new(Hash[@kname.zip(event.get('message').split('|'))])
        new_event.remove('@timestamp')
        event.append(new_event)
        temp_value = event.get('time_local').strip
        event.set('time_local', temp_value[1..(temp_value.length - 2)])
        event.set('upstream_status', event.get('upstream_status').to_i)
      "
    }
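    # Split remote_addr_port ("ip:port") into remote_addr and remote_port.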
    if [remote_addr_port] {
      ruby {
        init => "@kname = ['remote_addr', 'remote_port']"
        code => "
          new_event = LogStash::Event.new(Hash[@kname.zip(event.get('remote_addr_port').split(':'))])
          new_event.remove('@timestamp')
          event.append(new_event)
        "
        remove_field => ["remote_addr_port"]
      }
    }
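    # Derive scheme, http_host and request_uri from request_url
    # (the request_uri split assumes the host name contains ".com").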
    if [request_url] {
      ruby {
        code => "
          event.set('scheme', event.get('request_url').split(':')[0])
          event.set('http_host', event.get('request_url').split('://')[-1].split('/')[0])
          event.set('request_uri', event.get('request_url').split('//')[-1].split('.com')[-1])
        "
        remove_field => ["request_url"]
      }
    }
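    # Keep only the last entry of upstream_addr_all (the part after the final
    # '/'), then split it into upstream_addr and upstream_port.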
    if [upstream_addr_all] {
      ruby {
        init => "@kname = ['upstream_addr', 'upstream_port']"
        code => "
          event.set('upstream_addr_all', event.get('upstream_addr_all').split('/')[-1])
          new_event = LogStash::Event.new(Hash[@kname.zip(event.get('upstream_addr_all').split(':'))])
          new_event.remove('@timestamp')
          event.append(new_event)
        "
      }
    }
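    # Cast the numeric fields and drop intermediate/metadata fields.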
    mutate {
      convert => [
        "request_time", "float",
        "upstream_response_time", "float",
        "bytes_sent", "integer",
        "request_length", "integer",
        "status", "integer"
      ]
      remove_field => ["beat", "offset", "@version", "message", "kafka", "request_url", "upstream_addr_all"]
    }
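    # Use the nginx time_local value as @timestamp, so events are indexed
    # with their original event time rather than the processing time.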
    date {
      match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
  }
}
output {
  if [type] == "nginx_slb_log" {
    elasticsearch {
      hosts => "xxx.es.amazonaws.com.cn:443"
      ssl => true
      index => "logstash-nginx-slb-%{+YYYY-MM-dd}"
      template_overwrite => true
    }
  }
}
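In case it helps to see the parsing in isolation, here is what the first ruby filter effectively does to a single log line, written as standalone Ruby. The sample line and its values are made up for illustration; only the field order and the split/trim logic come from the config above (the real field list has 27 entries):

# Standalone sketch of the first ruby filter's parsing; the sample line is hypothetical.
kname = ['time_local', 'request_id', 'request_time', 'remote_addr_port', 'status']
line  = "[07/Mar/2019:07:32:01 +0800]|req-123|0.012|1.2.3.4:52311|200"

# Zip the field names with the pipe-separated values, as the filter does.
fields = Hash[kname.zip(line.split('|'))]

# Strip the surrounding '[' and ']' from time_local before the date filter parses it.
time_local = fields['time_local'].strip
fields['time_local'] = time_local[1..(time_local.length - 2)]

puts fields['time_local']        # => 07/Mar/2019:07:32:01 +0800
puts fields['remote_addr_port']  # => 1.2.3.4:52311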