Can someone please help me add a geoip filter to my Logstash conf?


(R) #1

Hi Guys,

I am not an expert in Logstash and I need help indexing my OpenCanary data with geo tags. I already have the conf below, but I am sure it does not add any geo tags.

Can someone please help me with an appropriate config?

input {
file {
path => "/var/log/opencanary.log"
start_position => "beginning"
}
}

filter {
mutate {
add_tag => ["opencanary"]
}
json {
source => "message"
target => "parsedJson"
}
mutate {
add_field => {
"dst_host" => "%{[parsedJson][dst_host]}"
"dst_port" => "%{[parsedJson][dst_port]}"
"local_time" => "%{[parsedJson][local_time]}"
"logdata" => "%{[parsedJson][logdata]}"
"logtype" => "%{[parsedJson][logtype]}"
"node_id" => "%{[parsedJson][node_id]}"
"src_host" => "%{[parsedJson][src_host]}"
"src_port" => "%{[parsedJson][src_port]}"
}
remove_field => ["parsedJson","message"]
}
json {
source => "logdata"
target => "parsedlogdata"
}
mutate {
add_field => {
"SESSION" => "%{[parsedlogdata][SESSION]}"
}
remove_field => ["path","@version","parsedlogdata.SESSION"]
}

grok {

match => [ "local_time", "%{DATESTAMP:timestamp}" ]

}

    date {
            match => [ "local_time", "yyyy-MM-dd HH:mm:ss.SSSSSS" ]
            timezone => "Asia/Kolkata"
            remove_field => "timestamp"
    }

}

output {
elasticsearch {
manage_template => false
hosts => "192.168.5.15:9200"
index => "logstash-opencanary-%{+YYYY.MM.dd}"
}
stdout {
codec => rubydebug
}
}


(Magnus Bäck) #2

And what does the log you're attempting to parse look like?


(R) #3

I want a geo map built from the source addresses and a heat map based on the source IP addresses. Here are some example log lines:

{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 10:59:15.939362", "logdata": {"PASSWORD": "xmhdipc", "USERNAME": "root"}, "logtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 54656}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 10:59:46.990790", "logdata": {"PASSWORD": "1234", "USERNAME": "admin"}, "logtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 55507}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:00:17.880832", "logdata": {"PASSWORD": "support", "USERNAME": "support"}, "logtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 56282}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:00:48.976065", "logdata": {"PASSWORD": "12345", "USERNAME": "root"}, "logtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 57009}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:01:19.916128", "logdata": {"PASSWORD": "1001chin", "USERNAME": "root"}, "logtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 39636}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:01:50.909242", "logdata": {"PASSWORD": "password", "USERNAME": "root"}, "logtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 40404}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": true, "local_time": "2017-07-24 11:02:21.933574", "logdata": {"PASSWORD": "password", "USERNAME": "admin"}, "logtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 41249}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:02:52.982781", "logdata": {"PASSWORD": "54321", "USERNAME": "root"}, "logtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 42182}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:03:23.914468", "logdata": {"PASSWORD": "juantech", "USERNAME": "root"}, "logtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 42988}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:03:54.957853", "logdata": {"PASSWORD": "admin", "USERNAME": "root"}, "logtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 43776}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:04:25.957863", "logdata": {"PASSWORD": "12345", "USERNAME": "admin"}, "logtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 44499}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:04:56.920228", "logdata": {"PASSWORD": "123456", "USERNAME": "admin"}, "logtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 45144}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:05:27.886476", "logdata": {"PASSWORD": "1111", "USERNAME": "admin"}, "logtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 46026}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:05:58.885839", "logdata": {"PASSWORD": "Zte521", "USERNAME": "root"}, "logtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 46970}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:06:29.984932", "logdata": {"PASSWORD": "user", "USERNAME": "user"}, "logtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 59809}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:07:00.923142", "logdata": {"PASSWORD": "guest", "USERNAME": "guest"}, "logtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 60809}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:07:31.926856", "logdata": {"PASSWORD": "zlxx.", "USERNAME": "root"}, "logt


(Magnus Bäck) #4

Have you tried adding a geoip filter to look up one of the IP address fields?


(R) #5

Would you please give me an example? Meanwhile, I am going through the logs.


(R) #6

OK - I tried the setup below. Is that OK?

    geoip {
            source => "src_host"
            add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
            add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]

            }

}

filter {
mutate {
add_tag => ["opencanary"]
}
json {
source => "message"
target => "parsedJson"
}
mutate {
add_field => {
"dst_host" => "%{[parsedJson][dst_host]}"
"dst_port" => "%{[parsedJson][dst_port]}"
"local_time" => "%{[parsedJson][local_time]}"
"logdata" => "%{[parsedJson][logdata]}"
"logtype" => "%{[parsedJson][logtype]}"
"node_id" => "%{[parsedJson][node_id]}"
"src_host" => "%{[parsedJson][src_host]}"
"src_port" => "%{[parsedJson][src_port]}"
}
remove_field => ["parsedJson","message"]

But my question is: how do I handle the index that has already been created? Do I need to delete it from Kibana and index the data again?


(Magnus Bäck) #7

Why are you storing the lon/lat into [geoip][coordinates]? Logstash's default index template for ES maps the [geoip][location] field as geo_point.

It's not clear where you've placed your geoip filter. Presumably after the mutate filter as the src_host field won't exist otherwise.


(R) #8

OK, I changed my filter, but the geoip data is still not getting indexed. Here is the complete config file:

input {
file {
path => "/var/log/opencanary.log"
start_position => "beginning"
}
}

filter {
mutate {
add_tag => ["opencanary"]
}
json {
source => "message"
target => "parsedJson"
}
mutate {
add_field => {
"dst_host" => "%{[parsedJson][dst_host]}"
"dst_port" => "%{[parsedJson][dst_port]}"
"local_time" => "%{[parsedJson][local_time]}"
"logdata" => "%{[parsedJson][logdata]}"
"logtype" => "%{[parsedJson][logtype]}"
"node_id" => "%{[parsedJson][node_id]}"
"src_host" => "%{[parsedJson][src_host]}"
"src_port" => "%{[parsedJson][src_port]}"
}
remove_field => ["parsedJson","message"]
}

    geoip {
            source => "src_host"

            }

    json {
            source => "logdata"
            target => "parsedlogdata"
    }
    mutate {
            add_field => {
                    "SESSION" => "%{[parsedlogdata][SESSION]}"
            }
            remove_field => ["path","@version","parsedlogdata.SESSION"]
    }

grok {

match => [ "local_time", "%{DATESTAMP:timestamp}" ]

}

    date {
            match => [ "local_time", "yyyy-MM-dd HH:mm:ss.SSSSSS" ]
            timezone => "Asia/Kolkata"
            remove_field => "timestamp"
    }

}

output {
elasticsearch {
manage_template => false
hosts => "192.168.5.15:9200"
index => "logstash-opencanary-%{+YYYY.MM.dd}"
}
stdout {
codec => rubydebug
}
}

I'm not sure whether the fact that the index already exists is the reason the geoip data is not being reflected. In that case, do I need to delete the index and recreate it, or just refresh it?

I tried refreshing a couple of times but had no luck.


(Magnus Bäck) #9
  • What does an example event look like? Show the output from your stdout output or copy/paste from Kibana's JSON tab.
  • What do the mappings of your index look like? Use ES's get mapping API.

(system) #10

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.