Using Elastic SIEM and ML with Beats and Logstash

Hello all. What is the general method for using Beats with Logstash if you still want access to Elastic SIEM and the other Kibana apps?

Elastic SIEM works well when the data is gathered via Beats and sent directly to Elasticsearch. For my use case I need to send the data gathered by the Beats both over a socket and to Elasticsearch. Since Beats don't support multiple outputs, I was wondering what the best way to do this is.

I am initially thinking of Logstash, since it does support multiple outputs. However, in the past Logstash hasn't worked with Beats if you still want to use Elastic SIEM; I believe the issue was that the Beats' index templates and ingest pipelines weren't loaded. If Logstash is the best option, what do I need to do to work around this? I'm using modules, so I was thinking the process would be:
1. Temporarily disable the Logstash output and load the index templates into Elasticsearch manually, as described here: https://www.elastic.co/guide/en/beats/packetbeat/current/packetbeat-template.html#load-template-manually. But do I need to do this for every Beat I'm using, and for EACH Filebeat module template as well?
2. Load the ingest pipelines into Elasticsearch in a similar manner. Again, does this need to be repeated for each Beat and for each Filebeat module (e.g. Suricata and Netflow)?
3. Configure the Logstash elasticsearch output to send events to the right ingest pipeline.
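Concretely, I imagine the manual template and pipeline loading would look roughly like the sketch below; host names, credentials, and the version are placeholders for my deployment, and I haven't verified this end to end:

```shell
# Rough sketch of the manual loading I have in mind; host names,
# credentials, and the version number are placeholders.

# Export the Beat's index template as JSON and PUT it to Elasticsearch:
filebeat export template --es.version 7.8.1 > filebeat.template.json
curl -XPUT -H 'Content-Type: application/json' \
     -u "$ES_USERNAME:$ES_PASSWORD" \
     "https://elasticsearch:9200/_template/filebeat-7.8.1" \
     -d @filebeat.template.json

# Load the module ingest pipelines with a temporary elasticsearch output
# (output.logstash commented out for this one run):
filebeat setup --pipelines --modules netflow,suricata

# Afterwards, re-enable output.logstash for normal operation.
```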
Is there anything else that must be done for the data to be stored properly? I'm not sure what setup each Beat normally performs on its own, or which parts of that happen on the Beat side versus the Elasticsearch side.

Is this the best option? Or should I instead run two copies of each Beat, sending one to Logstash (for the socket) and one to Elasticsearch? My question with this is: what is the additional overhead of reading the network interface twice (e.g. with pcap or af_packet)? Would this cause a significant slowdown?

Thanks for taking the time to read this. I'm really not sure how to configure this conceptually, and my previous attempts have failed.

Another issue I've noticed is that events, even when passed through Logstash and directed to the correct ingest pipeline, carry the tag "beats_input_raw_event". Is this added by Logstash? How can I remove it? How can I make Logstash otherwise transparent, just passing events through to Elasticsearch? And what exactly makes the Beats not behave so well with SIEM when passed through Logstash first?

Hi @chancewwr, could you be more specific about "what exactly makes the beats not behave so well with SIEM when passed through logstash first"? I do not experience any problems with a setup like Beats -> Logstash -> Elasticsearch -> Kibana apps like SIEM. I would say the choice depends on your requirements and your experience with infrastructure setup. For several customers Logstash is a good choice, e.g. for its easily configurable persistent queues, filters, transforms, etc. Which Beats are you using?

Hey, thank you for the reply, fgjensen! I'm currently using Filebeat (with the Netflow and Suricata modules) and Packetbeat. How do you have your pipeline configured? I'm outputting from Beats to Logstash, and from there I'm currently using this as my output:

if [@metadata][pipeline] {
  elasticsearch {
    hosts => ["${ELASTICSEARCH_HOSTS}"]
    manage_template => false
    user => "${ES_USERNAME}"
    password => "${ES_PASSWORD}"
    index => "%{[@metadata][beat]}-%{[@metadata][version]}"
    pipeline => "%{[@metadata][pipeline]}"
    cacert => "/usr/share/logstash/certificates/ca.pem"
  }
} else {
  elasticsearch {
    hosts => ["${ELASTICSEARCH_HOSTS}"]
    manage_template => false
    user => "${ES_USERNAME}"
    password => "${ES_PASSWORD}"
    index => "%{[@metadata][beat]}-%{[@metadata][version]}"
    cacert => "/usr/share/logstash/certificates/ca.pem"
  }
}

And I'm loading my templates and ingest-node pipelines via curl on the first run of the deployment. I got the pipelines and templates from the Filebeat container, but it required converting the templates from YAML to JSON. Is this how you would recommend doing it? I still get the "beats_input_raw_event" tag on all of my events.

Your Logstash pipeline configuration and the steps taken to get there look pretty much the same as what I have set up for some customers. It works well. One change you may consider is to write to index aliases and configure ILM for your indices. That could be your next step, once the raw-event tag has been fixed.
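If you go that route, a minimal sketch of an ILM-enabled elasticsearch output could look like this; the alias and policy names are placeholders of my own, and the ILM policy itself must already exist in Elasticsearch:

```
elasticsearch {
  hosts              => ["${ELASTICSEARCH_HOSTS}"]
  manage_template    => false
  ilm_enabled        => true
  # Note: ilm_rollover_alias does not support sprintf references,
  # so the alias has to be a static string.
  ilm_rollover_alias => "filebeat-7.8.1"
  ilm_pattern        => "{now/d}-000001"
  ilm_policy         => "filebeat"
}
```

Logstash then writes to the alias, and Elasticsearch rolls the backing indices over according to the policy.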

I think the beats_input_raw_event tag is caused by your Filebeat configuration. Could you share your Filebeat and modules configuration, please?

Thank you again for your help; it's very much appreciated.
Here is my current filebeat setup:

# ========================== Filebeat global options ===========================
filebeat.overwrite_pipelines: true

filebeat.config:
  modules:
    enabled: true
    path: modules.d/*.yml
    reload.enabled: true
    reload.period: 10s

#============================= Logstash Output =================================
output.logstash:
  enabled: true
  hosts: ["${LOGSTASH_HOST}"]
  ssl:
    enabled: true
    certificate_authorities: ["/usr/share/filebeat/config/certificates/ca.pem"]
    certificate: "/usr/share/filebeat/config/certificates/elastic.pem"
    key: "/usr/share/filebeat/config/certificates/elastic-key.pem"
    verification_mode: "full"

# ================================== Template ==================================
# Our output is Logstash, not Elasticsearch, so we need to load the
# templates manually.
setup.template.enabled: false
setup.template.overwrite: false

#============================== Kibana =========================================
setup.kibana:
  host: "${KIBANA_HOST}"
  username: ${ES_USERNAME}
  password: ${ES_PASSWORD}
  ssl:
    enabled: true
    certificate_authorities: ["/usr/share/filebeat/config/certificates/ca.pem"]
    certificate: "/usr/share/filebeat/config/certificates/elastic.pem"
    key: "/usr/share/filebeat/config/certificates/elastic-key.pem"

#============================== Dashboards =====================================
setup.dashboards:
  enabled: true
  retry:
    enabled: true
    maximum: 0 # unlimited

I load the templates and pipelines using curl, run from a container on the first run of the deployment, and I also run filebeat modules enable netflow suricata. This config file is essentially identical for the Elasticsearch output, except that there I enable template loading.
My Logstash input is:

input {
  beats {
    port => "${BEATS_PORT:5044}"
    ssl => true
    ssl_certificate_authorities => ["/usr/share/logstash/certificates/ca.pem"]
    ssl_certificate => "/usr/share/logstash/certificates/elastic.pem"
    ssl_key => "/usr/share/logstash/certificates/elastic-key.pem"
    ssl_verify_mode => "force_peer"
  }
}

Any suggestions or critiques? Thanks again for the help.

Since you are using Logstash you should comment out filebeat.overwrite_pipelines: true. Could you also list the output from ./filebeat test config and ./filebeat modules list? It is a bit difficult to tell whether the YAML is correctly indented.

Sorry for the late reply. Looking above, it appears I just mistyped the indentation on the Kibana password and SSL settings, since I was copying it over manually; I think my actual YAML is correct.
The results from those commands are:
Config OK
And

Enabled:
netflow
suricata

Disabled:
activemq
apache
auditd
aws
azure
cef
checkpoint
cisco
coredns
crowdstrike
elasticsearch
envoyproxy
fortinet
googlecloud
haproxy
ibmmq
icinga
iis
iptables
kafka
kibana
logstash
misp
mongodb
mssql
mysql
nats
nginx
o365
okta
osquery
panw
postgresql
rabbitmq
redis
santa
system
traefik
zeek

Which I think is correct. I just reran everything to be sure, and I'm still getting issues. When I try to load Elastic SIEM I get "Your visualization has error(s). Data Fetch Failure":

[illegal_argument_exception] Text fields are not optimised for operations that require per-document field data like aggregations and sorting, so these operations are disabled by default. Please use a keyword field instead. Alternatively, set fielddata=true on [event.dataset] in order to load field data by uninverting the inverted index. Note that this can use significant memory. (and the same illegal_argument_exception on [event.dataset] repeated a second time)

This happens with Overview, Hosts, Network, and Detections.

Each document also carries the tags Suricata and beats_input_raw_event. While checking my templates I found I was accidentally still loading 7.8.0 templates while my indices were for 7.8.1. (Is there a good way to post templates when using the Logstash output? The Filebeat I'm using is dockerized and already has another entrypoint, so using --index-management doesn't seem like it would work.)
The templates were not the cause, however, and I still get the exact same errors as above.

(Is there a good way to post templates when using the Logstash output?) Only to do as you have already done, or to temporarily disable the Logstash output :slightly_smiling_face: If you disable netflow and suricata and enable the system module, do you then still get the illegal_argument_exception and the beats_input_raw_event tag?

So I've done some more research, and it appears the Suricata dashboards don't work either, failing with, for example, Could not locate that index-pattern-field (id: source.geo.country_iso_code). The dashboards and SIEM work fine without Logstash.
I am sure my index templates and ingest pipelines are correct and identical in both configurations; I'm checking this in Kibana under Index Management > Index Templates. However, under Index Management > Indices, the two configurations look very different: the Logstash version is called filebeat-7.8.1, while the Elasticsearch version is called filebeat-7.8.1-2020.08.10-000001. Furthermore, under the Mappings tab, this is what the Elasticsearch configuration shows:

{
  "mappings": {
    "_doc": {
      "_meta": {
        "beat": "filebeat",
        "version": "7.8.1"
      },
      "dynamic_templates": [
        {
          "labels": {
            "path_match": "labels.*",
            "match_mapping_type": "string",
            "mapping": {
              "type": "keyword"
            }
          }
        },
        {
          "container.labels": {
            "path_match": "container.labels.*",
            "match_mapping_type": "string",
            "mapping": {
              "type": "keyword"
            }
          }
        },
        {
          "dns.answers": {
            "path_match": "dns.answers.*",
            "match_mapping_type": "string",
            "mapping": {
              "type": "keyword"
            }
          }
        },
        {
          "log.syslog": {
            "path_match": "log.syslog.*",
            "match_mapping_type": "string",
            "mapping": {
              "type": "keyword"
            }
          }
        },
        {
          "network.inner": {
            "path_match": "network.inner.*",
            "match_mapping_type": "string",
            "mapping": {
              "type": "keyword"
            }
          }
        },
        {
          "observer.egress": {
            "path_match": "observer.egress.*",
            "match_mapping_type": "string",
            "mapping": {
              "type": "keyword"
            }
          }
        },
        {
          "observer.ingress": {
            "path_match": "observer.ingress.*",
            "match_mapping_type": "string",
            "mapping": {
              "type": "keyword"
            }
          }
        },
        {
          "fields": {
            "path_match": "fields.*",
            "match_mapping_type": "string",
            "mapping": {
              "type": "keyword"
            }
          }
        },
        {
          "docker.container.labels": {
            "path_match": "docker.container.labels.*",
            "match_mapping_type": "string",
            "mapping": {
              "type": "keyword"
            }
          }
        },
        {
...

Whereas the Logstash version gives:

{
  "mappings": {
    "_doc": {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "@version": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "agent": {
          "properties": {
            "ephemeral_id": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "hostname": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "id": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "name": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "type": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "version": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            }
          }
        },
        "destination": {
          "properties": {
            "address": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "as": {
              "properties": {
                "number": {
                  "type": "long"
                },
                "organization": {
                  "properties": {
                    "name": {
                      "type": "text",
                      "fields": {
                        "keyword": {
                          "type": "keyword",
                          "ignore_above": 256
                        }
                      }
                    }
                  }
                }
              }
            },
            "bytes": {
              "type": "long"
            },
            "domain": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "geo": {
              "properties": {
                "city_name": {
                  "type": "text",
                  "fields": {
                    "keyword": {
                      "type": "keyword",
                      "ignore_above": 256
                    }
                  }
                },
                "continent_code": {
                  "type": "text",
                  "fields": {
                    "keyword": {
                      "type": "keyword",
                      "ignore_above": 256
                    }
                  }
                },
                "country_code2": {
                  "type": "text",
                  "fields": {
                    "keyword": {
                      "type": "keyword",
                      "ignore_above": 256
                    }
                  }
                },
                "country_code3": {
                  "type": "text",
                  "fields": {
                    "keyword": {
                      "type": "keyword",
                      "ignore_above": 256
                    }
                  }
                },
                "country_name": {
                  "type": "text",
                  "fields": {
                    "keyword": {
                      "type": "keyword",
                      "ignore_above": 256
                    }
                  }
                },
                "dma_code": {
                  "type": "long"
                },
                "ip": {
                  "type": "text",
                  "fields": {
                    "keyword": {
                      "type": "keyword",
                      "ignore_above": 256
                    }
                  }
                },
                "latitude": {
                  "type": "float"
                },
                "location": {
                  "properties": {
                    "lat": {
                      "type": "float"
                    },
                    "lon": {
                      "type": "float"
                    }
                  }
                },
                "longitude": {
                  "type": "float"
                },
                "postal_code": {
                  "type": "text",
                  "fields": {
                    "keyword": {
                      "type": "keyword",
                      "ignore_above": 256
                    }
                  }
                },
                "region_code": {
                  "type": "text",
                  "fields": {
                    "keyword": {
                      "type": "keyword",
                      "ignore_above": 256
                    }
                  }
                },
                "region_name": {
                  "type": "text",
                  "fields": {
                    "keyword": {
                      "type": "keyword",
                      "ignore_above": 256
                    }
                  }
                },
                "timezone": {
                  "type": "text",
                  "fields": {
                    "keyword": {
                      "type": "keyword",
                      "ignore_above": 256
                    }
                  }
                }
              }
            },
            "ip": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "packets": {
              "type": "long"
            },
            "port": {
              "type": "long"
            }
          }
        },
        "dns": {
          "properties": {
            "answers": {
              "properties": {
                "data": {
                  "type": "text",
                  "fields": {
                    "keyword": {
                      "type": "keyword",
                      "ignore_above": 256
                    }
                  }
                },
                "name": {
                  "type": "text",
                  "fields": {
                    "keyword": {
                      "type": "keyword",
                      "ignore_above": 256
                    }
                  }
                },
                "ttl": {
                  "type": "long"
                },
                "type": {
                  "type": "text",
                  "fields": {
                    "keyword": {
                      "type": "keyword",
                      "ignore_above": 256
                    }
...

(Note that the Logstash version does NOT include the source.geo.country_iso_code field, which I think causes the first dashboard error I mentioned. Both outputs have been trimmed since they're extremely long.)
I'm not sure what could be causing this, or what this "mapping" really even is, since the template is defined and looks good. Does Logstash need to create an index mapping or something else that I'm not doing?

The Filebeat index template has not been applied to the filebeat-7.8.1 index Logstash is writing to. Perhaps you started Filebeat and Logstash before you loaded the index template? In Kibana, under the Index Templates tab, which index patterns are listed for the Filebeat index template? If it is filebeat-7.8.1-*, you can stop indexing from Logstash, delete the filebeat-7.8.1 index, and restart indexing from Logstash. A new filebeat-7.8.1 index will then be created in Elasticsearch, configured with the Filebeat index template. Please be aware that you have to configure Index Lifecycle Management manually for this index.
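For the reset itself, something along these lines should do; the host name and credentials are placeholders:

```shell
# Stop the Logstash pipeline first, then delete the mis-mapped index:
curl -XDELETE -u "$ES_USERNAME:$ES_PASSWORD" \
     "https://elasticsearch:9200/filebeat-7.8.1"
# Restart indexing from Logstash afterwards; the next bulk write
# recreates the index.
```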


This was it! In my Logstash config, changing

index => "%{[@metadata][beat]}-%{[@metadata][version]}"

to

index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"

fixed it! The template wasn't applying because filebeat-7.8.1 doesn't match the template's filebeat-7.8.1-* index pattern, so with the date suffix added and the old index deleted, everything in SIEM is now working.
Thank you for the help!

I am still getting the beats_input_raw_event tag on everything that comes in, however, for both Packetbeat and Filebeat. I'm not sure why this happens, unless it's just Logstash marking that it received input from Beats. I'm not sure whether it's an error of some sort or normal; I may just remove the tag manually in my Logstash pipeline if it isn't an error.
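If it does turn out to be harmless, I assume a mutate filter would strip it, something like:

```
filter {
  mutate {
    # Drop the tag the Logstash beats input adds to events it passes
    # through without parsing them itself.
    remove_tag => ["beats_input_raw_event"]
  }
}
```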

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.