Ingesting JSON logs with Filebeat and Logstash

I hope this message finds the community members safe and healthy.

I am trying to ingest around 600 GB of logs spread across multiple JSON files. Here are a few lines of the logs (these are scrubbed DNS logs):

{"timestamp":"1632164841","name":"*.pima.iii.com","type":"a","value":"52.11.181.87"}
{"timestamp":"1632164858","name":"*.snjac.iii.com","type":"a","value":"34.237.109.119"}
{"timestamp":"1632164888","name":"*.wideu.iii.com","type":"a","value":"3.233.209.224"}

I tried sending the logs with Filebeat via Logstash to Elasticsearch.

Here is my Filebeat configuration:

- type: log
  json.keys_under_root: true
  json.overwrite_keys: true
  json.add_error_key: true
  json.expand_keys: true

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - P:\dns\*.json
    #- /var/log/*.log
    #- c:\programdata\elasticsearch\logs\*
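
Filebeat's output points at Logstash; here is a minimal sketch of that part of filebeat.yml (the hostname is a placeholder, the port matches the Beats input below):

output.logstash:
  # Logstash host is a placeholder; 5656 matches the Beats input in the pipeline below
  hosts: ["logstash-host:5656"]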

Here is the logstash pipeline configuration:

input {
  beats {
    port => 5656 #port for filebeat - Oxford  only
    #id => "beats_ingest"
  }
}



#filter {
#    json {
#      source => "message"
#    }
#}


output {
  elasticsearch {
   hosts => ["https://ES1:9200","https://ES2:9200"]
   index => "dissertation-oxford-%{+yyyy.MM.dd}"
   ssl => true
   user => 'redacted'
   password => 'redacted'
   cacert => '/etc/logstash/elasticsearch-ca.pem'
   ssl_certificate_verification => false
   ilm_enabled => auto
   ilm_rollover_alias => "dissertation-oxford"
  }
}

Here are my questions:

  1. The timestamp JSON key holds epoch times and needs converting to a human-readable timestamp. I have tried a few mutate configurations, but the data always ends up under the original JSON key with the epoch value. Do I need changes in my Filebeat configuration, or a mutate filter in Logstash?
  2. How do I rename a JSON key? For example, instead of "name":"*.wideu.iii.com" I would want "dns_request":"*.wideu.iii.com".

Thank you very much.

You need to use a filter for any modification:

   date {
     match  => ["timestamp", "UNIX"]
     target => "timestamp"
   }
   mutate { rename => { "name" => "dns_request" } }

and your output would look like this:

Note that timestamp and @timestamp are two different things here: one is from your record, the other is the ingestion time.

{
      "timestamp" => 2021-09-20T19:07:21.000Z,
     "@timestamp" => 2021-10-16T03:02:06.033Z,
          "value" => "52.11.181.87",
           "type" => "a",
    "dns_request" => "*.pima.iii.com"
}
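
If you would rather have @timestamp carry the record's own time instead of the ingestion time, you can drop the target line; the date filter writes to @timestamp by default (just an option, not required for what you asked):

   date { match => ["timestamp", "UNIX"] }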

Thank you very much @elasticforme, let me try this and update.

This works perfectly. Thank you very much, Sachin. Here is the final Logstash configuration I am using (in case someone else needs it):

input {
  beats {
    port => 5656 #port for filebeat - Oxford  only
    #id => "beats_ingest"
  }
}



filter {
    json {
      source => "message"
    }

    date {
      match  => ["timestamp", "UNIX"]
      target => "timestamp"
    }

    mutate { rename => { "name" => "dns_request" } }
}


output {
  elasticsearch {
   hosts => ["https://ES1:9200","https://ES2:9200"]
   index => "dissertation-oxford-%{+yyyy.MM.dd}"
   ssl => true
   user => 'redacted'
   password => 'redacted'
   cacert => '/etc/logstash/elasticsearch-ca.pem'
   ssl_certificate_verification => false
   ilm_enabled => auto
   ilm_rollover_alias => "dissertation-oxford"
  }
}
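
A quick way to eyeball the parsed events while testing is to temporarily add a stdout output alongside the Elasticsearch one (optional, not part of the pipeline above):

output {
  # temporary debugging output: prints each parsed event to the console
  stdout { codec => rubydebug }
}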

Hello,

Is it possible to reindex the data, take the value field which holds the IP addresses, and run them against a GeoIP DB? Or will I need to reingest the data?

Pretty much anything is possible.

Explain a little more; I didn't understand.

Hello @elasticforme - Thank you very much for replying.

As of now I'm using an offline GeoIP DB to determine where each IP address is likely from. Here is the current Logstash configuration:

filter {
    json {
      source => "message"
    }

    date {
      match  => ["timestamp", "UNIX"]
      target => "timestamp"
    }

    mutate { rename => { "name" => "dns_request" } }
    mutate { rename => { "value" => "resolved_ip" } }

    geoip {
      source   => "resolved_ip"
      target   => "geoip"
      database => "/opt/logstash/vendor/geoip/GeoLite2-City.mmdb"
    }
}
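
With that block in place, each event gains a nested geoip object with fields roughly along these lines (the geo values below are placeholders, not real lookup results):

{
    "dns_request" => "*.pima.iii.com",
    "resolved_ip" => "52.11.181.87",
          "geoip" => {
        "country_name" => "<country>",
           "city_name" => "<city>",
            "location" => {
            "lat" => 0.0,
            "lon" => 0.0
        }
    }
}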

I am hoping to enhance this further.

The current setup is:
Filebeat has been running on my laptop for the last 3 days, forwarding the file to Logstash, which is running on a Raspberry Pi.

The ingestion rate is ~1200 events per second. I now want to introduce a filter in Logstash that carries out a network call to fetch data, which I expect will slow ingestion down to around 20 to 30 EPS, given that it runs over my home internet.
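
The kind of filter I have in mind is something along the lines of the http filter; a very rough sketch of what I am picturing (the endpoint is a placeholder, I have not settled on the exact plugin options, and this assumes the filter can build a per-event URL):

http {
  # hypothetical enrichment endpoint; assumes %{resolved_ip} can be substituted into the URL
  url         => "https://enrichment.example.com/lookup/%{resolved_ip}"
  verb        => "GET"
  target_body => "enrichment"
}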

Here are my questions

  1. As of now ~50% of the file, ~126,796,781 records, has been ingested. How will any change to the pipeline apply to the entirety of the records? That is, I can shut down Filebeat now (which is reading the file), make changes to the Logstash pipeline, and start Filebeat again, but that will only cater to records indexed from then on. How do I work on the ~126,796,781 records that have already been indexed? What is the correct API to invoke reindexing, while ensuring no time is wasted reprocessing the 50% of the file that will already have gone through the new Logstash pipeline?

  2. Further to point 1, is there a way to do this without affecting the current ingestion rate, by having a separate pipeline start from the very first indexed event and enhance the current index, rather than making it part of the current one?

I'm sorry if this sounds confusing; please do let me know if I need to reword it. Lots going on :smiley:

When you said reindex I got confused.
Here you are asking: if you update your config file and add one more field called geoip, what will happen to the old records which are already ingested?

If that is the question, then the old records will not have the geoip field, and the new records will.

You actually do not have document_id set up in your output, which means each document (record) in Elasticsearch gets a randomly created "_id".
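
For future runs, one way to make re-ingestion overwrite instead of duplicate would be to derive a deterministic _id from the event itself, for example with a fingerprint filter. This is only a sketch and assumes the raw message line is unique enough to act as a key:

filter {
  # hash the original line into a stable id kept in @metadata (so it is not indexed as a field)
  fingerprint {
    source => ["message"]
    target => "[@metadata][fingerprint]"
    method => "SHA256"
  }
}

output {
  elasticsearch {
    # ... same settings as in your output above, plus:
    document_id => "%{[@metadata][fingerprint]}"
  }
}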

The only way I see here is:

  1. stop logstash
  2. delete index
  3. run new logstash and ingest all again.
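
For step 2, the index can be removed with the delete index API, for example in Kibana Dev Tools (adjust the name or pattern to whatever was actually created; wildcard deletes may need to be allowed on the cluster):

DELETE /dissertation-oxford-*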

Thank you very much for that guidance. I've followed your advice and carried out the changes.

Would you be able to help me with this one: Dynamic input (file hashes) for HTTP JSON data collection

Thank you once again.
