Logstash with the ClickHouse output plugin: more than 80% of logs are missing

I am using Logstash version 7.17.15 in a production environment, with the ClickHouse output plugin; the ClickHouse version is 20.8.3.18.

I am using Filebeat to ship the logs to Logstash, which runs on a remote machine. Logstash filters the logs and inserts them into ClickHouse. When traffic is low there is no issue: all the logs reach the ClickHouse DB.
Under heavy traffic, I get the errors below in the Logstash logs:

INFO ] 2023-12-20 18:27:12.686 [pool-11-thread-1] clickhouse - Retrying connection {:url=>"http:/10.0.0.18:8123?query=INSERT+INTO+bolt.sender+FORMAT+JSONEachRow", :uuid=>"6854f6413da3fe8faab0efe1b3141804"}
[ERROR] 2023-12-20 18:27:15.083 [pool-12-thread-1] clickhouse - [HTTP Output Failure] Could not access URL {:url=>"http:/10.0.0.18:8123?query=INSERT+INTO+bolt.sender+FORMAT+JSONEachRow", :method=>nil, :headers=>nil, :message=>"URI does not specify a valid host name: http:/10.0.0.18:8123?query=INSERT+INTO+bolt.sender+FORMAT+JSONEachRow", :class=>"Manticore::ClientProtocolException", :backtrace=>nil, :size=>3639, :uuid=>"6854f6413da3fe8faab0efe1b3141804"}

Some of the logs are inserted into ClickHouse, but for others I get the error above. Why does this happen? How can I prevent the data loss?

Below is my logstash.conf:

# Accept events from Beats shippers (Filebeat) on TCP port 5044.
input {
  beats { port => 5044 }
}
# Parse "BOLTFINALLOG" sender log lines into flat fields for the
# bolt.sender ClickHouse table. Only events tagged "sender" are processed;
# everything else passes through this filter section unchanged.
filter {
        if "sender" in [tags] {
                # Capture everything after the "BOLTFINALLOG " marker into [senderlog].
                grok
                {
                        match => [ "message" , "BOLTFINALLOG %{GREEDYDATA:senderlog}"]
                }
                # Split the pipe-delimited payload into an array, then copy each
                # position into a named field. This relies on mutate running `split`
                # before the common add_field/remove_field options are applied
                # (see the mutate filter documentation).
                mutate {
                        split => {"senderlog" => "|"}
                        add_field => {
                                "sender_host" => "%{[host][hostname]}"
                                "queueid" => "%{[senderlog][0]}"
                                "status" => "%{[senderlog][1]}"
                                "dsn_time" => "%{[senderlog][2]}"
                                "bolt_injection_time" => "%{[senderlog][3]}"
                                "injection_time" => "%{[senderlog][4]}"
                                "listener_time" => "%{[senderlog][5]}"
                                "generation_time" => "%{[senderlog][6]}"
                                "from_address" => "%{[senderlog][7]}"
                                # Working copy of the sender address; split further below, then removed.
                                "from_address_res" => "%{[senderlog][7]}"
                                "rcpt_address" => "%{[senderlog][8]}"
                                # Working copy of the recipient address; used to derive rcpt_domain.
                                "rcpt_address_res" => "%{[senderlog][8]}"
                                "delivery_ip" => "%{[senderlog][9]}"
                                "mx_ip" => "%{[senderlog][10]}"
                                "size" => "%{[senderlog][11]}"
                                "vmta" => "%{[senderlog][12]}"
                                # Note: indexes 13-15 are listed out of numeric order (14, 15, 13).
                                "dsn_status" => "%{[senderlog][14]}"
                                "dsn_response" => "%{[senderlog][15]}"
                                "delay" => "%{[senderlog][13]}"
                                "policy_type" => "%{[senderlog][16]}"
                                "retry" => "%{[senderlog][17]}"
                                # Routing marker read by the output section; @metadata is not
                                # part of the emitted event.
                                "[@metadata][table]" => "sender"
                        }
                        # Drop Beats/Logstash bookkeeping fields and the raw payload so only
                        # the named fields above are sent downstream.
                        remove_field => "@timestamp"
                        remove_field => "host"
                        remove_field => "senderlog"
                        remove_field => "agent"
                        remove_field => "@version"
                        remove_field => "message"
                        remove_field => "event"
                        remove_field => "log"
                        remove_field => "tags"
                        remove_field => "input"
                        remove_field => "ecs"
                }
                # Sender addresses starting with "campaign" are split on "-";
                # parts 1, 2 and 3 become client_name, clientid and messageid.
                if [from_address_res] =~ /^campaign/ {
                mutate {
                        split => {"from_address_res" => "-"}
                        add_field => {
                                "client_name" => "%{[from_address_res][1]}"
                                "clientid" => "%{[from_address_res][2]}"
                                "messageid" => "%{[from_address_res][3]}"
                        }
                }
                }
                # The working copy is no longer needed, whether or not it was split.
                mutate{
                        remove_field => "from_address_res"
                }
                # The part after the first "@" of the recipient address becomes rcpt_domain.
                mutate {
                        split => {"rcpt_address_res" => "@"}
                        add_field => {
                                "rcpt_domain" => "%{[rcpt_address_res][1]}"
                        }
                        remove_field => "rcpt_address_res"
                }
                # Replace every "T" in the time fields with a space
                # (presumably the ISO-8601 date/time separator; confirm against sample logs).
                mutate {
                        gsub => ["injection_time", "T", " "]
                        gsub => ["bolt_injection_time", "T", " "]
                        gsub => ["dsn_time", "T", " "]
                        gsub => ["listener_time", "T", " "]
                        gsub => ["generation_time", "T", " "]
                }
                # Strip the literal "+05:30" UTC offset. Values carrying any other
                # offset are left as-is.
                mutate {
                        gsub => ["injection_time", "\+05:30", ""]
                        gsub => ["bolt_injection_time", "\+05:30", ""]
                        gsub => ["dsn_time", "\+05:30", ""]
                        gsub => ["listener_time", "\+05:30", ""]
                        gsub => ["generation_time", "\+05:30", ""]
                }
                # Remove time fields that ended up empty so an empty string is not
                # sent for them.
                if [injection_time] == "" {
                        mutate {
                                remove_field => "injection_time"
                        }
                }
                if [generation_time] == "" {
                        mutate {
                                remove_field => "generation_time"
                        }
                }
                if [bolt_injection_time] == "" {
                        mutate {
                                remove_field => "bolt_injection_time"
                        }
                }
                if [dsn_time] == "" {
                        mutate {
                                remove_field => "dsn_time"
                        }
                }
                if [listener_time] == "" {
                        mutate {
                                remove_field => "listener_time"
                        }
                }
        }
}
# Ship parsed sender events to ClickHouse and echo every event to stdout.
output
{
        # Only events the filter marked with [@metadata][table] == "sender"
        # are written to the bolt.sender table.
        if "sender" == [@metadata][table]
        {
                clickhouse {
                        http_hosts   => ["http://IP:8123"]
                        table        => "bolt.sender"
                        # Rows buffered per HTTP INSERT. The previous value of 5 produced
                        # a very large number of tiny inserts under heavy traffic, which
                        # ClickHouse handles poorly and which triggers the failure/retry
                        # path. ClickHouse recommends inserting in batches of at least
                        # ~1000 rows; tune upward if the event rate allows.
                        flush_size => 1000
                        # Maximum number of concurrent HTTP connections to ClickHouse.
                        pool_max => 5
                }
        }

   # NOTE(review): rubydebug to stdout prints every event and is costly at
   # production volume; consider removing it once debugging is finished.
   stdout {
    codec => rubydebug
  }
}

From what you described it looks like an issue on Clickhouse side, do you have any logs for it?

It seems that ClickHouse cannot keep up with the event rate from Logstash.

Are you using the persistent queue in Logstash? It may help with the data loss.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.