How to filter based on log_stream


(roxana) #1

Hello. In AWS CloudWatch I have a log group ("Group 1") that has 4 streams. How can I get logs from just one of those streams in Logstash? I am using the cloudwatch_logs input plugin. I have this:

cloudwatch_logs {
  log_group => ["Group 1"]
  region => "us-west-2"
  access_key_id => "sfsdfsdf"
  secret_access_key => "sdsdfdsfsd"
}


(Magnus Bäck) #2

I'm not familiar with the cloudwatch_logs input, but it looks like the stream name is being stored in the [cloudwatch_logs][log_stream] field so you can look at that field to e.g. drop all events except those from a particular stream (see https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#conditionals).
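
For illustration, a minimal sketch of a conditional on that field ("some-stream" is a placeholder stream name):

filter {
  # Tag events that came from one particular stream; "some-stream" is a placeholder.
  if [cloudwatch_logs][log_stream] == "some-stream" {
    mutate { add_tag => ["from_some_stream"] }
  }
}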


(roxana) #3

Thanks Magnus.
I see incoming events in this format in the Logstash console:

{
         "@timestamp" => 2017-05-26T22:44:12.951Z,
           "@version" => "1",
            "message" => "15:44:12.950 [main] INFO org.springframework.data.rest.webmvc.RepositoryRestHandlerMapping - Mapped "{[/{repository}/{id}],methods=[OPTIONS],produces=[application/hal+json || application/json || application/*+json;charset=UTF-8]}" onto public org.springframework.http.ResponseEntity<?> org.springframework.data.rest.webmvc.RepositoryEntityController.optionsForItemResource(org.springframework.data.rest.webmvc.RootResourceInformation)",
    "cloudwatch_logs" => {
              "event_id" => "33358316657573528386365774610515871725880801939687604257",
             "log_group" => "my Log-Group",
        "ingestion_time" => 2017-05-26T22:44:15.081Z,
            "log_stream" => "dev-container"
    }
}

How do I keep only the events that have log_stream == 'my favorite_stream'?


(Magnus Bäck) #4

How do I keep only the events that have log_stream == 'my favorite_stream'?

Use a drop filter and wrap it in a conditional as described in the documentation I linked to, so that events from your favorite stream aren't dropped.
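
A minimal sketch of that approach, assuming the stream to keep is named "my favorite_stream" as in the post above:

filter {
  # Drop every event that did not come from the stream we want to keep.
  if [cloudwatch_logs][log_stream] != "my favorite_stream" {
    drop { }
  }
}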


(roxana) #5

Thanks. How about filtering for a period of time?
Is there something like

if [ingestion_time] > '2017-01-01'

?


(Magnus Bäck) #6

Pretty much, but for that to work I think you'll have to convert the ingestion_time field to a string (it's currently a timestamp). Use a mutate filter's convert option for that.
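
A sketch of what that could look like, assuming everything ingested before 2017-06-12 (an arbitrary example date) should be dropped; since the timestamps are ISO 8601, a plain string comparison orders them correctly:

filter {
  mutate {
    # Convert the nested timestamp to a plain string so it can be compared in a conditional.
    convert => ["[cloudwatch_logs][ingestion_time]", "string"]
  }
  if [cloudwatch_logs][ingestion_time] < "2017-06-12" {
    drop { }
  }
}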


(roxana) #7

Hi Magnus, I am so close. Can you please help me here, as I am not familiar with the syntax?

filter {
  mutate {
    convert => ["ingestion_time", "String"]
  }
  if [cloudwatch_logs][ingestion_time] < "2017-06-12" {
    drop {}
  }
}

I can't get it to work.


(roxana) #8

Error registering plugin {:plugin=>"#<LogStash::FilterDelegator:0xefe471f @id="3442afcb1ec1a741b185766e812106260160fc3c-2", @klass=LogStash::Filters::Mutate, @metric_events=#<


(Magnus Bäck) #9

   convert => ["ingestion_time", "String"]

Your field is named [cloudwatch_logs][ingestion_time], not plain ingestion_time.
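
For reference, a corrected sketch of that line; note that mutate's conversion types are expected in lowercase ("string" rather than "String"), which can otherwise cause the filter to fail to register:

mutate {
  # Nested field reference plus lowercase conversion type.
  convert => ["[cloudwatch_logs][ingestion_time]", "string"]
}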


(roxana) #10

I tried convert => ["[cloudwatch_logs][ingestion_time]", "String"] and still get errors.


(Magnus Bäck) #11

Post those errors in full (what you posted above is incomplete) and make sure you post the logs as preformatted text so they don't get mangled.


(roxana) #12

filter {
  mutate {
    convert => ["[cloudwatch_logs][ingestion_time]", "String"]
  }
  if [cloudwatch_logs][ingestion_time] < "2017-06-12" {
    drop {}
  }
}

and this is the error I get:

[2017-08-10T08:07:09,286][ERROR][logstash.agent ] Cannot create pipeline {:reason=>"Expected one of #, => at line 17, column 12 (byte 397) after filter { \n \t mutate {\n \t\tconvert => ["[cloudwatch_logs][ingestion_time]", "String"]\n \t \n\n \n if "}


(Magnus Bäck) #13

What you've posted here looks okay, but the problem could be elsewhere in your file.


(roxana) #14

Sending Logstash's logs to C:/elk-5.5.1/logstash-5.5.1/logs which is now configured via log4j2.properties
[2017-08-10T08:15:07,831][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2017-08-10T08:15:07,831][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2017-08-10T08:15:07,956][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#Java::JavaNet::URI:0x1c2aa0cd}
[2017-08-10T08:15:07,956][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-08-10T08:15:08,019][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"default"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-08-10T08:15:08,035][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>[#Java::JavaNet::URI:0x63f87640]}
[2017-08-10T08:15:08,035][ERROR][logstash.pipeline ] Error registering plugin {:plugin=>"#<LogStash::FilterDelegator:0x4e700119 @id="a8f5b95a3626715416998f32d48250291ab92d15-2", @klass=LogStash::Filters::Mutate, @metric_events=#<LogStash::Instrument::NamespacedMetric:0x25947f1b @metric=#<LogStash::Instrument::Metric:0x7c42abff @collector=#<LogStash::Instrument::Collector:0x4360f9db @agent=nil, @metric_store=#<LogStash::Instrument::MetricStore:0xd564067 @store=#<Concurrent::map:0x0000000006407c entries=2 default_proc=nil>, @structured_lookup_mutex=#Mutex:0xbbc8a71, @fast_lookup=#<Concurrent::map:0x00000000064080 entries=52 default_proc=nil>>>>, @namespace_name=[:stats, :pipelines, :main, :plugins, :filters, :"a8f5b95a3626715416998f32d48250291ab92d15-2", :events]>, @logger=#<LogStash::Logging::Logger:0x26224609 @logger=#Java::OrgApacheLoggingLog4jCore::Logger:0x76a27519>, @filter=<LogStash::Filters::Mutate convert=>{"[cloudwatch_logs][ingestion_time]"=>"String"}, id=>"a8f5b95a3626715416998f32d48250291ab92d15-2", enable_metric=>true, periodic_flush=>false>>", :error=>"translation missing: en.logstash.agent.configuration.invalid_plugin_register"}
[2017-08-10T08:15:08,050][ERROR][logstash.agent ] Pipeline aborted due to error {:exception=>#<LogStash::ConfigurationError: translation missing: en.logstash.agent.configuration.invalid_plugin_register>, :backtrace=>["C:/elk-5.5.1/logstash-5.5.1/vendor/bundle/jruby/1.9/gems/logstash-filter-mutate-3.1.5/lib/logstash/filters/mutate.rb:189:in register'", "org/jruby/RubyHash.java:1342:ineach'", "C:/elk-5.5.1/logstash-5.5.1/vendor/bundle/jruby/1.9/gems/logstash-filter-mutate-3.1.5/lib/logstash/filters/mutate.rb:183:in register'", "C:/elk-5.5.1/logstash-5.5.1/logstash-core/lib/logstash/pipeline.rb:281:inregister_plugin'", "C:/elk-5.5.1/logstash-5.5.1/logstash-core/lib/logstash/pipeline.rb:292:in register_plugins'", "org/jruby/RubyArray.java:1613:ineach'", "C:/elk-5.5.1/logstash-5.5.1/logstash-core/lib/logstash/pipeline.rb:292:in register_plugins'", "C:/elk-5.5.1/logstash-5.5.1/logstash-core/lib/logstash/pipeline.rb:302:instart_workers'", "C:/elk-5.5.1/logstash-5.5.1/logstash-core/lib/logstash/pipeline.rb:226:in run'", "C:/elk-5.5.1/logstash-5.5.1/logstash-core/lib/logstash/agent.rb:398:instart_pipeline'"]}
[2017-08-10T08:15:08,175][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2017-08-10T08:15:11,066][WARN ][logstash.agent ] stopping pipeline {:id=>"main"}


(roxana) #15

Sorry about that. I missed a closing brace; now I see the real error. Thanks for your help.


(system) #16

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.