Logs in wrong index

Hello,

I've got a strange problem.

I'm using Logstash to get logs from syslog and OSSEC into Elasticsearch so that I can search them either through Kibana or with queries run directly against it.
I have different index patterns for the syslog and OSSEC events: the syslog index pattern is logstash-%{+YYYY.MM.dd} and the OSSEC one is ossec-%{+YYYY.MM.dd}.

Now the interesting part: some logs that should go to the syslog indices end up in the OSSEC indices. I have no idea how this can happen, because these are the two config files for Logstash:

Ossec config:

input {
  lumberjack {
    port => 5003
    type => "lumberjack"
    ssl_certificate => "/etc/logstash/logstash-forwarder.crt"
    ssl_key => "/etc/logstash/logstash-forwarder.key"
    codec => json
  }
}

filter {
  geoip {
    source => "srcip"
    target => "geoip"
    database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
    add_field => [ "[geoip][location]", "%{[geoip][longitude]}" ]
    add_field => [ "[geoip][location]", "%{[geoip][latitude]}" ]
  }
  date {
    match => ["timestamp", "YYYY MMM dd HH:mm:ss"]
    target => "@timestamp"
  }
  mutate {
    convert => [ "[geoip][location]", "float"]
    rename => [ "hostname", "AgentName" ]
    rename => [ "geoip", "GeoLocation" ]
    rename => [ "file", "AlertsFile" ]
    rename => [ "agentip", "AgentIP" ]
    rename => [ "[rule][comment]", "[rule][description]" ]
    rename => [ "[rule][level]", "[rule][AlertLevel]" ]
    remove_field => [ "timestamp" ]
  }
}

output {
  #stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["bcksrv16:9200"]
    index => "ossec-%{+YYYY.MM.dd}"
    document_type => "ossec"
    template => "/etc/logstash/elastic-ossec-template.json"
    template_name => "ossec"
    template_overwrite => true
  }
}

And the syslog config:
input {
  udp {
    port => 5001
    type => syslog
  }
}

filter {
  if [type] == "syslog" {
    grok {
      break_on_match => true
      match => ["message", "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(: %{POSINT:win_eventid})?(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"]
      add_field => ["received_at", "%{@timestamp}"]
      remove_field => ["host"]
    }
    syslog_pri {}
    date {
      match => ["syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601", "MM/dd/yy HH:mm:ss"]
    }
  }
}

filter {
  grok {
    break_on_match => true
    match => [
      "message", "%{HAPROXYHTTP}"
    ]
    add_tag => ["HAProxy"]
  }
  geoip {
    database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
    source => "client_ip"
    target => "geoip"
  }
}

output {
  elasticsearch {
    hosts => bcksrv16
    index => "logstash-%{+YYYY.MM.dd}"
    document_type => "syslog"
  }
}

I've checked and there is no way that the syslog events are sent to the logstash ossec port.

If anyone has an idea please let me know.

Greetings Richard.

This is a common misconception.

Logstash has a single event pipeline. Dividing your configuration into multiple files doesn't change that fact.
Having these two configuration files...

file A:

input { ... }
output { ... }

file B:

input { ... }
output { ... }

...is exactly equivalent to having just this file:

input { ... }
output { ... }
input { ... }
output { ... }

Unless you have conditionals, events from all inputs are filtered through all filters and end up in all outputs.
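
So if you want the two streams to stay apart, the outputs (and, if needed, the filters) have to be wrapped in conditionals. A rough sketch, assuming your events really do carry a type field that distinguishes them (verify that with a stdout output if you're unsure):

output {
  if [type] == "syslog" {
    # only syslog events reach this block
    elasticsearch { ... }
  }
  if [type] == "ossec" {
    # only ossec events reach this block
    elasticsearch { ... }
  }
}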


Hello Magnusbaeck.

Thanks for your reply, that helps a bit. But how can I fix this? I'm new to Logstash and Elasticsearch.

Greetings Richard.

Have a look at https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html.

Again, sorry, I was too quick. I read both the document and your first comment again and I think I have a bit of an understanding now.

I have to filter the events and route them to the right output, right?
What I'm not sure about yet is how to do that exactly, so some help there would be very much appreciated.

I think if I were to do it like this it should work, right?

input {
	lumberjack {
		port => 5003
		type => "lumberjack"
		ssl_certificate => "/etc/logstash/logstash-forwarder.crt"
		ssl_key => "/etc/logstash/logstash-forwarder.key"
		codec => json
	}
}
filter {
	if [type] == "ossec" {
		geoip {
			source => "srcip"
			target => "geoip"
			database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
			add_field => ["[geoip][location]", "%{[geoip][longitude]}"]
			add_field => ["[geoip][location]", "%{[geoip][latitude]}"]
		}
		date {
			match => ["timestamp", "YYYY MMM dd HH:mm:ss"]
			target => "@timestamp"
		}
		mutate {
			convert => ["[geoip][location]", "float"]
			rename => ["hostname", "AgentName"]
			rename => ["geoip", "GeoLocation"]
			rename => ["file", "AlertsFile"]
			rename => ["agentip", "AgentIP"]
			rename => ["[rule][comment]", "[rule][description]"]
			rename => ["[rule][level]", "[rule][AlertLevel]"]
			remove_field => ["timestamp"]
		}
	}
}

output {
	# stdout { codec => rubydebug }
	elasticsearch {
		hosts => ["bcksrv16:9200"]
		index => "ossec-%{+YYYY.MM.dd}"
		document_type => "ossec"
		template => "/etc/logstash/elastic-ossec-template.json"
		template_name => "ossec"
		template_overwrite => true
	}
}


input {
	udp {
		port => 5001
		type => syslog
	}
}

filter {
	if [type] == "syslog" {
		grok {
			break_on_match => true
			match => ["message", "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(: %{POSINT:win_eventid})?(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"]
			add_field => ["received_at", "%{@timestamp}"]
			remove_field => ["host"]
		}
		syslog_pri {}
		date {
			match => ["syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601", "MM/dd/yy HH:mm:ss"]
		}
	}
}

output {
	elasticsearch {
		hosts => bcksrv16
		index => "logstash-%{+YYYY.MM.dd}"
		document_type => "syslog"
	}
}

I have removed one filter block from the syslog config because it's no longer needed; we don't send those items to Logstash anymore.

Thanks for any help

You're already using conditionals so just continue doing that.

output {
  if [type] == "syslog" {
    elasticsearch {
      hosts => bcksrv16
      index => "logstash-%{+YYYY.MM.dd}"
    }
  }
}

Similarly for the other events.
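
For example, the ossec output could become something like this (a sketch that reuses the settings from your own config and assumes the ossec events arrive with type set to "ossec"):

output {
  if [type] == "ossec" {
    elasticsearch {
      hosts => ["bcksrv16:9200"]
      index => "ossec-%{+YYYY.MM.dd}"
      document_type => "ossec"
      template => "/etc/logstash/elastic-ossec-template.json"
      template_name => "ossec"
      template_overwrite => true
    }
  }
}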

Thank you very much, sometimes it is so much simpler than I think. Why didn't I think of this?

I have changed the config files a bit; they are now like this.
For ossec:

input {
	lumberjack {
		port => 5003
		type => "lumberjack"
		ssl_certificate => "/etc/logstash/logstash-forwarder.crt"
		ssl_key => "/etc/logstash/logstash-forwarder.key"
		codec => json
	}
}
filter {
	if [type] == "ossec" {
		geoip {
			source => "srcip"
			target => "geoip"
			database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
			add_field => ["[geoip][location]", "%{[geoip][longitude]}"]
			add_field => ["[geoip][location]", "%{[geoip][latitude]}"]
		}
		date {
			match => ["timestamp", "YYYY MMM dd HH:mm:ss"]
			target => "@timestamp"
		}
		mutate {
			convert => ["[geoip][location]", "float"]
			rename => ["hostname", "AgentName"]
			rename => ["geoip", "GeoLocation"]
			rename => ["file", "AlertsFile"]
			rename => ["agentip", "AgentIP"]
			rename => ["[rule][comment]", "[rule][description]"]
			rename => ["[rule][level]", "[rule][AlertLevel]"]
			remove_field => ["timestamp"]
		}
	}
}

output {
	# stdout { codec => rubydebug }
	elasticsearch {
		hosts => ["bcksrv16:9200"]
		index => "ossec-%{+YYYY.MM.dd}"
		document_type => "ossec"
		template => "/etc/logstash/elastic-ossec-template.json"
		template_name => "ossec"
		template_overwrite => true
	}

}

and for syslog:

input {
	udp {
		port => 5002
		type => syslog
	}
}

filter {
	if [type] == "syslog" {
		mutate {
			add_tag => ["ids"]
		}
		grok {
			break_on_match => false
			match => {
				"message" => [" \[SNORTIDS\[ALERT\]: \[%{DATA:sensor}\] \] \|\| %{TIMESTAMP_ISO8601:eventtime} %{INT:sev} \[%{INT:gid}:%{INT:sid}:%{INT:srev}\] %{GREEDYDATA:signature} \|\| %{DATA:classtype} \|\| %{INT:proto} %{IPV4:srcip} %{IPV4:dstip} \|\| %{INT:srcport} %{INT:dstport} \|\|", " \[SNORTIDS\[LOG\]: \[%{DATA:sensor}\] \] \|\| %{TIMESTAMP_ISO8601:eventtime} %{INT:sev} \[%{INT:gid}:%{INT:sid}:%{INT:srev}\] %{GREEDYDATA:signature} \|\| %{DATA:classtype} \|\| %{INT:proto} %{IPV4:srcip} %{IPV4:dstip} %{GREEDYDATA} \|\| %{DATA} %{GREEDYDATA:payload} \|\|", " \[SNORTIDS\[LOG\]: \[%{DATA:sensor}\] \] \|\| %{TIMESTAMP_ISO8601:eventtime} %{INT:sev} \[%{INT:gid}:%{INT:sid}:%{INT:srev}\] %{GREEDYDATA:signature} \|\| %{DATA:classtype} \|\| %{INT:proto} %{IPV4:srcip} %{IPV4:dstip} %{GREEDYDATA:ipdata} \|\| %{GREEDYDATA:tcpdata} \|\| %{DATA} %{GREEDYDATA:payload} \|\|"]
			}
			add_field => ["received_at", "%{@timestamp}"]
			 # remove_field => ["host"]
			add_field => ["received_from", "%{host}"]
		}

		geoip {
			database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
			source => "srcip"
			target => "srcgeo"
		}

		geoip {
			database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
			source => "dstip"
			target => "dstgeo"
		}

		syslog_pri {}

		if [payload] {
			mutate {
				add_field => ["payload_decoded", ""]
			}

			ruby {
				# pack is an Array method, so wrap the hex string in an array to decode it
				code => 'event["payload_decoded"] = [event["payload"]].pack("H*")'
			}
		}
	}
}

output {
	if [type] == "syslog" {
		elasticsearch {
			hosts => bcksrv16
			index => "logstash-%{+YYYY.MM.dd}"
		}
	}
}

But I keep getting events like these in the ossec indices:

{
    	"_index" : "ossec-2016.10.12",
    	"_type" : "ossec",
    	"_id" : "AVe4LO5KJsgfXD3umEAy",
    	"_score" : null,
    	"_source" : {
    		"message" : "some message",
    		"@version" : "1",
    		"@timestamp" : "2016-10-12T09:15:32.000Z",
    		"type" : "syslog",
    		"tags" : [
    			"ids",
    			"_grokparsefailure"
    		],
    		"syslog_severity_code" : 5,
    		"syslog_facility_code" : 1,
    		"syslog_facility" : "user-level",
    		"syslog_severity" : "notice",
    		"syslog_timestamp" : "Oct 12 11:15:32",
    		"syslog_hostname" : "host1",
    		"syslog_program" : "CRON",
    		"syslog_pid" : "25704",
    		"syslog_message" : "some message",
    		"received_at" : "2016-10-12T09:15:34.519Z"
    	},
    	"fields" : {
    		"received_at" : [
    			1476263734519
    		],
    		"@timestamp" : [
    			1476263732000
    		]
    	},
    	"sort" : [
    		1476263732000
    	]
    }

In the config file for the syslog events I've tried the field name as [source][type] and [_source][type], but both made everything go to the ossec index. So it has to be [type] == "syslog", but I don't understand why these events won't go into the syslog filter. I have more examples, but I can only have 5000 characters in the reply.

Any idea?

Your elasticsearch output for ossec isn't wrapped in a conditional so it'll still get all events.
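
In other words, something like this (a sketch; everything inside the elasticsearch block stays exactly as you have it):

output {
  if [type] == "ossec" {
    elasticsearch { ... }
  }
}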


I added it and you were completely right; I don't know why I forgot that. The events now should go to either one or the other, right? There is no way that some events would be dropped?

We only send from syslog and ossec.

The events now should go to either one or the other, right?

Yes, if type is either syslog or ossec.

As far as I know, the only way to check is to have a third index which gets filled with events whose type is neither syslog nor ossec, right?

That, or you could save those events to a local file.
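
For example (a sketch; the file path is just an illustration, adjust it to your setup):

output {
  if [type] == "syslog" {
    elasticsearch { ... }
  } else if [type] == "ossec" {
    elasticsearch { ... }
  } else {
    # events that are neither syslog nor ossec land here so you can inspect them
    file {
      path => "/var/log/logstash/unmatched-%{+YYYY.MM.dd}.log"
    }
  }
}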

Also a good idea, I will look into it later. For now I have everything working the way I want.

Hi,

I'm not sure if I can continue here or if I should create a new topic.
The syslog events go into their indices, but the ossec logs don't get into the ossec indices anymore and I have no clue why. When I make a new config that outputs the incoming data to stdout I get the data, and when I use stdin to paste an event on the command line it also works.

At this moment the config is this:

input {
  lumberjack {
    port => 5003
    type => "lumberjack"
    ssl_certificate => "/etc/logstash/logstash-forwarder.crt"
    ssl_key => "/etc/logstash/logstash-forwarder.key"
    codec => json
    tags => ["ossec"]
  }
}

filter {
   if "ossec" in [tags] {
   geoip {
      source => "srcip"
      target => "geoip"
      database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
      add_field => [ "[geoip][location]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][location]", "%{[geoip][latitude]}"  ]
    }
    date {
        match => ["timestamp", "YYYY MMM dd HH:mm:ss"]
        target => "@timestamp"
    }
    mutate {
      convert => [ "[geoip][location]", "float"]
      rename => [ "hostname", "AgentName" ]
      rename => [ "geoip", "GeoLocation" ]
      rename => [ "file", "AlertsFile" ]
      rename => [ "agentip", "AgentIP" ]
      rename => [ "[rule][comment]", "[rule][description]" ]
      rename => [ "[rule][level]", "[rule][AlertLevel]" ]
      remove_field => [ "timestamp" ]
    }
  }
}

output {
   if "ossec" in [tags] {
    elasticsearch {
         hosts => bcksrv16
         index => "ossec-%{+YYYY.MM.dd}"
         #document_type => "ossec"
         #template => "/etc/logstash/elastic-ossec-template.json"
         #template_name => "ossec"
         #template_overwrite => true
    }
  }
}

So I tried this:

input {
        stdin {
                codec => json
                tags => ["ossec"]
        }
}


filter {
   if "ossec" in [tags] {
   geoip {
      source => "srcip"
      target => "geoip"
      database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
      add_field => [ "[geoip][location]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][location]", "%{[geoip][latitude]}"  ]
    }
    date {
        match => ["timestamp", "YYYY MMM dd HH:mm:ss"]
        target => "@timestamp"
    }
    mutate {
      convert => [ "[geoip][location]", "float"]
      rename => [ "hostname", "AgentName" ]
      rename => [ "geoip", "GeoLocation" ]
      rename => [ "file", "AlertsFile" ]
      rename => [ "agentip", "AgentIP" ]
      rename => [ "[rule][comment]", "[rule][description]" ]
      rename => [ "[rule][level]", "[rule][AlertLevel]" ]
      remove_field => [ "timestamp" ]
    }
  }
}

output {
   if "ossec" in [tags] {
    elasticsearch {
         hosts => bcksrv16
         index => "ossec-%{+YYYY.MM.dd}"
         document_type => "ossec"
         template => "/etc/logstash/elastic-ossec-template.json"
         template_name => "ossec"
         template_overwrite => true
    }
  }
}

and:

input {
  lumberjack {
    port => 5003
    type => "lumberjack"
    ssl_certificate => "/etc/logstash/logstash-forwarder.crt"
    ssl_key => "/etc/logstash/logstash-forwarder.key"
    codec => json
    tags => ["ossec"]
  }
}

filter {
   if "ossec" in [tags] {
   geoip {
      source => "srcip"
      target => "geoip"
      database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
      add_field => [ "[geoip][location]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][location]", "%{[geoip][latitude]}"  ]
    }
    date {
        match => ["timestamp", "YYYY MMM dd HH:mm:ss"]
        target => "@timestamp"
    }
    mutate {
      convert => [ "[geoip][location]", "float"]
      rename => [ "hostname", "AgentName" ]
      rename => [ "geoip", "GeoLocation" ]
      rename => [ "file", "AlertsFile" ]
      rename => [ "agentip", "AgentIP" ]
      rename => [ "[rule][comment]", "[rule][description]" ]
      rename => [ "[rule][level]", "[rule][AlertLevel]" ]
      remove_field => [ "timestamp" ]
    }
  }
}
output {
  if "ossec" in [tags] {
    stdout { codec => rubydebug }
  }
}

Both test cases do what you would expect: one fills up my screen quite fast because of the amount of events, and the other takes the event from stdin and puts it into Elasticsearch. So why doesn't the automated one work?
