Logstash input with filtered output

I need to divide Winlogbeat input and push it to two separate indices when a beat has a particular event ID.

I used an if statement but the indices seems to have the same data.
This is my pipeline configuration:

    input {
  beats {
    port => 5045
  }
}

 output {
     elasticsearch {
      hosts => ["https://x1x:9200", "https://x2x:9200", "https://x3x:9200"]
      ssl => true
      ssl_certificate_verification => false
      user => admin
      password => admin:
      ilm_enabled => false
     index => "windows-%{+YYYY.MM.dd}"
   }
  }
if [winlog.event_id] == [4634] and [event.code] == [4635] {
   elasticsearch {
      hosts => ["https://x1x:9200", "https://x2x:9200", "https://x3x:9200"]
      ssl => true
      ssl_certificate_verification => false
      user => admin
      password => admin
      ilm_enabled => false
     index => "sharepoint-%{+YYYY.MM}"
   }
  }

it is necessary to handle the if else conditions well so that everything is in the output block

I have tried also this way but the result was that the logs (all of them) goes only in the sharepoint index.
My goal is to send in sharepoint only the logs that have 4634 as winlog.event_id, and the others (with different event_id) in windows index.

```

    input {
      beats {
        port => 5045
      }
    }

     output {
      if [winlog.event_id] == [4634] and [event.code] == [4635] {
       elasticsearch {
          hosts => ["https://x1x:9200", "https://x2x:9200", "https://x3x:9200"] 
          ssl => true
          ssl_certificate_verification => false
          user => admin
          password => admin
          ilm_enabled => false
         index => "sharepoint-%{+YYYY.MM}" 
       }
     }
      else {
       elasticsearch {
          hosts => ["https://x1x:9200", "https://x2x:9200", "https://x3x:9200"] 
          ssl => true
          ssl_certificate_verification => false
          user => admin
          password => admin
          ilm_enabled => false
         index => "windows-%{+YYYY.MM.dd}"
       }
     }
    }

```

If the field name has a full stop in it then that will work. If the winlog field is an object that contains an event_id field then that should be [winlog][event_id]. Similarly for the [event] field. logstash does not use the same syntax for naming fields that elasticsearch and kibana do.

I've tried to change it a little bit but no luck. Also now the sharepoint index doesn't get data anymore.

Even the filter doesn't "filter" out that beat containing the user.

Here is the actual configuration :no_mouth:

    input {
      beats {
        port => 5045
      }
    }
    filter {
          if [user][name] == ["sqlSvcAcc"] {
            drop { }
          }
    }

    output {
         elasticsearch {
          hosts => ["https://x:9200", "https://x:9200", "https://x:9200"]
          ssl => true
          ssl_certificate_verification => false
          user => admin
          password => admin
          ilm_enabled => false
         index => "windows-%{+YYYY.MM.dd}"
       }
     if [winlog][event_id] == [4634] or [event][code] == [4634] {
         elasticsearch {
          hosts => ["https://x:9200", "https://x:9200", "https://x:9200"]
          ssl => true
          ssl_certificate_verification => false
          user => admin
          password => admin
          ilm_enabled => false
         index => "sharepoint-%{+YYYY.MM}"
       }
      }
    }

I have no idea how logstash parses that. If the event id is numeric you should use

if [winlog][event_id] == 4634

If it is a string then use

if [winlog][event_id] == "4634"

Similarly for [event][code].

1 Like

After a reboot, the configuration seems to be OK!

BUT, now i need to filter out the "4634" events id from windows index. (The sharepoint index is receveing only 4634 as it should.)

AND, sqlSvcAcc user is still not filtered somehow..

This is the actual configuration:

> 
>     input {
>       beats {
>         port => 5045
>       }
>     }
>     filter {
>           if [user][name] == ["sqlSvcAcc"] or [related][user] == ["sqlSvcAcc"]{
>             drop { }
>           }
>     }
> 
>     output {
>          elasticsearch {
>           hosts => ["https://a:9200", "https://b:9200", "https://c:9200"]
>           ssl => true
>           ssl_certificate_verification => false
>           user => admin
>           password => admin
>           ilm_enabled => false
>          index => "windows-%{+YYYY.MM.dd}"
>        }
>      if [winlog][event_id] == 4624 or [event][code] == 4624 {
>          elasticsearch {
>           hosts => ["https://a:9200", "https://b:9200", "https://c:9200"]
>           ssl => true
>           ssl_certificate_verification => false
>           user => admin
>           password => admin
>           ilm_enabled => false
>          index => "sharepoint-%{+YYYY.MM}"
>        }
>       }
>     }
> 

After a morining of struggle i finally managed how to fix it, this is the last configuration, everything works fine. Thanks for the help.

input {
  beats {
    port => 5045
  }
}
filter {
      if [user][name] == "sqlSvcAcc" or [related][user] == "sqlSvcAcc" {
        drop { }
      }
}

output {

 if [winlog][event_id] != 4624 or [event][code] != 4624 {
     elasticsearch {
      hosts => ["https://host1:9200", "https://host2:9200", "https://host3:9200"]
      ssl => true
      ssl_certificate_verification => false
      user => admin
      password => admin
      ilm_enabled => false
     index => "windows-%{+YYYY.MM.dd}"
   }
 }
 else if [winlog][event_id] == 4624 or [event][code] == 4624{
     elasticsearch {
      hosts => ["https://host1:9200", "https://host2:9200", "https://host3:9200"]
      ssl => true
      ssl_certificate_verification => false
      user => admin
      password => admin
      ilm_enabled => false
     index => "sharepoint-%{+YYYY.MM}"
   }
  }
}

That does not test if the [user][name] field is equal to "sqlSvcAcc". See this github issue for a discussion of what it does do. Remove the brackets.

1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.