Machine learning and firewall logs

machine-learning

#1

hi

is there a "how to" for applying ML to firewall logs etc. somewhere?

I checked the link in the ML 5.4 posting and it seems it should be straightforward. however, I might need more data time-wise, i.e. firewall logs from 24 hours or more, for it to work, or am i missing something obvious?

I'm trying to get SIEM-like functionality by pouring sysmon and firewall log data into ELK.


(Robert Cowart) #2

I recently posted an article on Linkedin where I share my first impressions of ML. The data sources were BlackRidge security appliances which generate logs very similar to firewalls. That might provide you with some ideas.

A first look at Elastic's new Machine Learning Technology

Rob


#3

hi robert,

i did see your article, and that is what got me thinking i could use it with the FW logs, as the data collected is similar in nature.

i tried replicating your formula with the available firewall index fields and ended up with something like this

high_distinct_count(dst_mapped_port) over src_mapped_ip partition_field_name=dst_port

the way i see it this could put a real dent in the SIEM market, especially if i get it running with sysmon data as well :wink:
however i get an error

 2017-05-19 13:58:25 hnvKTK9 Job created
 2017-05-19 13:58:26 hnvKTK9 Opening job on node [{hnvKTK9}{hnvKTK9cRm6YfB11hs_Itg}{iBFBjmOwQO-hUOKECBTbKg}{127.0.0.1}{127.0.0.1:9300}{ml.enabled=true}]
 2017-05-19 13:58:26 hnvKTK9 Loading model snapshot [N/A], job latest_record_timestamp [N/A]
 2017-05-19 13:58:35 hnvKTK9 Starting datafeed [datafeed-portscan] on node [{hnvKTK9}{hnvKTK9cRm6YfB11hs_Itg}{iBFBjmOwQO-hUOKECBTbKg}{127.0.0.1}{127.0.0.1:9300}{ml.enabled=true}]
 2017-05-19 13:58:35 hnvKTK9 Datafeed started (from: 1970-01-01T00:00:00.000Z to: 2017-05-19T11:58:30.001Z)
 2017-05-19 13:58:37 hnvKTK9 Datafeed is encountering errors extracting data: all shards failed
 2017-05-19 13:58:37 hnvKTK9 Datafeed stopped
 2017-05-19 13:58:37 hnvKTK9 Job is closing


(Sophie Chang) #4

If you are having a problem with the datafeed, it might be a misconfiguration. To check what is being returned you can run:

GET _xpack/ml/datafeeds/<datafeed_id>/_preview
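
For example, assuming the datafeed is the one named datafeed-portscan in the job messages above, the placeholder (angle brackets included) is replaced by that ID:

```console
# Sketch: the datafeed ID is taken from the job messages earlier in the thread.
GET _xpack/ml/datafeeds/datafeed-portscan/_preview
```

If the datafeed's query or index settings are wrong, the preview should come back with an error or no documents rather than a sample of the data.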

(Robert Cowart) #5

@ssi, Cool that you read my article. I am honored that a few people were inspired to try out ML because of me. :blush:

I am guessing based on the field names that this is a Cisco ASA firewall, but maybe other grok patterns use these field names as well. Let's break down your detector...

high_distinct_count(dst_mapped_port)
over src_mapped_ip
partition_field_name=dst_port

The first statement high_distinct_count(dst_mapped_port) means that you want to investigate an abnormally high number of unique destination mapped ports. So far so good.

The next part over src_mapped_ip means that anomalies will be determined per source mapped IP as compared to the behaviour of other source mapped IPs. In other words... tell us when a particular source is doing something significantly different than the other sources. This is good too.

At this point we would be evaluating dst_mapped_port for all destination addresses together. What we really want is to evaluate each destination address independently. After all, a port scan is specific to a single destination address. To achieve this we use the partition_field_name setting to split the data into multiple time series which are evaluated independently. This is where I think you may have a problem. It may not be the thing causing your error, but it is still a logic mistake. You have specified dst_port, and what you actually need is a destination IP address. Looking at the grok pattern for ASA devices, I suspect the correct field would be dst_mapped_ip.

Make sure you remember to indicate both src_mapped_ip and dst_mapped_ip as Influencers.
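
Putting the pieces together, a minimal sketch of such a job as a create-job request might look like the following. The job ID portscan and the 15m bucket span are assumptions, not values from this thread. The field names are the ones discussed above; if they are mapped as text in your index, a keyword version of each field would be needed instead.

```console
# Sketch only: the job ID "portscan" and the bucket span are assumptions.
PUT _xpack/ml/anomaly_detectors/portscan
{
  "description": "Port scan detection (sketch)",
  "analysis_config": {
    "bucket_span": "15m",
    "detectors": [
      {
        "function": "high_distinct_count",
        "field_name": "dst_mapped_port",
        "over_field_name": "src_mapped_ip",
        "partition_field_name": "dst_mapped_ip"
      }
    ],
    "influencers": ["src_mapped_ip", "dst_mapped_ip"]
  },
  "data_description": {
    "time_field": "@timestamp",
    "time_format": "epoch_ms"
  }
}
```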

Hopefully that helps.

Rob


#6

thank you both! i'll give it a whirl

from the console i get this when i enter that request, something is def wrong

{
  "error": {
    "root_cause": [
      {
        "type": "resource_not_found_exception",
        "reason": "No datafeed with id [<datafeed_id>] exists"
      }
    ],
    "type": "resource_not_found_exception",
    "reason": "No datafeed with id [<datafeed_id>] exists"
  },
  "status": 404
}

the job's error output is below. i can see the .keyword fields in the index under management - indices for cisco asa, but i can't see them when i configure detectors

 2017-05-20 07:26:11 hnvKTK9 Job created
 2017-05-20 07:26:11 hnvKTK9 Opening job on node [{hnvKTK9}{hnvKTK9cRm6YfB11hs_Itg}{iBFBjmOwQO-hUOKECBTbKg}{127.0.0.1}{127.0.0.1:9300}{ml.enabled=true}]
 2017-05-20 07:26:11 hnvKTK9 Loading model snapshot [N/A], job latest_record_timestamp [N/A]
 2017-05-20 07:26:42 hnvKTK9 Starting datafeed [datafeed-portscan4] on node [{hnvKTK9}{hnvKTK9cRm6YfB11hs_Itg}{iBFBjmOwQO-hUOKECBTbKg}{127.0.0.1}{127.0.0.1:9300}{ml.enabled=true}]
 2017-05-20 07:26:42 hnvKTK9 Datafeed started (from: 2017-05-18T22:00:00.000Z to: 2017-05-20T05:26:17.001Z)
 2017-05-20 07:26:46 hnvKTK9 Datafeed is encountering errors extracting data: [portscan4] Search request returned shard failures; first failure: shard [[hnvKTK9cRm6YfB11hs_Itg][cisco-asa-2017.05.19][0]], reason [RemoteTransportException[[hnvKTK9][127.0.0.1:9300][indices:data/read/search[phase/fetch/id]]]; nested: IllegalArgumentException[Fielddata is disabled on text fields by default. Set fielddata=true on [dst_mapped_ip] in order to load fielddata in memory by uninverting the inverted index. Note that this can however use significant memory. Alternatively use a keyword field instead.]; ]; see logs for more info
 2017-05-20 07:26:46 hnvKTK9 Datafeed stopped
 2017-05-20 07:26:46 hnvKTK9 Job is closing


#7

I took the non-advanced wizard and got it somewhat working, i think, as the .keyword fields are available in the dropdown menus. I could not figure out how to follow the advice in the error message above. also i would like to limit the influencers to internal ip addresses only - should be possible, right?

but check it out, it correctly detected qualys, our external vulnerability scanner, doing its thing around 1:30 in the morning. definitely on the right track here.


(Robert Cowart) #8

Nice results! Now that you have this working, you can use this job as the basis to try some other things. In Job Manager you can copy the job (one of the little buttons on the right next to the job), edit the various settings, and save it as a new job.

Regarding the keyword fields, my index templates are much more specific to the data I have coming in. For example, you probably want those IP addresses to be handled in Elasticsearch as IPs and not strings, and this would be done in the index template. This can be useful because Elasticsearch supports queries based on CIDR notation, which would help you to limit the ML jobs to your internal IPs. Such a query would look like this...

GET my_index/_search
{
  "query": {
    "term": {
      "ip_addr": "192.168.0.0/16"
    }
  }
}

A little more info is here.

In your ML job it would look something like this...
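
A rough sketch of such a datafeed, not the original configuration: the job ID portscan and the 192.168.0.0/16 range are assumptions, while the index pattern and document type follow the cisco-asa names seen in the log output earlier in the thread. The query only matches once src_ip is mapped as type ip.

```console
# Sketch only: job ID and CIDR range are assumptions.
# 5.x calls the index list "indexes"; to my knowledge later releases use "indices".
PUT _xpack/ml/datafeeds/datafeed-portscan
{
  "job_id": "portscan",
  "indexes": ["cisco-asa-*"],
  "types": ["cisco-asa"],
  "query": {
    "term": {
      "src_ip": "192.168.0.0/16"
    }
  }
}
```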

It looks like you are making some progress, so keep up the good work. We all have a lot more to learn about ML and the ways we can leverage it. Thanks for sharing!

Rob


#9

hi again,

i tried to replicate what you have done, but i think the field type that is used for the ip address data is wrong. is there a way to change this on the fly / add the field type? from my understanding your example needs the field type to be ip address related.

i can't get it to return data if i query a cidr range as above

GET cisco-asa-2017.05.21/_search
{
  "query": {
    "term": {
      "src_ip": "172.0.0.0/16"
    }
  }
}

result

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 4,
    "failed": 0
  },
  "hits": {
    "total": 0,
    "max_score": null,
    "hits": []
  }
}

but if i change it to a specific ip

GET cisco-asa-2017.05.21/_search
{
  "query": {
    "term": {
      "src_ip": "xxx.xxx.xxx.xxx"
    }
  }
}

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 4,
    "failed": 0
  },
  "hits": {
    "total": 1887,
    "max_score": 4.195242,
    "hits": [
      {
        "_index": "cisco-asa-2017.05.21",
        "_type": "cisco-asa",
        "_id": "AVwoTsO86oBl3j97sYRj",
        "_score": 4.195242,
        "_source": {
          "xlate_type": "dynamic",
          "src_interface": "any",
          "syslog_severity_code": 5,
          "syslog_facility": "user-level",
          "syslog_facility_code": 1,
          "message": "<174>%ASA-6-305011: Built dynamic UDP translation from any:xxx.xxx.xxx.xxx/65121 to OUTSIDE:185.30.100.4/65121\n",
          "type": "cisco-asa",
          "src_xlated_ip": "xxx.xxx.xxx.xxx",
          "syslog_severity": "notice",
          "tags": [
            "_grokparsefailure"
          ],
          "src_ip": "xxx.xxx.xxx.xxx",
          "src_port": "65121",
          "protocol": "UDP",
          "@timestamp": "2017-05-21T00:01:09.382Z",
          "@version": "1",
          "host": "xxx.xxx.xxx.xxx",
          "action": "Built",
          "src_xlated_interface": "OUTSIDE"
        }
      },

we're doing ok. I think my elastic-fu is not quite strong enough yet. but am i right in my assumption that i need to somehow change the src_ip field type to "ip" for it to accept CIDR?

ideally i guess i should get this done in the logstash config?


(Robert Cowart) #10

To do CIDR-based queries, the field type definitely must be IP. Field types are determined when an index is created and cannot be changed without re-indexing. It looks like you are using daily indexes, so you can update your index template to set the type, and tomorrow's index will be correct. If this is a lab environment and you don't mind losing the data you can also just delete the current indexes and template, make the changes and begin collecting again.
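
As a minimal sketch, assuming the daily indexes are named cisco-asa-YYYY.MM.DD with document type cisco-asa (as in the search output above), a 5.x index template along these lines would map the address fields as IPs. The template name and the list of fields are assumptions; include whichever address fields your grok patterns actually produce.

```console
# Sketch only: template name, index pattern and field list are assumptions.
# 5.x templates use the "template" key for the index pattern.
PUT _template/cisco-asa
{
  "template": "cisco-asa-*",
  "mappings": {
    "cisco-asa": {
      "properties": {
        "src_ip": { "type": "ip" },
        "src_xlated_ip": { "type": "ip" },
        "src_mapped_ip": { "type": "ip" },
        "dst_mapped_ip": { "type": "ip" }
      }
    }
  }
}
```

The template only applies to indexes created after it is stored. In a lab, an existing daily index could be dropped with DELETE cisco-asa-2017.05.21 so that it is recreated with the new mapping, and GET cisco-asa-*/_mapping shows which field types were actually applied.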

The docs for index templates are here...

https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html

I googled quickly and found this blog that looks like it has a good index template to start with...

https://jackhanington.com/blog/2015/06/16/send-cisco-asa-syslogs-to-elasticsearch-using-logstash/

Rob


#11

That's the logstash config i use as a base, yes. Are you saying that I can make the needed changes from the index menu in Kibana, or do I need to use console commands? I can lose the data, no problem; this is still a test setup for machine learning.

also could you or anyone else walk me through the console commands to update the index, or can i just delete it and get a chance to make any adjustments when i create the index?

