Logstash intermittently working

Hi Team,

I'm running Logstash 6.8.23 (startup log: Starting Logstash {"logstash.version"=>"6.8.23"}). My ES query returns results in Dev Tools, but when I run it through Logstash from my local machine it doesn't work. Here is my sample input:

input {
  elasticsearch {
    id => "input_c_response_time"
    hosts => "hostAddress"
    index => "logstash-*"
    schedule => "*/2 * * * *"
    docinfo => true
    add_field => [ "metric", "c_response_time" ]
    query => '{"query":{"bool":{"must":[{"query_string":{"query":"environment: stage AND role: api AND (\"in custommessagerecoverer\" AND class: \"c.g.c.s.m.CustomMessageRecoverer\") OR (\"Message processed successfully\" AND class: \"c.g.c.s.m.SubscriptionListener\")"}},{"range":{"@timestamp":{"gte":"now-24h","lte":"now-1m","format":"epoch_millis"}}}]}}}'
  }
}

I'm running with the log level set to trace and I don't see any errors. Below is the output after Logstash starts successfully:

[2025-03-04T00:13:31,136][DEBUG][logstash.instrument.periodicpoller.cgroup] One or more required cgroup files or directories not found: /proc/self/cgroup, /sys/fs/cgroup/cpuacct, /sys/fs/cgroup/cpu
[2025-03-04T00:13:31,182][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
[2025-03-04T00:13:31,182][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}
[2025-03-04T00:13:32,165][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline {:pipeline_id=>"main", :thread=>"#<Thread:0x7c29a018 sleep>"}
[2025-03-04T00:13:36,146][DEBUG][logstash.instrument.periodicpoller.cgroup] One or more required cgroup files or directories not found: /proc/self/cgroup, /sys/fs/cgroup/cpuacct, /sys/fs/cgroup/cpu
[2025-03-04T00:13:36,192][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
[2025-03-04T00:13:36,192][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}
[2025-03-04T00:13:37,172][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline {:pipeline_id=>"main", :thread=>"#<Thread:0x7c29a018 sleep>"}

Can someone help me figure out why it's not working?

Thank you,
Vaseem

Hello Vaseem,

Can you share the whole logstash pipeline?

What you could try:

  • maybe your data is filtered out: remove any filters
  • maybe the pipeline works but the data cannot be written to the target system: use a file output with the rubydebug codec to see whether any data is coming out of your pipeline at all
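For the second point, a minimal sketch of such a debugging output (the path is just an example, use any location writable by the Logstash user):

```
output {
  file {
    path => "/tmp/logstash_debug.log"
    codec => rubydebug    # pretty-prints each event with all fields
  }
}
```

If events show up in that file, the input and filters are working and the problem is on the output side; if the file stays empty, the input or a filter is dropping everything.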

Btw, you should consider upgrading to a newer version. Starting with 7.x, Logstash has a setting pipeline.separate_logs, which creates a separate log file for each pipeline, making analysis much easier.
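For reference, that setting lives in logstash.yml:

```
# logstash.yml (Logstash 7.x and later)
pipeline.separate_logs: true
```

Each pipeline then writes to its own log file named after its pipeline id, instead of everything being interleaved in one file.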

Best regards
Wolfram

Hi @Wolfram_Haussig

Thank you for your response.
Our input source logs are asynchronous in nature, and my filter plugin looks like this:

filter {
  if [metric] == "reponse_times" {
    mutate {
      remove_field => [ "tags" ]
    }

    grok {
      match => { "message" => [
        "(?:%{TIMESTAMP_ISO8601:timestamp}) (?:%{NOTSPACE:nonce}) INFO (?:\[%{NOTSPACE:queue}\]) (?:%{NOTSPACE:className}) - Message processed successfully in (?:%{NUMBER:timeTaken:int}) ms for routingKey:(?:%{NOTSPACE:routingKey})",
        "(?:%{TIMESTAMP_ISO8601:timestamp}) (?:(%{NUMBER:business_id}\|%{NUMBER:user_id}\|)?%{NOTSPACE:nonce}) INFO (?:\[%{NOTSPACE:queue}\]) (?:%{NOTSPACE:className}) - In CustomMessageRecoverer after (?:%{NUMBER:timeTaken:int}) ms for routingKey:(?:%{NOTSPACE:routingKey})"
      ] }
    }

    if "In CustomMessageRecoverer after" in [message] {
      mutate {
        add_field => [ "status", "fail" ]
      }
    } else {
      mutate {
        add_field => [ "status", "pass" ]
      }
    }

    mutate {
      split => [ "routingKey", ":" ]
      add_field => [ "contextType", "%{[routingKey][1]}" ]
      add_field => [ "contextId", "%{[routingKey][2]}" ]
    }
  }
}
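One way to check whether events ever enter the [metric] conditional at all (a debugging sketch, not part of the original pipeline; the tag name is arbitrary) is to add a temporary tag inside the branch and inspect events on stdout:

```
filter {
  if [metric] == "reponse_times" {
    # temporary marker so we can see in the output whether this branch ran
    mutate { add_tag => [ "matched_metric_branch" ] }
  }
}
output {
  stdout { codec => rubydebug }
}
```

If the tag never appears on any event, the conditional is not matching; in that case the value being compared is worth double-checking against the field set in the input (the elasticsearch input above adds metric as "c_response_time").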

Thanks,
Vaseem