How to filter based on log_stream

Hello. In AWS CloudWatch, I have Group 1, which has 4 streams. How can I get logs from just one of the streams in Logstash? I am using the cloudwatch_logs plugin in Logstash.
I have this:

cloudwatch_logs {
  log_group => ["Group 1"]
  region => "us-west-2"
  access_key_id => "sfsdfsdf"
  secret_access_key => "sdsdfdsfsd"
}

I'm not familiar with the cloudwatch_logs input, but it looks like the stream name is being stored in the [cloudwatch_logs][log_stream] field so you can look at that field to e.g. drop all events except those from a particular stream (see https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#conditionals).

Thanks Magnus.
I see events in this format in the Logstash console:

{
"@timestamp" => 2017-05-26T22:44:12.951Z,
"@version" => "1",
"message" => "15:44:12.950 [main] INFO org.springframework.data.rest.webmvc.RepositoryRestHandlerMapping - Mapped "{[/{repository}/{id}],methods=[OPTIONS],produces=[application/hal+json || application/json || application/*+json;charset=UTF-8]}" onto public org.springframework.http.ResponseEntity<?> org.springframework.data.rest.webmvc.RepositoryEntityController.optionsForItemResource(org.springframework.data.rest.webmvc.RootResourceInformation)",
"cloudwatch_logs" => {
"event_id" => "33358316657573528386365774610515871725880801939687604257",
"log_group" => "my Log-Group",
"ingestion_time" => 2017-05-26T22:44:15.081Z,
"log_stream" => "dev-container"
}
}

How do I keep only the events that have log_stream == 'my favorite_stream'?


Use a drop filter and wrap it in a conditional as described in the documentation I linked to so events from your favorite stream aren't dropped.
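
A minimal sketch of that suggestion, assuming the stream name is stored in [cloudwatch_logs][log_stream] as in the event shown above, and using the stream name from the question as a placeholder:

```
filter {
  # Drop every event that does NOT come from the stream we want to keep.
  # "my favorite_stream" is the placeholder name used in the question.
  if [cloudwatch_logs][log_stream] != "my favorite_stream" {
    drop { }
  }
}
```

The condition is inverted on purpose: drop removes whatever matches, so matching on "not my stream" is what leaves the wanted events in the pipeline.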

Thanks. How about filtering for a period of time?
Is there something like this?
if [ingestion_time] > '2017-01-01'

Pretty much, but for that to work I think you'll have to convert the ingestion_time field to a string (it's currently a timestamp). Use a mutate filter's convert option for that.

Hi Magnus, I am very close. Can you please help me here, as I am not familiar with the syntax?

filter {
  mutate {
    convert => ["ingestion_time", "String"]
  }
  if [cloudwatch_logs][ingestion_time] < "2017-06-12" {
    drop {}
  }
}

I can't get it to work:

Error registering plugin {:plugin=>"#<LogStash::FilterDelegator:0xefe471f @id="3442afcb1ec1a741b185766e812106260160fc3c-2", @klass=LogStash::Filters::Mutate, @metric_events=#<

   convert => ["ingestion_time", "String"]

Your field is named [cloudwatch_logs][ingestion_time], not plain ingestion_time.

I tried convert => ["[cloudwatch_logs][ingestion_time]", "String"] and still get errors.

Post those errors in full (what you posted above is incomplete) and make sure you post the logs as preformatted text so they don't get mangled.

filter {
  mutate {
    convert => ["[cloudwatch_logs][ingestion_time]", "String"]
  }
  if [cloudwatch_logs][ingestion_time] < "2017-06-12" {
    drop {}
  }
}

And this is the error I get:

[2017-08-10T08:07:09,286][ERROR][logstash.agent ] Cannot create pipeline {:reason=>"Expected one of #, => at line 17, column 12 (byte 397) after filter { \n \t mutate {\n \t\tconvert => ["[cloudwatch_logs][ingestion_time]", "String"]\n \t \n\n \n if "}

What you've posted here looks okay, but the problem could be elsewhere in your file.

Sending Logstash's logs to C:/elk-5.5.1/logstash-5.5.1/logs which is now configured via log4j2.properties
[2017-08-10T08:15:07,831][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2017-08-10T08:15:07,831][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2017-08-10T08:15:07,956][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#Java::JavaNet::URI:0x1c2aa0cd}
[2017-08-10T08:15:07,956][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-08-10T08:15:08,019][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"default"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-08-10T08:15:08,035][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>[#Java::JavaNet::URI:0x63f87640]}
[2017-08-10T08:15:08,035][ERROR][logstash.pipeline ] Error registering plugin {:plugin=>"#<LogStash::FilterDelegator:0x4e700119 @id="a8f5b95a3626715416998f32d48250291ab92d15-2", @klass=LogStash::Filters::Mutate, @metric_events=#<LogStash::Instrument::NamespacedMetric:0x25947f1b @metric=#<LogStash::Instrument::Metric:0x7c42abff @collector=#<LogStash::Instrument::Collector:0x4360f9db @agent=nil, @metric_store=#<LogStash::Instrument::MetricStore:0xd564067 @store=#<Concurrent::map:0x0000000006407c entries=2 default_proc=nil>, @structured_lookup_mutex=#Mutex:0xbbc8a71, @fast_lookup=#<Concurrent::map:0x00000000064080 entries=52 default_proc=nil>>>>, @namespace_name=[:stats, :pipelines, :main, :plugins, :filters, :"a8f5b95a3626715416998f32d48250291ab92d15-2", :events]>, @logger=#<LogStash::Logging::Logger:0x26224609 @logger=#Java::OrgApacheLoggingLog4jCore::Logger:0x76a27519>, @filter=<LogStash::Filters::Mutate convert=>{"[cloudwatch_logs][ingestion_time]"=>"String"}, id=>"a8f5b95a3626715416998f32d48250291ab92d15-2", enable_metric=>true, periodic_flush=>false>>", :error=>"translation missing: en.logstash.agent.configuration.invalid_plugin_register"}
[2017-08-10T08:15:08,050][ERROR][logstash.agent ] Pipeline aborted due to error {:exception=>#<LogStash::ConfigurationError: translation missing: en.logstash.agent.configuration.invalid_plugin_register>, :backtrace=>["C:/elk-5.5.1/logstash-5.5.1/vendor/bundle/jruby/1.9/gems/logstash-filter-mutate-3.1.5/lib/logstash/filters/mutate.rb:189:in `register'", "org/jruby/RubyHash.java:1342:in `each'", "C:/elk-5.5.1/logstash-5.5.1/vendor/bundle/jruby/1.9/gems/logstash-filter-mutate-3.1.5/lib/logstash/filters/mutate.rb:183:in `register'", "C:/elk-5.5.1/logstash-5.5.1/logstash-core/lib/logstash/pipeline.rb:281:in `register_plugin'", "C:/elk-5.5.1/logstash-5.5.1/logstash-core/lib/logstash/pipeline.rb:292:in `register_plugins'", "org/jruby/RubyArray.java:1613:in `each'", "C:/elk-5.5.1/logstash-5.5.1/logstash-core/lib/logstash/pipeline.rb:292:in `register_plugins'", "C:/elk-5.5.1/logstash-5.5.1/logstash-core/lib/logstash/pipeline.rb:302:in `start_workers'", "C:/elk-5.5.1/logstash-5.5.1/logstash-core/lib/logstash/pipeline.rb:226:in `run'", "C:/elk-5.5.1/logstash-5.5.1/logstash-core/lib/logstash/agent.rb:398:in `start_pipeline'"]}
[2017-08-10T08:15:08,175][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2017-08-10T08:15:11,066][WARN ][logstash.agent ] stopping pipeline {:id=>"main"}

Sorry about that. I missed a closing brace. Now I see the real error. Thanks for your help.
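
The thread does not say what the remaining error turned out to be. One likely candidate, going by the log above, is the type name: the plugin that fails to register is the mutate filter with convert set to "String", and the mutate documentation lists its type names in lowercase (integer, float, string, boolean). This is an inference from the log, not something confirmed by the posters. A minimal sketch of the filter with the lowercase type name, reusing the field and cutoff date from the posts above:

```
filter {
  # Convert the ingestion timestamp to a string so it can be compared
  # against a date literal. The type name is lowercase "string".
  mutate {
    convert => { "[cloudwatch_logs][ingestion_time]" => "string" }
  }

  # Drop events ingested before the cutoff date used earlier in the thread.
  if [cloudwatch_logs][ingestion_time] < "2017-06-12" {
    drop { }
  }
}
```

The comparison here is a plain string comparison. It only orders events chronologically if the converted value keeps the ISO 8601 shape seen in the event output (for example 2017-05-26T22:44:15.081Z), which is assumed rather than verified here.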
