I am using Logstash version 7.17.15 in a production environment. The output plugin I am using is clickhouse, and the ClickHouse server version is 20.8.3.18.
Using Filebeat, I am shipping the logs to Logstash, which runs on a remote machine. Logstash filters the logs and inserts them into ClickHouse. When traffic is low there is no issue — all the logs reach the ClickHouse DB.
Under heavy traffic I get the error below in the Logstash logs:
INFO ] 2023-12-20 18:27:12.686 [pool-11-thread-1] clickhouse - Retrying connection {:url=>"http:/10.0.0.18:8123?query=INSERT+INTO+bolt.sender+FORMAT+JSONEachRow", :uuid=>"6854f6413da3fe8faab0efe1b3141804"}
[ERROR] 2023-12-20 18:27:15.083 [pool-12-thread-1] clickhouse - [HTTP Output Failure] Could not access URL {:url=>"http:/10.0.0.18:8123?query=INSERT+INTO+bolt.sender+FORMAT+JSONEachRow", :method=>nil, :headers=>nil, :message=>"URI does not specify a valid host name: http:/10.0.0.18:8123?query=INSERT+INTO+bolt.sender+FORMAT+JSONEachRow", :class=>"Manticore::ClientProtocolException", :backtrace=>nil, :size=>3639, :uuid=>"6854f6413da3fe8faab0efe1b3141804"}
Some of the logs are inserted into ClickHouse; for others I get the above error. Why does this happen, and how can I prevent data loss here?
Below I have provided my logstash.conf:
# Receive events shipped by Filebeat on the standard Beats port.
input {
  beats {
    port => 5044
  }
}
filter {
# Only parse events tagged "sender" by Filebeat; other events pass through
# this filter untouched.
if "sender" in [tags] {
# Extract the pipe-delimited payload that follows the BOLTFINALLOG marker
# into [senderlog].
grok
{
match => [ "message" , "BOLTFINALLOG %{GREEDYDATA:senderlog}"]
}
# Split the payload on "|" and map each positional element to a named field.
# NOTE: within a single mutate, split executes before add_field, so the
# indexed [senderlog][N] references below see the already-split array.
mutate {
split => {"senderlog" => "|"}
add_field => {
"sender_host" => "%{[host][hostname]}"
"queueid" => "%{[senderlog][0]}"
"status" => "%{[senderlog][1]}"
"dsn_time" => "%{[senderlog][2]}"
"bolt_injection_time" => "%{[senderlog][3]}"
"injection_time" => "%{[senderlog][4]}"
"listener_time" => "%{[senderlog][5]}"
"generation_time" => "%{[senderlog][6]}"
"from_address" => "%{[senderlog][7]}"
# _res copies are working copies that get split/consumed below, while the
# originals are kept intact for insertion into ClickHouse.
"from_address_res" => "%{[senderlog][7]}"
"rcpt_address" => "%{[senderlog][8]}"
"rcpt_address_res" => "%{[senderlog][8]}"
"delivery_ip" => "%{[senderlog][9]}"
"mx_ip" => "%{[senderlog][10]}"
"size" => "%{[senderlog][11]}"
"vmta" => "%{[senderlog][12]}"
# NOTE(review): indices 13-17 are intentionally out of numeric order here
# (dsn_status=14, dsn_response=15, delay=13, ...) — confirm against the
# producer's field order.
"dsn_status" => "%{[senderlog][14]}"
"dsn_response" => "%{[senderlog][15]}"
"delay" => "%{[senderlog][13]}"
"policy_type" => "%{[senderlog][16]}"
"retry" => "%{[senderlog][17]}"
# Routing key consumed by the output section's table check.
"[@metadata][table]" => "sender"
}
# Drop Beats/Logstash bookkeeping fields so only the named columns above
# are sent to ClickHouse.
remove_field => "@timestamp"
remove_field => "host"
remove_field => "senderlog"
remove_field => "agent"
remove_field => "@version"
remove_field => "message"
remove_field => "event"
remove_field => "log"
remove_field => "tags"
remove_field => "input"
remove_field => "ecs"
}
# Campaign-style sender addresses encode extra ids separated by "-";
# presumably "campaign-<client_name>-<clientid>-<messageid>@..." — confirm
# with the address producer.
if [from_address_res] =~ /^campaign/ {
mutate {
split => {"from_address_res" => "-"}
add_field => {
"client_name" => "%{[from_address_res][1]}"
"clientid" => "%{[from_address_res][2]}"
"messageid" => "%{[from_address_res][3]}"
}
}
}
# The working copy is no longer needed (whether or not it matched above).
mutate{
remove_field => "from_address_res"
}
# Derive the recipient domain from the part after "@".
mutate {
split => {"rcpt_address_res" => "@"}
add_field => {
"rcpt_domain" => "%{[rcpt_address_res][1]}"
}
remove_field => "rcpt_address_res"
}
# Normalize ISO-8601 timestamps: replace the "T" date/time separator with a
# space (ClickHouse DateTime style).
mutate {
gsub => ["injection_time", "T", " "]
gsub => ["bolt_injection_time", "T", " "]
gsub => ["dsn_time", "T", " "]
gsub => ["listener_time", "T", " "]
gsub => ["generation_time", "T", " "]
}
# Strip the fixed "+05:30" timezone suffix. NOTE(review): this assumes all
# producers emit IST offsets — timestamps with any other offset pass
# through unmodified.
mutate {
gsub => ["injection_time", "\+05:30", ""]
gsub => ["bolt_injection_time", "\+05:30", ""]
gsub => ["dsn_time", "\+05:30", ""]
gsub => ["listener_time", "\+05:30", ""]
gsub => ["generation_time", "\+05:30", ""]
}
# Drop time fields that ended up empty, so the JSONEachRow insert omits
# them instead of sending "" to a DateTime column.
if [injection_time] == "" {
mutate {
remove_field => "injection_time"
}
}
if [generation_time] == "" {
mutate {
remove_field => "generation_time"
}
}
if [bolt_injection_time] == "" {
mutate {
remove_field => "bolt_injection_time"
}
}
if [dsn_time] == "" {
mutate {
remove_field => "dsn_time"
}
}
if [listener_time] == "" {
mutate {
remove_field => "listener_time"
}
}
}
}
# Route parsed "sender" events to ClickHouse; every event is also echoed to
# stdout for debugging (consider removing stdout in production — rubydebug
# output is expensive under heavy traffic).
output
{
  if "sender" == [@metadata][table]
  {
    clickhouse {
      # NOTE(review): the scheme must be "http://" with two slashes. The
      # error in the logs shows "http:/10.0.0.18:8123" (single slash,
      # "URI does not specify a valid host name"), so double-check the
      # actual value configured in production.
      http_hosts => ["http://IP:8123"]
      table => "bolt.sender"
      # flush_size of 5 meant one HTTP INSERT per ~5 events; under heavy
      # traffic that floods ClickHouse with tiny inserts and saturates the
      # connection pool. Batch much larger and flush idle batches on a timer.
      flush_size => 1000
      idle_flush_time => 5
      pool_max => 10
      # Retry failed batches instead of dropping them after the first error.
      automatic_retries => 3
      request_tolerance => 5
      backoff_time => 3
      # Persist batches that still fail after retries so they can be
      # re-submitted later — this is what closes the data-loss gap.
      save_on_failure => true
      save_dir => "/var/lib/logstash/clickhouse_failed"
    }
  }
  stdout {
    codec => rubydebug
  }
}