Event Correlation \ populate one field with data from another field in Elasticsearch

Hi,
I'm looking to do correlation searches in Elastic (Kibana 4.4, actually). Is there a way to run a search like the following?
1 - find all IP addresses of a certain type (let's say from Critical Stack Intel, via Filebeat to Logstash and Elasticsearch)
2 - find all outgoing connections in the Bro IDS logs that feed the ELK stack (via Filebeat to Logstash and Elasticsearch)
I want to keep only the IPs that hosts made outgoing connections to in the Bro IDS logs, and that are also found in the Critical Stack Intel feeds. Is that possible?

OR - is there a way to do that directly in Logstash?
For example, with the Logstash filter plugin "elasticsearch", as in this link -
https://www.elastic.co/guide/en/logstash/current/plugins-filters-elasticsearch.html

as follows -
if [type] == "end" {
  elasticsearch {
    hosts => ["es-server"]
    query => "type:start AND operation:%{[opid]}"
    fields => ["@timestamp", "started"]
  }
}

Thanks

Shai

I have a similar problem and would be happy to get some advice.

You can't do this in Kibana; it's best to do it in Logstash during processing, like you have done there.

Our Graph plugin will also be able to do some of this; it's part of our X-Pack.


How is this possible in Logstash?
Can you give me an example or a link?

Thanks a lot

Like what you have in the OP, with the Elasticsearch filter.

Yeah, but when I add the fields part, it doesn't insert the data into the original event; I only get what I pass in as static text, not the field value as a parameter.

Hi,

Here is the filter section where I query, try to do the match, and insert into the original event:
elasticsearch {
  hosts => ["localhost"]
  query => "type:maltrail AND dst_ip:%{dst_ip}"
  add_field => [ "reason", "maltrail['reason']" ]
}

Hi Mark,

Could you give me an example of that?
For some reason it's not working for me. I can only get static text (the field name) into the field I want; it does not pull the actual data into it.

Thanks

add_field is not the right option.

Here's the correct configuration to do what you want:

elasticsearch {
  hosts => ["localhost"]
  query => "type:maltrail AND dst_ip:%{dst_ip}"
  fields => [ "reason" ]
}

It will copy the 'reason' field from the Elasticsearch result into your current Logstash event.
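To make that behaviour concrete, here is a minimal Ruby sketch (not the plugin's actual source; `apply_fields` is a hypothetical helper) of what the `fields` option does: each listed field is copied from the matching Elasticsearch hit into the current Logstash event.

```ruby
# Hypothetical sketch of the `fields` behaviour, NOT the plugin source:
# copy each listed field from the Elasticsearch hit into the event.
def apply_fields(event, hit_source, fields)
  fields.each do |name|
    event[name] = hit_source[name] if hit_source.key?(name)
  end
  event
end

# Example event and a hypothetical maltrail hit returned by the query:
event = { "dst_ip" => "172.16.1.10" }
hit   = { "type" => "maltrail", "dst_ip" => "172.16.1.10", "reason" => "malicious destination" }
apply_fields(event, hit, ["reason"])
# event now also carries the enriching "reason" value from the hit
```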

Hi

After changing the configuration, this is the error I'm getting in the Logstash log. Here are the config and the error:

input {
  beats {
    port => 5044
    ssl => false

    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
  }
}
filter {
  if [type] == "conn" {
    grok {
      tag_on_failure => ["connlog_long_parse_fail"]
      match => { "message" => [ ".?\s+.?\s+(?<src_ip>\d+.\d+.\d+.\d+)\s\S+\s(?<dst_ip>\d+.\d+.\d+.\d+)" ] }
    }
    elasticsearch {
      hosts => ["localhost"]
      query => "type:maltrail AND dst_ip:%{dst_ip}"
      fields => [ "reason" ]
    }
    geoip {
      source => "src_ip"
      target => "geoip"
      database => "/etc/logstash/GeoLiteCity.dat"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    geoip {
      source => "dst_ip"
      target => "geoip"
      database => "/etc/logstash/GeoLiteCity.dat"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    sniffing => true
    manage_template => false
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
}

Here's the log -

{:timestamp=>"2016-07-18T09:10:18.083000+0300", :message=>"Failed to query elasticsearch for previous event", :index=>"", :query=>"type:maltrail AND dst_ip:172.16.1.10", :event=>#<LogStash::Event @data={"message"=>"1468822199.963471\tC3gEHk1b9GOQXYX5Z3\t172.17.1.2\t45124\t172.16.1.10\t53\tudp\tdns\t0.026759\t0\t506\tSHR\tT\tT\t0\tCd\t0\t0\t1\t534\t(empty)\t-\t-\t#siftworkstation-eth0", "@version"=>"1", "@timestamp"=>"2016-07-18T06:10:17.432Z", "type"=>"conn", "source"=>"/nsm/bro/logs/current/conn.log", "count"=>1, "fields"=>nil, "beat"=>{"hostname"=>"siftworkstation", "name"=>"siftworkstation"}, "offset"=>504473, "input_type"=>"log", "host"=>"siftworkstation", "tags"=>["beats_input_codec_plain_applied"], "syslog_severity_code"=>5, "syslog_facility_code"=>1, "syslog_facility"=>"user-level", "syslog_severity"=>"notice", "src_ip"=>"172.17.1.2", "dst_ip"=>"172.16.1.10"}>, :error=>#<NoMethodError: undefined method `start_with?' for nil:NilClass>, :level=>:warn}
{:timestamp=>"2016-07-18T09:17:20.595000+0300", :message=>"Failed to query elasticsearch for previous event", :index=>"", :query=>"type:maltrail AND dst_ip:172.16.1.10", :event=>#<LogStash::Event @data={"message"=>"1468822626.137517\tCJ06wGBTIOaRavwR2\t172.17.1.2\t50456\t172.16.1.10\t53\tudp\tdns\t0.035619\t0\t1008\tSHR\tT\tT\t0\tCd\t0\t0\t2\t1064\t(empty)\t-\t-\t#siftworkstation-eth0", "@version"=>"1", "@timestamp"=>"2016-07-18T06:17:17.440Z", "source"=>"/nsm/bro/logs/current/conn.log", "type"=>"conn", "input_type"=>"log", "count"=>1, "fields"=>nil, "beat"=>{"hostname"=>"siftworkstation", "name"=>"siftworkstation"}, "offset"=>505938, "host"=>"siftworkstation", "tags"=>["beats_input_codec_plain_applied"], "syslog_severity_code"=>5, "syslog_facility_code"=>1, "syslog_facility"=>"user-level", "syslog_severity"=>"notice", "src_ip"=>"172.17.1.2", "dst_ip"=>"172.16.1.10"}>, :error=>#<NoMethodError: undefined method `start_with?' for nil:NilClass>, :level=>:warn}

Sorry, the configuration I suggested was wrong.
Here's the right configuration, which copies the 'reason' ES field into the 'reason' Logstash field:

elasticsearch {
  hosts => ["localhost"]
  query => "type:maltrail AND dst_ip:%{dst_ip}"
  fields => { "reason" => "reason" }
}

Hi fbaligand,

Thanks a lot for your reply.
Actually, it's a mistake in the official documentation. In the Logstash filter link above (Elasticsearch filter plugin | Logstash Reference [8.11] | Elastic),
it's written with the wrong syntax.
I'll report it on GitHub, as well as here, as wrong syntax.
I've been stuck on this for more than a week, trying different syntaxes.

Yours works great!

Actually, the sample in the elasticsearch filter documentation is not wrong:

["@timestamp", "started"] and {"@timestamp" => "started"} are equivalent.

But I agree that ["@timestamp", "started"] is more of a trick and is not obvious to understand.
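As a sketch of that equivalence, here is hypothetical Ruby normalization code (`normalize_fields` is illustrative, not the plugin's actual implementation), assuming the array form is read as (source field, destination field) pairs:

```ruby
# Hypothetical sketch: normalize the array form ["@timestamp", "started"]
# into the hash form {"@timestamp" => "started"} by reading the array
# as (source field, destination field) pairs.
def normalize_fields(fields)
  return fields if fields.is_a?(Hash)
  Hash[fields.each_slice(2).to_a]
end

normalize_fields(["@timestamp", "started"])
# both forms end up as {"@timestamp" => "started"}
```

Under this reading, the array form is just a flattened list of pairs, which is why it is easy to misread as "a list of field names to copy".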

Well, for some reason ["@timestamp", "started"] doesn't work for me at all and only produces a static field name ('reason', in my case), but {"@timestamp" => "started"} works great.

I think you should definitely make this the main syntax in the elasticsearch filter documentation.

Shai

OK.
I'm not a committer on this plugin, but feel free to open an issue on GitHub.

Anyway, thanks a lot for your reply.

You're welcome :slight_smile:

Happy to see I could help you!