Good practices with Beats and Logstash

Hello to all of you!
I have the following question about which option is better to be able to register two types of logs in elastic search.
Let's say for example that I want to log the nginx and crontab logs.
In addition to logging them (which could be done simply with filebeat) I would like to be able to treat these logs so that, for example in the case of nginx, you can filter by ip, path, HTTP code, date...which by default you could not. As the structure of the nginx and crontab logs are different, I could not use the same file in logstash to make the modifications before indexing the data in elastic, but when using filebeat to register the changes in the nginx and crontab logs, in logstash I would use as input beats, and it is precisely here where the question arises: how can I differentiate in the logstash files what information comes from the filebeat that monitors nginx from the information that comes from the filebeat that monitors crontab?
Is it possible to do this or should I forget to use filebeat and use instead logstash with the file input plugin?
Thanks!

Hi Miguel,

Logstash is very flexible with regard to this, and you would us its conditional processing features to direct the operations of your pipeline.

Let's say you are using filebeat to send your logs to logstash. You'll likely find that you can get a reasonably amount of value by using the 'modules' that come with Filebeat. In the case of filebeat, a lot of these modules make use of 'ingest pipelines' within Elasticsearch to provide the actual value, and these 'ingest pipelines' are installed for you when you run the 'setup' function of filebeat. In this case, logstash is doing compatively very little.

But I also use filebeat to do a lot of processing on various other logs. One example of where I get a lot of value is in custom processing of web server logs (eg. httpd, nginx, IIS).

One thing that will be important, and is very much to be considered a best practice, would be to keep separate any content that does not conform to ECS. You don't want to introduce a mapping conflict from different log types because they disagree on the type of a field.

Filebeat allows you to add additional fields to a input, and it adds a lot of fields of its own. Logstash plugins will also add their own fields. You can use any of these to direct your logstash processing. One handy thing to do is using fields or tags; which you can set on an input. Of note, the beats input for Logstash will set the fields [@metadata][beat] (and others) that will be useful in steering processing in your pipeline.

Filebeat can be also be told to add additional fields of your choosing to an input. I like to include one called 'fields.processing_key', which I use to describe the 'type' of log processing that needs to be done.

filter {
  if [fields][processing_key] and [fields][processing_key] == "httpd_access_json" {
    ruby {
      id => "parse-json-in-message.ruby.30"
      path => "/etc/logstash/enrichment/lib/httpd_access_json_fixup_encoding.rb"
    }
  }
}

One thing to bear in mind though; you'll likely want to have a couple of different elasticsearch outputs; one that identifies an ingest pipeline, and one that doesn't. I go beyond this and also (via earlier processing) assign an elasticsearch index I want to send the content too (and manage the template via Ansible, my chosen deployment tool).

Here's what the elasticsearch outputs look like:

output {
  #
  # For log sources that use Beats, we allow them use the Ingest Node
  # functionality within Elasticsearch, as this means less maintenance.
  # It does mean though that we might not get all the enrichment we would
  # prefer though (eg. Ingest pipelines can't use external lookups)
  #
  # https://www.elastic.co/guide/en/beats/winlogbeat/master/configuring-ingest-node.html
  # https://www.elastic.co/guide/en/logstash/current/use-ingest-pipelines.html
  #
  # Different versions of beats contain potentially different templates,
  # which is why the ES indices that are created contain a version in the
  # index name.
  #
  # This is mostly useful for filebeat, which uses ingest pipelines for its
  # various modules, which its Kibana dashboards then expect to use.
  #
  # Otherwise, if a pipeline has not been specified, we write it out as we
  # have done previously, using the index_basename we have previously determined.
  #
  # You might ask why this processing logic seems rather complex? The reason
  # is that I wanted to strike a balance between making use of the value
  # provided by ingest pipelines, while still being able to make use of richer
  # capabilities within Logstash when the processing within Ingest Pipelines
  # is not sufficient for our needs.
  #
  if [@metadata][pipeline] {
    #
    # Beats _can_ make use of ingest pipelines within Elasticsearch. This allows us
    # to make use of some of the value offered by various beats; most notably
    # Filebeat, where it is used to parse various log file formats, such as the
    # MySQL slow query log.
    #
    elasticsearch {
      id => "elasticsearch.output.ingestpipeline"
      hosts => [ "...:9200" ]
      manage_template => false
      index => "%{[@metadata][beat]}-%{[submission_metadata][version]}-%{+YYYY.MM.dd}"
      user => "..."
      password => "..."
      pipeline => "%{[@metadata][pipeline]}"
    }
  } else {
    #
    # This covers the scenario where a beat has not been used and no other index has
    # earlier been calculated in 1filter-01-index-assignment.conf
    #
    elasticsearch {
      id => "elasticsearch.output"
      hosts => [ "...:9200" ]
      manage_template => false
      index => "%{[@metadata][index_basename]}-%{+YYYY.MM.dd}"
      user => "..."
      password => "..."
    }
  }
}

(I've rewritten this slightly as if the input came directly from a beat input; my environment actually has logstash -> kafka -> logstash, so I've tried to simplify that for you).

I've also referred to [@metadata][index_basename]; for a variety of reasons (eg. keeping non-ECS content in separate indices; catering to specially engineered templates; security) I separate some sources of content into specific indices. Here's an example of part of that, which happens early in the processing.

  if [@metadata][processing_key] == "httpd_access_json"
  {
    mutate {
      id => "index_assignment.mutate.47"
      add_field => {
        "[@metadata][index_basename]" => "httpd_access_json"
      }
    }
  }

Hope you find this useful; logstash has proven itself as one of the more satisfying aspects of ELK (although to be fair, it does have its plenty of warts). Certainly its the layer where I tend to inject the most amount of value.

Thank you so much for your answer Cameron!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.