How to split Linux and Windows data into two different indexes

Hi.
I created two conf files in /etc/logstash/conf.d for two kinds of hosts: Linux hosts with Filebeat and Windows hosts with Winlogbeat.
In the output part I wrote two different index patterns: "linux-%{+YYYY.MM.dd}" and "windows-%{+YYYY.MM.dd}".
In the input part I used a different port for each kind of host: 5045 for Linux and 5046 for Windows, with host address "0.0.0.0" in both files.
Now all the data from both kinds of hosts ends up in a single index: windows-2018.08.09.
What did I do wrong?

You've configured Logstash with a single pipeline, because Logstash concatenates all files in /etc/logstash/conf.d into one configuration. Unless you use conditionals, all events will reach all filters and outputs.

Can you give an example of these conditionals?
Which is the better way to configure this: create two pipelines, one for Linux hosts and one for Windows hosts, or create a conditional in a *.conf file?
Thanks

Can you give an example of these conditionals?

Which is the better way to configure this: create two pipelines, one for Linux hosts and one for Windows hosts, or create a conditional in a *.conf file?

I'm struggling to come up with an example where it makes any significant difference.
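
For what it's worth, here is a rough sketch of both approaches; the file names, pipeline ids, and the [host_os] field are invented for illustration. With multiple pipelines (available since Logstash 6.0), pipelines.yml keeps the two flows completely separate:

- pipeline.id: linux
  path.config: "/etc/logstash/conf.d/linux.conf"
- pipeline.id: windows
  path.config: "/etc/logstash/conf.d/windows.conf"

With conditionals, a single pipeline routes events to the right output based on some distinguishing field:

output {
  if [host_os] == "windows" {
    elasticsearch { index => "windows-%{+YYYY.MM.dd}" }
  } else {
    elasticsearch { index => "linux-%{+YYYY.MM.dd}" }
  }
}

Either way the same events end up in the same indexes, which is why the choice makes little practical difference here.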

I still don't understand how I should configure Logstash. I'd appreciate it if you could explain.
I have 100 servers (10 Linux and 90 Windows). I want to split all logs into 2 indexes: linux and windows.
Do I need to create a huge *.conf file in /etc/logstash/conf.d with inputs from 100 servers on different ports under the conditionals you've suggested, or do I need to create many pipelines with many *.conf files?
Maybe it would be more convenient and appropriate to put all events into one kind of index split by days (I also need to delete indexes older than 1 year)?

I have 100 servers (10 Linux and 90 Windows). I want to split all logs into 2 indexes: linux and windows.
Do I need to create a huge *.conf file in /etc/logstash/conf.d with inputs from 100 servers on different ports under the conditionals you've suggested, or do I need to create many pipelines with many *.conf files?

One Beats input is enough, but you can have more if you want. See the example at elasticsearch - Make logstash add different inputs to different indices - Stack Overflow.
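
A minimal sketch of that idea: the beats input records which Beat sent each event in [@metadata][beat] ("filebeat" or "winlogbeat"), so a single input on one port can serve all 100 servers, and a conditional on that field picks the index (the host name below is a placeholder):

input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}

output {
  if [@metadata][beat] == "winlogbeat" {
    elasticsearch {
      hosts => ["es-host:9200"]
      index => "windows-%{+YYYY.MM}"
    }
  } else {
    elasticsearch {
      hosts => ["es-host:9200"]
      index => "linux-%{+YYYY.MM}"
    }
  }
}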

Maybe it would be more convenient and appropriate to put all events into one kind of index split by days (I also need to delete indexes older than 1 year)?

You'll definitely want to use time-based indexes (though not necessarily daily; depending on the log volumes monthly indexes might be better).
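
As for deleting indexes older than 1 year, time-based index names make that cheap, since whole indexes can be dropped by name. A sketch with Elasticsearch Curator (an assumption on my part; any tool that deletes indexes by name works), matching monthly names like linux-2018.08:

actions:
  1:
    action: delete_indices
    description: "Delete linux-/windows- indices older than 365 days"
    options:
      ignore_empty_list: True
    filters:
    - filtertype: pattern
      kind: regex
      value: '^(linux|windows)-.*$'
    - filtertype: age
      source: name
      direction: older
      timestring: '%Y.%m'
      unit: days
      unit_count: 365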


So a monthly index would use this pattern in the output:
"%{type}-%{+YYYY.MM}"?

Yes.

Thank you!

But I still don't understand:

How can I split the data into different indexes if Logstash concatenates all events from the many conf files? For example, here are two different conf files in conf.d:

For syslog on port 5044:

input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}

filter {
  if [fileset][module] == "system" {
    if [fileset][name] == "auth" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:[system][auth][groupadd][name]}, GID=%{NUMBER:[system][auth][groupadd][gid]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][user][add][name]}, UID=%{NUMBER:[system][auth][user][add][uid]}, GID=%{NUMBER:[system][auth][user][add][gid]}, home=%{DATA:[system][auth][user][add][home]}, shell=%{DATA:[system][auth][user][add][shell]}$",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] }
        pattern_definitions => {
          "GREEDYMULTILINE" => "(.|\n)*"
        }
        remove_field => "message"
      }
      date {
        match => [ "[system][auth][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }
      geoip {
        source => "[system][auth][ssh][ip]"
        target => "[system][auth][ssh][geoip]"
      }
    }
    else if [fileset][name] == "syslog" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
        pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
        remove_field => "message"
      }
      date {
        match => [ "[system][syslog][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }
    }
  }
}

output {
  elasticsearch {
    sniffing => true
    hosts => ["uk1lv8702:9200", "uk1lv8703:9200", "uk1lv8704:9200"]
    index => "linux-%{+YYYY.MM}" # monthly index
    manage_template => false
    #index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
  }
}

For the Windows event log on port 5046:

input {
  beats {
    port => 5046
    host => "0.0.0.0"
  }
}

filter {
  grok {
    match => { "message" => "%{GREEDYDATA:message}" }
  }
}

output {
  elasticsearch {
    sniffing => true
    hosts => ["uk1lv8702:9200", "uk1lv8703:9200", "uk1lv8704:9200"]
    index => "windows-%{+YYYY.MM.dd}"
    manage_template => false
  }
}

You must have some characteristic (a tag or a field) that sets apart your different kinds of logs (you haven't shown example events, so I can't get more concrete than that). Use that to select which filters and outputs to apply. You can set any fields and tags you want in the Winlogbeat and Filebeat configurations. Let's say you set type to "winevent" in Winlogbeat; then you can wrap the current contents of the filter and output sections in your Winlogbeat file with

if [type] == "winevent" {
  ...
}
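
For concreteness, a sketch of both pieces, assuming Beats 6.x, where a top-level field is set with fields plus fields_under_root (the value "winevent" is just the example above). On the Winlogbeat side:

fields:
  type: winevent
fields_under_root: true

And the wrapped output on the Logstash side:

output {
  if [type] == "winevent" {
    elasticsearch {
      sniffing => true
      hosts => ["uk1lv8702:9200", "uk1lv8703:9200", "uk1lv8704:9200"]
      index => "windows-%{+YYYY.MM}"
      manage_template => false
    }
  }
}

Events that don't match the conditional simply fall through to whatever other outputs you define, so the Filebeat events never reach the windows- index.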
