Can someone please help me with a geo filter for this Logstash config?

Hi guys,

I am not an expert in Logstash, and I need help indexing my OpenCanary data with geo tags. I already have the config below, but it does not include any geo tagging.

Can someone please help me with an appropriate config?

input {
  file {
    path => "/var/log/opencanary.log"
    start_position => "beginning"
  }
}

filter {
  mutate {
    add_tag => ["opencanary"]
  }
  json {
    source => "message"
    target => "parsedJson"
  }
  mutate {
    add_field => {
      "dst_host" => "%{[parsedJson][dst_host]}"
      "dst_port" => "%{[parsedJson][dst_port]}"
      "local_time" => "%{[parsedJson][local_time]}"
      "logdata" => "%{[parsedJson][logdata]}"
      "logtype" => "%{[parsedJson][logtype]}"
      "node_id" => "%{[parsedJson][node_id]}"
      "src_host" => "%{[parsedJson][src_host]}"
      "src_port" => "%{[parsedJson][src_port]}"
    }
    remove_field => ["parsedJson", "message"]
  }
  json {
    source => "logdata"
    target => "parsedlogdata"
  }
  mutate {
    add_field => {
      "SESSION" => "%{[parsedlogdata][SESSION]}"
    }
    remove_field => ["path", "@version", "[parsedlogdata][SESSION]"]
  }

  grok {
    match => { "local_time" => "%{DATESTAMP:timestamp}" }
  }

  date {
    match => [ "local_time", "yyyy-MM-dd HH:mm:ss.SSSSSS" ]
    timezone => "Asia/Kolkata"
    remove_field => "timestamp"
  }
}

output {
  elasticsearch {
    manage_template => false
    hosts => "192.168.5.15:9200"
    index => "logstash-opencanary-%{+YYYY.MM.dd}"
  }
  stdout {
    codec => rubydebug
  }
}

And what does the log you're attempting to parse look like?

I want a geo map of source addresses and a heat map based on source IP addresses. Here are some example log lines:

{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 10:59:15.939362", "logdata": {"PASSWORD": "xmhdipc", "USERNAME": "root"}, "lo
gtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 54656}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 10:59:46.990790", "logdata": {"PASSWORD": "1234", "USERNAME": "admin"}, "logt
ype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 55507}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:00:17.880832", "logdata": {"PASSWORD": "support", "USERNAME": "support"},
"logtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 56282}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:00:48.976065", "logdata": {"PASSWORD": "12345", "USERNAME": "root"}, "logt
ype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 57009}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:01:19.916128", "logdata": {"PASSWORD": "1001chin", "USERNAME": "root"}, "l
ogtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 39636}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:01:50.909242", "logdata": {"PASSWORD": "password", "USERNAME": "root"}, "l
ogtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 40404}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": true, "local_time": "2017-07-24 11:02:21.933574", "logdata": {"PASSWORD": "password", "USERNAME": "admin"}, "l
ogtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 41249}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:02:52.982781", "logdata": {"PASSWORD": "54321", "USERNAME": "root"}, "logt
ype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 42182}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:03:23.914468", "logdata": {"PASSWORD": "juantech", "USERNAME": "root"}, "l
ogtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 42988}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:03:54.957853", "logdata": {"PASSWORD": "admin", "USERNAME": "root"}, "logt
ype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 43776}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:04:25.957863", "logdata": {"PASSWORD": "12345", "USERNAME": "admin"}, "log
type": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 44499}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:04:56.920228", "logdata": {"PASSWORD": "123456", "USERNAME": "admin"}, "lo
gtype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 45144}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:05:27.886476", "logdata": {"PASSWORD": "1111", "USERNAME": "admin"}, "logt
ype": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 46026}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:05:58.885839", "logdata": {"PASSWORD": "Zte521", "USERNAME": "root"}, "log
type": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 46970}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:06:29.984932", "logdata": {"PASSWORD": "user", "USERNAME": "user"}, "logty
pe": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 59809}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:07:00.923142", "logdata": {"PASSWORD": "guest", "USERNAME": "guest"}, "log
type": 6001, "node_id": "opencanary-1", "src_host": "91.193.69.190", "src_port": 60809}
{"dst_host": "172.31.43.40", "dst_port": 23, "honeycred": false, "local_time": "2017-07-24 11:07:31.926856", "logdata": {"PASSWORD": "zlxx.", "USERNAME": "root"}, "logt

Have you tried adding a geoip filter to look up one of the IP address fields?
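A minimal sketch, assuming src_host (the field from your config) holds the IP you want to look up:

    geoip {
      source => "src_host"
    }

This adds a [geoip] subtree to the event with fields such as country_name, city_name, latitude, longitude, and location.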

Would you please give me an example? In the meantime I am going through the logs.

OK, I tried the setup below. Is that OK?

    geoip {
      source => "src_host"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }

}

filter {
  mutate {
    add_tag => ["opencanary"]
  }
  json {
    source => "message"
    target => "parsedJson"
  }
  mutate {
    add_field => {
      "dst_host" => "%{[parsedJson][dst_host]}"
      "dst_port" => "%{[parsedJson][dst_port]}"
      "local_time" => "%{[parsedJson][local_time]}"
      "logdata" => "%{[parsedJson][logdata]}"
      "logtype" => "%{[parsedJson][logtype]}"
      "node_id" => "%{[parsedJson][node_id]}"
      "src_host" => "%{[parsedJson][src_host]}"
      "src_port" => "%{[parsedJson][src_port]}"
    }
    remove_field => ["parsedJson", "message"]

But my question is: what do I do about the index that already exists? Do I need to delete it from Kibana and index the data again?

Why are you storing the lon/lat into [geoip][coordinates]? Logstash's default index template for ES maps the [geoip][location] field as geo_point.

It's not clear where you've placed your geoip filter. Presumably after the mutate filter as the src_host field won't exist otherwise.
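For reference, the geoip section of that default template looks roughly like this (from memory, so double-check it against the template actually installed on your cluster):

    "geoip": {
      "dynamic": true,
      "properties": {
        "ip": { "type": "ip" },
        "location": { "type": "geo_point" },
        "latitude": { "type": "half_float" },
        "longitude": { "type": "half_float" }
      }
    }

So if you let the geoip filter populate [geoip][location], Kibana's tile map can use it directly; no extra add_field lines are needed.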

OK, I changed my filter, but the geoip data is still not getting indexed. Here is the complete config file:

input {
  file {
    path => "/var/log/opencanary.log"
    start_position => "beginning"
  }
}

filter {
  mutate {
    add_tag => ["opencanary"]
  }
  json {
    source => "message"
    target => "parsedJson"
  }
  mutate {
    add_field => {
      "dst_host" => "%{[parsedJson][dst_host]}"
      "dst_port" => "%{[parsedJson][dst_port]}"
      "local_time" => "%{[parsedJson][local_time]}"
      "logdata" => "%{[parsedJson][logdata]}"
      "logtype" => "%{[parsedJson][logtype]}"
      "node_id" => "%{[parsedJson][node_id]}"
      "src_host" => "%{[parsedJson][src_host]}"
      "src_port" => "%{[parsedJson][src_port]}"
    }
    remove_field => ["parsedJson", "message"]
  }

  geoip {
    source => "src_host"
  }

  json {
    source => "logdata"
    target => "parsedlogdata"
  }
  mutate {
    add_field => {
      "SESSION" => "%{[parsedlogdata][SESSION]}"
    }
    remove_field => ["path", "@version", "[parsedlogdata][SESSION]"]
  }

  grok {
    match => { "local_time" => "%{DATESTAMP:timestamp}" }
  }

  date {
    match => [ "local_time", "yyyy-MM-dd HH:mm:ss.SSSSSS" ]
    timezone => "Asia/Kolkata"
    remove_field => "timestamp"
  }
}

output {
  elasticsearch {
    manage_template => false
    hosts => "192.168.5.15:9200"
    index => "logstash-opencanary-%{+YYYY.MM.dd}"
  }
  stdout {
    codec => rubydebug
  }
}

I am not sure whether the index already existing is the reason the geoip data is not showing up. In that case, do I need to delete the index and recreate it, or just refresh it? Something like the command below, I assume?
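I am guessing the delete would be something like this (host and index name from my config; the date suffix is just one example day), and then Logstash would recreate the index the next time an event arrives:

    curl -XDELETE 'http://192.168.5.15:9200/logstash-opencanary-2017.07.24'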

I tried refreshing a couple of times, but no luck.

  • What does an example event look like? Show what your stdout { codec => rubydebug } output produces, or copy/paste from Kibana's JSON tab.
  • What do the mappings of your index look like? Use ES's get mapping API (example below).
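For the mappings, something like this should do (host and index name taken from your config; substitute one of your actual daily indices):

    curl 'http://192.168.5.15:9200/logstash-opencanary-2017.07.24/_mapping?pretty'

In particular, check whether [geoip][location] comes back as geo_point; since you run with manage_template => false, it may not be if the template was never installed.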
