Hi,
I have a problem with my Logstash conf file: it doesn't apply the grok patterns I've defined. I'm trying to extract the IP address so geoip can show me where the traffic came from. It works with the Apache logs but not with these. Can anyone help me, please?
This is a line from my firewall logs:
Apr 9 03:31:08 fw-ba kernel: RULE 29 -- DENY IN=eth2.178 OUT= MAC=01:00:5e:00:00:01:c8:0e:14:f7:32:64:08:00 SRC=192.168.178.1 DST=224.0.0.1 LEN=36 TOS=0x00 PREC=0xC0 TTL=1 ID=0 DF PROTO=2
And I'm using this conf file at /etc/logstash/conf.d/14-firewallba.conf:
filter {
  #if [type] == "firewallba" {
  if [path] == "/var/log/ba/firewalllogs" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname}.* SRC=%{IP:srcip} DST=%{IP:dstip}" }
    }
    geoip {
      source => "srcip"
      target => "geoip"
      database => "/etc/logstash/GeoLiteCity.dat"
      add_field => [ "[geoip][location]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][location]", "%{[geoip][latitude]}" ]
    }
  }
}
I have tried both type and path, but neither works. The grok patterns were tested on https://grokdebug.herokuapp.com/, so I don't think the problem is there, but I have no idea where it is.
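For what it's worth, a grok pattern is just a named regular expression underneath, so the match itself can be sanity-checked outside Logstash. Below is a rough Python sketch of the pattern above against the sample line; the character classes are simplified stand-ins for SYSLOGTIMESTAMP, SYSLOGHOST, and IP, not the official grok definitions:

```python
import re

# Simplified stand-ins for the grok patterns used in the config above;
# the real SYSLOGTIMESTAMP/SYSLOGHOST/IP definitions are more permissive.
PATTERN = re.compile(
    r"(?P<syslog_timestamp>\w{3}\s+\d+ \d{2}:\d{2}:\d{2}) "
    r"(?P<syslog_hostname>[\w.-]+)"
    r".* SRC=(?P<srcip>\d{1,3}(?:\.\d{1,3}){3})"
    r" DST=(?P<dstip>\d{1,3}(?:\.\d{1,3}){3})"
)

line = ("Apr 9 03:31:08 fw-ba kernel: RULE 29 -- DENY IN=eth2.178 OUT= "
        "MAC=01:00:5e:00:00:01:c8:0e:14:f7:32:64:08:00 "
        "SRC=192.168.178.1 DST=224.0.0.1 LEN=36 TOS=0x00 PREC=0xC0 "
        "TTL=1 ID=0 DF PROTO=2")

m = PATTERN.search(line)
print(m.group("srcip"), m.group("dstip"))  # 192.168.178.1 224.0.0.1
```

If this matches but Logstash still shows no srcip field, the problem is more likely the conditional around the grok block than the pattern itself.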
The standard advice is to start small and incrementally add one section of the grok pattern at a time, using the stdout output with codec => rubydebug and the geoip filter section commented out.
BTW, your second add_field will overwrite the first one -- did you intend that?
OTOH, for logs as structured as these, if you are using LS 5+ you could try the Dissect filter (see the blog article).
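To illustrate why delimiter-based parsing suits these logs: the tail of the line is plain KEY=VALUE pairs, so it can be split without any regex at all. This is only a Python sketch of the idea, not the actual Dissect filter:

```python
# Sketch of delimiter-based parsing for the KEY=VALUE tail of the
# firewall line; the real Dissect filter runs inside Logstash on
# fixed delimiters, this just shows why the format fits.
line = ("Apr 9 03:31:08 fw-ba kernel: RULE 29 -- DENY IN=eth2.178 OUT= "
        "MAC=01:00:5e:00:00:01:c8:0e:14:f7:32:64:08:00 "
        "SRC=192.168.178.1 DST=224.0.0.1 LEN=36 TOS=0x00 PREC=0xC0 "
        "TTL=1 ID=0 DF PROTO=2")

fields = {}
for token in line.split():
    if "=" in token:
        key, _, value = token.partition("=")
        fields[key] = value

print(fields["SRC"], fields["DST"])  # 192.168.178.1 224.0.0.1
```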
Thanks for the reply, @guyboertje.
I've changed my conf file to
filter {
  #if [type] == "firewallba" {
  if [path] == "/var/log/ba/firewalllogs" {
    grok {
      match => { "message" => ".* SRC=%{IP:srcip} DST=%{IP:dstip}" }
    }
    #geoip {
    #  source => "srcip"
    #  target => "geoip"
    #  database => "/etc/logstash/GeoLiteCity.dat"
    #  add_field => [ "[geoip][location]", "%{[geoip][longitude]}" ]
    #  add_field => [ "[geoip][location]", "%{[geoip][latitude]}" ]
    #}
  }
}
I also have codec => rubydebug
written in my output file, but nothing has changed.
the output I get from kibana is
@timestamp May 16th 2017, 15:07:03.540
t@version 1
t_id AVwRTFxUKtwQJIgJR1Qh
t_index filebeat-2017.05.16
#_score
t_type firewallba
tbeat.hostname centos
tbeat.name centos
#count 1
?fields -
thost centos
tinput_type log
tmessage May 7 03:47:22 fw-ba kernel: RULE 33 -- DENY IN=ppp0 OUT= MAC= SRC=140.205.81.53 DST=77.109.86.42 LEN=66 TOS=0x00 PREC=0x00 TTL=113 ID=10582 DF PROTO=UDP SPT=53 DPT=61188 LEN=46
#offset 41,378,036
tsource /var/log/ba/firewalllogs
tsyslog_hostname fw-ba
tsyslog_timestamp May 7 03:47:22
ttags beats_input_codec_plain_applied
ttype firewallba
I'm using LS 2.2; I did try to update, but it failed, so I reverted.
Comment out your elasticsearch output and add:
output {
  stdout { codec => rubydebug }
}
then look at the command-line output.
First change your grok to:
filter {
  #if [type] == "firewallba" {
  if [path] == "/var/log/ba/firewalllogs" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{DATA:rest}" }
    }
    #geoip {
    #  source => "srcip"
    #  target => "geoip"
    #  database => "/etc/logstash/GeoLiteCity.dat"
    #  add_field => [ "[geoip][location]", "%{[geoip][longitude]}" ]
    #  add_field => [ "[geoip][location]", "%{[geoip][latitude]}" ]
    #}
  }
}
Confirm that you see a field syslog_timestamp with a value like Apr 9 03:31:08, and a field called rest that contains the rest of the line.
then add the next piece to grok:
grok {
  match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:rest}" }
}
Confirm that you still have syslog_timestamp and that syslog_hostname is now fw-ba.
Then add the next piece to grok:
grok {
  match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:beforeSRC}SRC=%{DATA:rest}" }
}
Confirm that beforeSRC is
kernel: RULE 29 -- DENY IN=eth2.178 OUT= MAC=01:00:5e:00:00:01:c8:0e:14:f7:32:64:08:00
(with a space at the end) and that rest starts with the IP address.
Then remove the :beforeSRC name (you don't want that saved to a field).
Then add the source IP to grok:
grok {
  match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA}SRC=%{IP:srcip}%{DATA:rest}" }
}
and so on...
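The incremental approach above can also be rehearsed offline: add one capture at a time and check that each new field appears before moving on. Here is a Python sketch, with simplified regex stand-ins for the grok patterns rather than grok itself:

```python
import re

line = ("Apr 9 03:31:08 fw-ba kernel: RULE 29 -- DENY IN=eth2.178 OUT= "
        "MAC=01:00:5e:00:00:01:c8:0e:14:f7:32:64:08:00 "
        "SRC=192.168.178.1 DST=224.0.0.1 LEN=36 TOS=0x00 PREC=0xC0 "
        "TTL=1 ID=0 DF PROTO=2")

# Step 1: timestamp only, capture the remainder in `rest`.
step1 = re.match(
    r"(?P<syslog_timestamp>\w{3}\s+\d+ \d{2}:\d{2}:\d{2}) (?P<rest>.*)", line)
assert step1.group("syslog_timestamp") == "Apr 9 03:31:08"

# Step 2: add the hostname.
step2 = re.match(
    r"(?P<syslog_timestamp>\w{3}\s+\d+ \d{2}:\d{2}:\d{2}) "
    r"(?P<syslog_hostname>[\w.-]+) (?P<rest>.*)", line)
assert step2.group("syslog_hostname") == "fw-ba"

# Step 3: skip ahead to SRC= and capture the source IP.
step3 = re.match(
    r"(?P<syslog_timestamp>\w{3}\s+\d+ \d{2}:\d{2}:\d{2}) "
    r"(?P<syslog_hostname>[\w.-]+) .*SRC=(?P<srcip>\S+) (?P<rest>.*)", line)
assert step3.group("srcip") == "192.168.178.1"

print("all steps matched")
```

If a step stops matching, the piece you just added is the culprit, which is exactly what the grok-by-increments method gives you inside Logstash.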
Once you can see that the event looks the way you want, uncomment the geoip section and confirm that the geoip lookup enrichment worked.
Continue until the event looks correct, then remove the stdout output section and uncomment the elasticsearch section.
Good luck
pjanzen (Paul Janzen) May 17, 2017, 10:53am
if you change:
#if [type] == "firewallba" {
to
#if [t_type] == "firewallba" {
and remove the hash in front of it, I think it will start working.
Well, it seems obvious, but in the console output from stdout, check what the type or path field actually is:
ttype, type, or t_type,
and tpath, t_path, or path.
@guyboertje and @pjanzen, thanks for the replies.
There are type and _type; I tried both of them, but it didn't work. I also tried source, but that didn't do anything either.
Whenever I try to comment out everything but
output { stdout { codec => rubydebug } }
in my output file, filebeat somehow fails to start.
This is how it looks now:
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    sniffing => true
    manage_template => false
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
  # Added later
  #stdout { codes => rubydebug }
}
pjanzen (Paul Janzen) May 17, 2017, 11:46am
So, based on the first message, I created a small test.
The output I get is:
{
  "path" => "/root/input.txt",
  "srcip" => "192.168.178.1",
  "@timestamp" => 2017-05-17T11:43:15.962Z,
  "syslog_hostname" => "fw-ba",
  "syslog_timestamp" => "Apr 9 03:31:08",
  "@version" => "1",
  "dstip" => "224.0.0.1",
  "message" => "Apr 9 03:31:08 fw-ba kernel: RULE 29 -- DENY IN=eth2.178 OUT= MAC=01:00:5e:00:00:01:c8:0e:14:f7:32:64:08:00 SRC=192.168.178.1 DST=224.0.0.1 LEN=36 TOS=0x00 PREC=0xC0 TTL=1 ID=0 DF PROTO=2",
  "type" => "firewallba"
}
The conf file I created is this one (I removed the geoip stuff; it's not important for this test).
Notice I set the type on the input...
input {
  file {
    path => ['/root/input.txt']
    start_position => 'beginning'
    sincedb_path => '/dev/null'
    type => 'firewallba'
  }
}
filter {
  if [type] == "firewallba" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname}.* SRC=%{IP:srcip} DST=%{IP:dstip}" }
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
Is codes a typo in your message here, or does the config actually have the typo?
@guyboertje
That was a typo in the output file. I fixed that part, but there is no rest field.
@pjanzen
When I try to add type in my input file, filebeat fails to start. I also have a few types, so maybe that's the reason.
But the thing is that my Apache conf:
filter {
  if [type] == "apache-access" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    geoip {
      source => "clientip"
      target => "geoip"
      database => "/etc/logstash/GeoLiteCity.dat"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
  }
}
does what it has to do; the access log is located in /var/log/httpd/access_log.
But my firewallba conf doesn't work, and it's almost the same:
filter {
  if [type] == "firewallba" {
  #if [source] == "/var/log/ba/firewalllogs" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{DATA:rest}" }
    }
    #geoip {
    #  source => "srcip"
    #  target => "geoip"
    #  database => "/etc/logstash/GeoLiteCity.dat"
    #  add_field => [ "[geoip][location]", "%{[geoip][longitude]}" ]
    #  add_field => [ "[geoip][location]", "%{[geoip][latitude]}" ]
    #}
  }
}
This one reads from /var/log/ba/firewalllogs.
I also get logs from /var/log/messages, /var/log/secure, /var/log/mail, ...
pjanzen (Paul Janzen) May 17, 2017, 12:10pm
Can you share your input config as well? It seems to go wrong there.
Hi @pjanzen, this is my 02-beats-input.conf file. Port 5140 is for pfSense and port 5044 is for Beats:
input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
    #type => "firewallba"
    #path => "[/var/log/*]"
  }
}
#tcp syslog stream via 5140
input {
  tcp {
    type => "syslog"
    port => 5140
  }
}
#udp syslog stream via 5140
input {
  udp {
    type => "syslog"
    port => 5140
  }
}
pjanzen (Paul Janzen) May 17, 2017, 12:16pm
On which input are you receiving the firewallba logs? If it's the beats input, you can un-hash the type, and then it should work, no?
@pjanzen On the beats input, but if I un-hash that, filebeat says connection refused; that's why I hashed it out again.
pjanzen (Paul Janzen) May 17, 2017, 12:24pm
That is strange; according to the docs, you can set type:
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-beats.html#plugins-inputs-beats-type
What you can do is this:
if [message] =~ /fw-ba/ {
  # do your grok stuff here
}
Then you're not dependent on the type anymore.
Otherwise, I am out of options...
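The idea behind the message-based conditional can be sketched outside Logstash too: only lines mentioning fw-ba get parsed, and everything else passes through untouched. A Python sketch of the routing logic (not Logstash itself), using my own abbreviated test lines:

```python
import re

SRC_DST = re.compile(r"SRC=(?P<srcip>\S+) DST=(?P<dstip>\S+)")

def route(message):
    """Parse SRC/DST only for firewall lines, mirroring
    `if [message] =~ /fw-ba/ { grok { ... } }`."""
    if "fw-ba" in message:
        m = SRC_DST.search(message)
        if m:
            return m.groupdict()
    return {}

fw_line = ("Apr 9 03:31:08 fw-ba kernel: RULE 29 -- DENY "
           "SRC=192.168.178.1 DST=224.0.0.1 LEN=36")
other_line = "May 7 03:47:22 webserver su: session opened for user root"

print(route(fw_line))    # {'srcip': '192.168.178.1', 'dstip': '224.0.0.1'}
print(route(other_line)) # {}
```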
That didn't work either, but thanks for the help, guys. @pjanzen @guyboertje
pjanzen (Paul Janzen) May 17, 2017, 12:51pm
That is strange...
Input file contains:
Apr 9 03:31:08 fw-ba kernel: RULE 29 -- DENY IN=eth2.178 OUT= MAC=01:00:5e:00:00:01:c8:0e:14:f7:32:64:08:00 SRC=192.168.178.1 DST=224.0.0.1 LEN=36 TOS=0x00 PREC=0xC0 TTL=1 ID=0 DF PROTO=2
and my config looks like this:
input {
  file {
    path => ['/root/input.txt']
    start_position => 'beginning'
    sincedb_path => '/dev/null'
    type => 'firewallba'
  }
}
filter {
  if [message] =~ /fw-ba/ {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname}.* SRC=%{IP:srcip} DST=%{IP:dstip}" }
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
and the output looks like this:
{
  "path" => "/root/input.txt",
  "srcip" => "192.168.178.1",
  "@timestamp" => 2017-05-17T12:48:32.464Z,
  "syslog_hostname" => "fw-ba",
  "syslog_timestamp" => "Apr 9 03:31:08",
  "@version" => "1",
  "host" => "esearchr1",
  "dstip" => "224.0.0.1",
  "message" => "Apr 9 03:31:08 fw-ba kernel: RULE 29 -- DENY IN=eth2.178 OUT= MAC=01:00:5e:00:00:01:c8:0e:14:f7:32:64:08:00 SRC=192.168.178.1 DST=224.0.0.1 LEN=36 TOS=0x00 PREC=0xC0 TTL=1 ID=0 DF PROTO=2",
  "type" => "firewallba"
}
system (system) Closed June 14, 2017, 12:51pm
This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.