To control the log flows in kibana end through logstash config

HI team,

We have getting the huge logs when application team doing the Performance testing. At that time, Our cluster is not stable due to unable to handle the huge much of load. Could you please help us to control the log flow in logstash config or whether it is possible to control the flow in logstash config filter part section ?

Logstash version - 7.15
Logs Flow Path: FileBeat/MetricBeat -> AWS Kafka -> Logstash -> ES -> Kibana

Thanks & Regards,
Yasar Arafaath A.

Can you give more context on what you want to do?

You want to drop logs? It is not clear what you mean with control the log flows.

If the logs generated during the performance tests have some information that you could filter on, then you would be able to drop those logs.

Hi Leandro Pereira,

Sorry for the inconvienience caused,

This is the Logs Flow - FileBeat/MetricBeat -> AWS Kafka -> Logstash -> ES -> Kibana to visualize the logs in kibana.

During performance testing, we receiving many log files at a moment. We are facing logs lagging in our environment. It takes too much time to reach the normal flow. So, our concern is, we have to control the log flows through Logstash, while application team doing performance testing !

We would like to know if we can control the log flow at Logstash - filter part from Logstash config file. could you please guide us to proceed further on this part ?

Thanks,
Yasar Arafaath A.

You didn't said which kind of control you want to do, logstash has many filters that could help you to do some kinf of control.

Here is the list of logstash filters.

You could use the drop filter to drop events from those performance tests, but you would need to have something to filter so you won't drop the normal events.

You also have the throttle filter that you can use.

But you need to have some way to filter on the events created by the performance tests, otherwise those filters will apply to all events.

Hi Leandro Pereira,

Suppose, If application team pushing the 1k of logs while doing performance testing. In the meantime, kibana trying to receiving the whole 1k of logs. but it does not received the whole logs due to not able to handle the huge much of load at that time.

So, In that case, We need to get the logs like 10mins some count of logs and another 10mins some count of logs and this is the way, we need to controlling the logs while doing performance testing.

The above given logs document sizes are just for an example only.

Could you please tell us whether it possible to control the log flow through logstash filter ?

Hope you understand our concern !!

Many Thanks,
Yasar Arafaath A.

It is possible, check my previous answer, it has a link to the list of logstash filters.

In your case you can use the throttle filter, the documentation has some examples.

But as I said, you need to be able to use a if conditional to separate the logs from the performance test from the real logs, or else the filter will apply to every log.

Hi Leandro Pereira,

We tried to controlling the log flow with the given filter. But we are unable to receive the logs. Please tell us, where we need to implement the changes in filter part.

This is ous filter part config:

filter {

json {

source => "message"

}

#mutate {

remove_field => [ "message" ]

}

fingerprint {
source => [ "fingerprint", "log" ]
target => "[@metadata][fingerprint]"
method => "SHA1"
concatenate_sources => true
}
}

image

Please guide us to proceed further !!

Thanks,
Yasar Arafaath A.

You need to share your entire pipeline, it is not possible to know why you can't receive the logs if you do not share the entire pipeline.

Also, use the </> button to correctly share your configuration, it is really hard to read it without the proper formatting.

Hi

Sorry for the inconvenience caused.

Please find the below config and please tell us where to add those throttle filter in filter part section.

Config:

<input {

kafka {
         bootstrap_servers => "####"

         topics             => ["#####"]
         consumer_threads   => 3
         group_id           => "fb-lc-grp"
         security_protocol  => "SSL"
         ssl_truststore_location => "/usr/share/softwares/cert/kafka.client.truststore.jks"
         ssl_truststore_password => "changeit"
         codec              => "json"
    }

}
filter {

json {

source => "message"

}

#mutate {

remove_field => [ "message" ]

}

fingerprint {
source => [ "fingerprint", "log" ]
target => "[@metadata][fingerprint]"
method => "SHA1"
concatenate_sources => true
}
}
output {

if [logtype] == "linuxlog" {

     elasticsearch {

            index => "ej-lc16-%{[logtype]}-%{+YYYY.MM.dd}"
            hosts => ["##"]
            api_key => ["##"]
            ssl => true
            ilm_rollover_alias => "ej-lc16-linuxlog"
            ilm_pattern => "000001"
            ilm_policy => "DeleteIndexAfterOneDay"
            document_id => "%{[@metadata][fingerprint]}"
      }
}

if [logtype] == "applog" {

     elasticsearch {

            index => "ej-lc-%{[logtype]}-%{+YYYY.MM.dd}"
            hosts => ["##"]
            api_key => ["##"]
            ssl => true
            ilm_rollover_alias => "ej-lc-applog"
            ilm_pattern => "000001"
            ilm_policy => "DeleteIndexAfterOneDay"
            document_id => "%{[@metadata][fingerprint]}"
     }
 }

if [logtype] == "linuxlog" {

     elasticsearch {

            index => "ej-lc-%{[logtype]}-%{+YYYY.MM.dd}"
            hosts => ["##"]
            api_key => ["##"]
            ssl => true
            ilm_rollover_alias => "ej-lc-linuxlog"
            ilm_pattern => "000001"
            ilm_policy => "DeleteIndexAfterOneDay"
            document_id => "%{[@metadata][fingerprint]}"

     }
 }

}/>

Thanks,
Yasar Arafaath A.

Hi

<input {

    kafka {
             bootstrap_servers => "b-2.kafka-ej-development.gv3pju.c6.kafka.eu-west-1.amazonaws.com:9094,b-3.kafka-ej-development.gv3pju.c6.kafka.eu-west-1.amazonaws.com:9094,b-1.kafka-ej-development.gv3pju.c6.kafka.eu-west-1.amazonaws.com:9094"

             topics             => ["easyjet_lc_dev_app_topic","easyjet_lc_dev_linuxlog_topic","easyjet_lc_dev_monitoring_topic","easyjet_lc_dev_linux_topic
"]
             consumer_threads   => 3
             group_id           => "fb-lc-grp"
             security_protocol  => "SSL"
             ssl_truststore_location => "/usr/share/softwares/cert/kafka.client.truststore.jks"
             ssl_truststore_password => "changeit"
             codec              => "json"
        }
 }
filter {
#  json {
#      source => "message"
#  }

#mutate {
#       remove_field => [ "message" ]
#       }

   fingerprint {
        source => [ "fingerprint", "log" ]
        target => "[@metadata][fingerprint]"
        method => "SHA1"
        concatenate_sources => true
   }
}
output {

   if [logtype] == "linuxlog" {

         elasticsearch {

                index => "ej-lc16-%{[logtype]}-%{+YYYY.MM.dd}"
                hosts => ["##"]
                api_key => ["##"]
                ssl => true
                ilm_rollover_alias => "ej-lc16-linuxlog"
                ilm_pattern => "000001"
                ilm_policy => "DeleteIndexAfterOneDay"
                document_id => "%{[@metadata][fingerprint]}"
          }
    }


  if [logtype] == "applog" {

         elasticsearch {

                index => "ej-lc-%{[logtype]}-%{+YYYY.MM.dd}"
                hosts => ["##"]
                api_key => ["##"]
                ssl => true
                ilm_rollover_alias => "ej-lc-applog"
                ilm_pattern => "000001"
                ilm_policy => "DeleteIndexAfterOneDay"
                document_id => "%{[@metadata][fingerprint]}"
         }
     }


  if [logtype] == "linuxlog" {

         elasticsearch {

                index => "ej-lc-%{[logtype]}-%{+YYYY.MM.dd}"
                hosts => ["##"]
                api_key => ["##"]
                ssl => true
                ilm_rollover_alias => "ej-lc-linuxlog"
                ilm_pattern => "000001"
                ilm_policy => "DeleteIndexAfterOneDay"
                document_id => "%{[@metadata][fingerprint]}"

         }
     }
}/>
`````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````````


Thanks,
Yasar Arafaath A

Hi Leandro Pereira,

Sorry for the inconvenience caused.

Hope, you getting the correct format of config.

Thanks,
Yasar Arafaath A.

All your output have conditionals, if you are not getting any output this could mean that none of your messages are matching your conditionals.

You need to validate if your messages are being correctly parsed and have the fields that you use in your filters and output.

You can add a file output in your configuration to check this.

Something like:

file {
    path => "/tmp/validate-logs.txt"
}

Hi Leandro Pereira,

But we exactly don't know, where we need to put those throttle filter in our logstash filtering part. Could you please confirm us, is that below filtering method is fine or do we need to change the lines.

filter {
throttle {
        before_count => 3
        after_count => 5
        period => 3600
        max_age => 7200
         }
#  json {
#      source => "message"
#  }

#mutate {
#       remove_field => [ "message" ]
#       }

   fingerprint {
        source => [ "fingerprint", "log" ]
        target => "[@metadata][fingerprint]"
        method => "SHA1"
        concatenate_sources => true
   }
}

Thanks,
Yasar Arafaath A

Hi Leandro Pereira,

But we exactly don't know, where we need to put those throttle filter in our logstash filtering part. Could you please confirm us, is that below filtering method is fine or do we need to change the lines.

filter {
throttle {
        before_count => 3
        after_count => 5
        period => 3600
        max_age => 7200
         }
#  json {
#      source => "message"
#  }

#mutate {
#       remove_field => [ "message" ]
#       }

   fingerprint {
        source => [ "fingerprint", "log" ]
        target => "[@metadata][fingerprint]"
        method => "SHA1"
        concatenate_sources => true
   }
}

Thanks,
Yasar Arafaath A

Hi Leandro Pereira,

If possible, could you please update the query !

Thanks,
Yasar Arafaath A.

Hi Leandro Pereira,

Kindly provide the solution for my query.

Thanks,
Yasar Arafaath A.

Hi Leandro Pereira,

We applied that throttle filter in logstash config, the one thing we could able to find out that is, Suppose the application team sending only two events of logs that we can easily find out they only sending the two events by using below filter config. But I don't think so this could be control the log flow of huge logs.

throttle {
        before_count => 3
        after_count => 5
        period => 3600
        max_age => 7200

Could you please provide us some other filters to control the logs flow in logstash side.

Advance thanks,
Yasar Arafaath A.