Newbie and new log format: shoutcast streaming server


(Alejandro Olivan) #1

Hi forum...

I'm quite new to the whole Logstash / Elasticsearch (Kibana) world, so please go easy on me...

I have successfully created a centralized ELK setup to monitor our pfsense firewalls... and as a result, I have been asked what else the creature can do.
Obviously, my success was due to the fact that I just studied and adapted the existing documentation on pfsense - ELK integration.

Now, since there is little documentation on using the ELK stack for streaming services, I have to figure out how to do it myself.
I have started with the log file of a Shoutcast 1.9.8 "legacy" server, in its w3c format.
I have copied a small portion of a real log file to my log server (where Logstash and Elasticsearch run) and I'm trying to get those log lines parsed and ingested into Elasticsearch...

I'm going to post my work so far and a Shoutcast w3c log sample, in case someone can help me...

Here's a sample log line (the log is a plain-text file with one entry per line):
11.22.33.44 11.22.33.44 2015-05-11 11:27:56 /stream?title=Some%20stream%20tittle 200 MPEG%20OVERRIDE 11391937 704 129448 GET

I use a configuration split across several files... since this is what I inherited by following the howtos that led me to successfully mix pfsense2.1 , 2.1 and suricata logs very nicely :slight_smile:

So, here is my Logstash input fragment:
...
input {
  file {
    type => "STRtesting"
    path => [ "/var/log/shoutcast/test_w3c.log" ]
  }
}
...

Then I have to tag it in order to "separate" it from my current suricata/firewall stuff...
So I have a filter with something like this:
....
filter {
  if [type] == "STRtesting" {
    mutate {
      add_tag => "STRtesting"
    }
  }
}
....
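
As a possible shortcut, the file input accepts the common `tags` option, so the tag could be attached where the event is created instead of in a separate mutate filter. A minimal sketch, assuming the same type and path as in the input fragment above; it has not been tested against this exact setup:

```
input {
  file {
    type => "STRtesting"
    path => [ "/var/log/shoutcast/test_w3c.log" ]
    # `tags` is a common input option; the tag is added when the event is created
    tags => [ "STRtesting" ]
  }
}
```

With the tag set at the input, the `if [type] == "STRtesting"` tagging filter would no longer be needed for this source.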

And finally my Shoutcast filter file:

filter {
  if "STRtesting" in [tags] {
    grok {
      match => [ "message", "%{IP:src_ip} %{IP:src_dns} %{TIMESTAMP_ISO8601:date} %{NOTSPACE:stream} %{NUMBER:c_reply} %{NOTSPACE:user_agent} %{NUMBER:sc_bytes} %{NUMBER:x_duration} %{NUMBER:avgbandwidt} %{NOTSPACE:c_query}" ]
    }

    if [src_ip] {
      geoip {
        source => "src_ip"
        target => "geoip"
        add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
        add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
      }
      mutate {
        convert => [ "[geoip][coordinates]", "float" ]
      }
    }
  }
}

NOTE:
I have previously tested log entries against my grok pattern in the grok debugger... and it looked good, although it needs some cleaning up...
%{IP:src_ip} %{IP:src_dns} %{TIMESTAMP_ISO8601:date} %{NOTSPACE:stream} %{NUMBER:c_reply} %{NOTSPACE:user_agent} %{NUMBER:sc_bytes} %{NUMBER:x_duration} %{NUMBER:avgbandwidt} %{NOTSPACE:c_query}
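
One possible clean-up, sketched under the assumption that the numeric columns should be stored as numbers rather than strings: grok accepts an optional `:int` (or `:float`) suffix on a capture, and anchoring the pattern with `^` and `$` makes a non-matching line fail quickly. The field names are the ones used in the pattern above.

```
filter {
  grok {
    # same pattern as above, anchored, with the numeric captures cast to integers
    match => [ "message", "^%{IP:src_ip} %{IP:src_dns} %{TIMESTAMP_ISO8601:date} %{NOTSPACE:stream} %{NUMBER:c_reply:int} %{NOTSPACE:user_agent} %{NUMBER:sc_bytes:int} %{NUMBER:x_duration:int} %{NUMBER:avgbandwidt:int} %{NOTSPACE:c_query}$" ]
  }
}
```

Numeric fields make sums and averages (bytes sent, listening time) possible later in Kibana.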

Obviously... my question is that I'm unable to find any of my sample log file entries in Elasticsearch.

Could you please give me some guidance?
Really, has anyone ever used ELK for monitoring streaming? Icecast2 and Shoutcast are very common!

Thank you in advance for your patience...

Best regards!


(Alejandro Olivan) #2

I have tested my log file entries against a running stdin / filter / stdout debug setup, as demonstrated at https://deviantony.wordpress.com ( great entry!!! :slight_smile: ).
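
For readers who have not seen that kind of setup, a minimal sketch of a stdin-to-stdout test pipeline looks roughly like this. The file name `shoutcast-test.conf` is hypothetical, and the filter section is a placeholder for whatever filter is being tested:

```
# shoutcast-test.conf (hypothetical file name)
input {
  stdin {
    # mimic the type and tag the real file input would set
    type => "STRtesting"
    tags => [ "STRtesting" ]
  }
}

filter {
  # paste the filter under test here
}

output {
  # print every event in full so parsed fields and failure tags are visible
  stdout { codec => rubydebug }
}
```

Starting Logstash with `-f shoutcast-test.conf` and pasting a log line into the terminal should then print the resulting event, without involving Elasticsearch at all.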

I have polished my filter, and it seems to work... This is what I ended up with:

filter {
  if "STRtesting" in [tags] {
    grok {
      match => [ "message", "%{IP:src_ip} %{IP:src_dns} %{TIMESTAMP_ISO8601:date} %{NOTSPACE:stream} %{NUMBER:c_reply} %{NOTSPACE:user_agent} %{NUMBER:sc_bytes} %{NUMBER:x_duration} %{NUMBER:avgbandwidt} %{NOTSPACE:c_query}" ]
    }

    # Strip the URL prefix and decode the percent-escapes seen in the log
    mutate {
      gsub => [
        "stream", "\/stream\?title=", "",
        "stream", "\%20", " ",
        "stream", "\%2D", "-",
        "stream", "\%3A", ":",
        "user_agent", "\%20", " ",
        "user_agent", "\%2D", "-",
        "user_agent", "\%3A", ":"
      ]
    }

    date {
      match => [ "timestamp", "ISO8601" ]
    }

    mutate {
      remove_field => [ "message" ]
      remove_tag => [ "_grokparsefailure" ]
    }

    if [src_ip] {
      geoip {
        source => "src_ip"
        target => "geoip"
        database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
        add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
        add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
      }
      mutate {
        convert => [ "[geoip][coordinates]", "float" ]
      }
      # Fall back to dest_ip when the lookup on src_ip produced no result
      if ![geoip][ip] {
        if [dest_ip] {
          geoip {
            source => "dest_ip"
            target => "geoip"
            database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
            add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
            add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
          }
          mutate {
            convert => [ "[geoip][coordinates]", "float" ]
          }
        }
      }
    }
  }
}
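
The gsub rules in this filter only cover the three escapes listed (%20, %2D and %3A). A more general alternative, offered as an untested sketch rather than a drop-in replacement, is the urldecode filter, which decodes every percent-escape in a field:

```
filter {
  # strip the fixed URL prefix first, then decode whatever escapes remain
  mutate {
    gsub => [ "stream", "^/stream\?title=", "" ]
  }
  urldecode { field => "stream" }
  urldecode { field => "user_agent" }
}
```

This would also handle stream titles containing other escaped characters, such as accented letters or ampersands, without adding a new gsub rule for each one.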

It seems to produce good-looking stdout output...

{"@version":"1","@timestamp":"2015-05-11T14:40:24.016Z","type":"STRtesting","host":"graphs","tags":["STRtesting"],"src_ip":"11.22.33.44","src_dns":"11.22.33.44","date":"2015-05-11 11:29:04","stream":"Some stream tittle here","c_reply":"200","user_agent":"MPEG OVERRIDE","sc_bytes":"14522224","x_duration":"919","avgbandwidt":"126416","c_query":"GET","geoip":{"ip":"176.149.125.58","country_code2":"FR","country_code3":"FRA","country_name":"France","continent_code":"EU","latitude":46.0,"longitude":2.0,"timezone":"Europe/Paris","location":[2.0,46.0],"coordinates":[2.0,46.0]}}
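
One detail visible in that output: `@timestamp` (14:40:24) is the time the event was processed, not the time recorded in the log line (11:29:04). The date filter above matches a field called `timestamp`, whereas grok stores the time in `date`. A sketch of a date filter pointed at that field, assuming the log times are in the Logstash host's local time zone (the `timezone` option can be added if they are not):

```
filter {
  date {
    # "date" is the field name from the grok pattern, e.g. 2015-05-11 11:29:04
    match => [ "date", "yyyy-MM-dd HH:mm:ss" ]
  }
}
```

With this in place, events should be indexed under the time the listener actually connected, which matters when replaying old log files.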

Still, I'm unable to get it to work...


(Alejandro Olivan) #3

Got halfway there!!!

The problem was that my log file was not being read...
Since I am testing, I have a test log file that I have to read FROM THE BEGINNING... :smile:

So... the input has to be:

file {
  type => "STRtesting"
  path => "/var/log/shoutcast/test_w3c.log"
  start_position => "beginning"
  sincedb_path => "/dev/null"
  format => "plain"
}

And it injects the data into the database!
Now I have to figure out how to present it in a Kibana dashboard!!
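
A note on the input above, hedged because details vary between Logstash versions: the file input remembers how far it has read each file in a sincedb file, and `start_position` only applies to files it has not seen before. Pointing `sincedb_path` at `/dev/null` discards that memory, which is handy for re-reading a test file on every restart but would re-ingest the whole log each time on a live server. A sketch of what a longer-lived input might look like; both paths are hypothetical:

```
input {
  file {
    type => "STRtesting"
    # hypothetical path to the live Shoutcast w3c log
    path => "/var/log/shoutcast/sc_w3c.log"
    start_position => "beginning"
    # hypothetical location; a persistent sincedb lets Logstash resume where it stopped
    sincedb_path => "/var/lib/logstash/sincedb_shoutcast"
  }
}
```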

