Replace @timestamp with actual timestamp from log file

Hi,

I have a JSON log in the following format:

{"@timestamp":"2017-01-18T11:41:28.753Z","source":"host1","level":"INFO","message":"Some log event"}

The log is read by Filebeat and sent through Redis/Logstash/Elasticsearch/Kibana.

The problem is that Filebeat sets @timestamp to the time at which the log entry was read, but I want to replace that field with the @timestamp value from my log file.

I have the following logstash filter:

filter {
  date {
    match => ["timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSZ"]
    target => "@timestamp"
    add_field => { "debug" => "timestampMatched" }
  }
}

Looking at the @timestamp field in Kibana after I run this shows a different value than the @timestamp field from my log. Also, the debug field is not present. I ran Logstash with the -debug option, but there is nothing in the output that gives me a hint about what's going on: no _dateparsefailure or any other kind of error in the Logstash output.

I've been stuck on this for a while, and I find neither the documentation nor the similar threads very helpful. Something like this should be very simple IMHO. Any help?


Okay, I've added @ before timestamp in:

match => ["timestamp" , "yyyy-MM-dd'T'HH:mm:ss.SSSZ"]

and now I get _dateparsefailure

Testing the following with stdin input and stdout output works fine, i.e. the timestamp is matched and put in @timestamp:

input { stdin { } }

filter {
  grok {
    match => ["message", "%{TIMESTAMP_ISO8601:tstamp}"]
  }
  date {
    match => ["tstamp", "ISO8601"]
  }
}

output { stdout { codec => rubydebug } }

However, I get "_grokparsefailure" when I change to the redis input and elasticsearch output:

input {
  redis {
    host => "redis"
    data_type => "list"
    key => "my-test"
  }
}

filter {
  grok {
    match => ["message", "%{TIMESTAMP_ISO8601:tstamp}"]
  }
  date {
    match => ["tstamp", "ISO8601"]
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch-host:9200"]
    index => "my-test"
  }
}

I'm testing with this string:
{"@timestamp":"2017-01-18T11:41:28.753Z","source":"host1","level":"INFO","message":"Some log event"}

The "@timestamp":"2017-01-18T11:41:28.753Z" is a JSON representation of the @timestamp field and its value.

Your event's message field should have a date section in the text. You need to use grok to extract that date string into a new field called, say, event_timestamp. Then your date filter can parse the event_timestamp field and write it to the target field, which can be the @timestamp field.
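Roughly like this (just a sketch; event_timestamp is only an example name, and the grok pattern assumes the date is the first ISO8601 string in the message text):

filter {
  grok {
    # pull the ISO8601 date out of the raw message text into a temporary field
    match => ["message", "%{TIMESTAMP_ISO8601:event_timestamp}"]
  }
  date {
    # parse that string and write it to @timestamp (the default target)
    match => ["event_timestamp", "ISO8601"]
    target => "@timestamp"
    # dropping the temporary field afterwards is optional
    remove_field => ["event_timestamp"]
  }
}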

Once the event is created in the beats input, the @timestamp field is no longer a String but an instance of LogStash::Timestamp - a special class used internally to represent event timestamps.


But I'm doing exactly what you said! Extracting with grok from the message field into the "tstamp" field, then parsing "tstamp" into @timestamp with the date filter.

Could it be that the problem is that I have a field called @timestamp in my JSON, and this causes problems when the date filter tries to parse my @timestamp into the default @timestamp field?

Please show a proper example of your log text, i.e. the bit in the message field coming from Beats.

A sample 3 lines of my log looks like this:

{"@timestamp":"2017-01-19T12:00:57.164Z","source_host":"db020103.example.net","file":"RequestContext.java","method":"log","level":"INFO","line_number":"458","thread_name":"http-80-183","@version":1,"logger_name":"core.context.servlet.WebRequestContext","message":"Request:svt\t-989\t\t\/from\/private\/down\/air\/half.rss\t9\t210432755645\tVOID\t10.10.10.10\t1030343218988\tOK","class":"svt.core.context.RequestContext","mdc":{}}
{"@timestamp":"2017-01-19T12:00:57.178Z","source_host":"db020103.example.net","file":"RequestContext.java","method":"log","level":"INFO","line_number":"458","thread_name":"http-80-230","@version":1,"logger_name":"core.context.background.BackgroundRequestContext","message":"Request:svt\tClientContext.create\t4\t2105894543647\tSYSTEMS\t-1\tOK","class":"svt.core.context.RequestContext","mdc":{}}
{"@timestamp":"2017-01-19T12:00:57.179Z","source_host":"db020103.example.net","file":"RequestContext.java","method":"log","level":"INFO","line_number":"458","thread_name":"http-80-230","@version":1,"logger_name":"core.context.servlet.WebRequestContext","message":"Request:svt\t684325994\t\t\/v\/311\/104546\/layout\/b\/image\/big\/random_2\/random_image-206.jpg\t7\t2104434555646\tVOID\t10.10.10.10\t1030432418989\tOK","class":"core.context.RequestContext","mdc":{}}

These three lines are not your original data. The ones you show above have already been processed by Logstash once. Correct? Is this what is being read from Redis by any chance?

What I posted is my original log, which is in JSON (I've only replaced some sensitive values). This log has not been processed by Logstash before (I don't know why you think that). The log is read by Filebeat and sent to Redis, from where it is retrieved by Logstash and sent on to Elasticsearch.

Edit: my log does indeed have a field named @timestamp.

I think it has been processed by Logstash because it has @timestamp and @version JSON fields.

But maybe you are using a Logstash-compatible Java logging component.

Moving on...
Your test using the stdin input is not a correct simulation of the Redis input because the redis input will automatically use the JSON codec to convert the JSON string to a Map and create the Logstash event from that - and in the process automatically parse the "2017-01-19T12:00:57.164Z" string from the @timestamp JSON field.

As the JSON source is already in a Logstash-compatible format, you will not need to grok or date-convert anything.

To correctly simulate the redis input behaviour with stdin you need to direct it to use the json codec.

input { stdin { codec => "json" } }
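A fuller simulation of what the redis input does, with no filters at all (again just a sketch), so the rubydebug output shows @timestamp already populated from the JSON:

input { stdin { codec => "json" } }

# no grok or date filter needed - the json codec creates the event,
# including @timestamp, straight from the incoming JSON string

output { stdout { codec => rubydebug } }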

You are right: using "input { stdin { codec => "json" } }" leads to "_grokparsefailure" with my sample JSON log, which makes sense, since with the json codec the message field contains only the original message text, and there is no timestamp in it for grok to match.

However, if I don't grok or date-convert, then the final event I see in Elasticsearch/Kibana has a different @timestamp than the original @timestamp value in my log for that message. The value in Kibana is, I assume, the date and time at which the log was read by Filebeat.

So back to my original question, how to replace @timestamp with the value from my log?

What is your filebeat config?

#------------------------------ Log prospector --------------------------------
- input_type: log
  # Make sure no file is defined twice as this can lead to unexpected behaviour.
  paths:
    - /var/log/my-log.json


  ### JSON configuration

  # Decode JSON options. Enable this if your logs are structured in JSON.
  # JSON key on which to apply the line filtering and multiline settings. This key
  # must be top level and its value must be string, otherwise it is ignored. If
  # no text key is defined, the line filtering and multiline features cannot be used.
  #json.message_key:
  json.message_key: message

  # By default, the decoded JSON is placed under a "json" key in the output document.
  # If you enable this setting, the keys are copied top level in the output document.
  #json.keys_under_root: false
  json.keys_under_root: true

  # If keys_under_root and this setting are enabled, then the values from the decoded
  # JSON object overwrite the fields that Filebeat normally adds (type, source, offset, etc.)
  # in case of conflicts.
  #json.overwrite_keys: false

  # If this setting is enabled, Filebeat adds a "json_error" key in case of JSON
  # unmarshaling errors or when a text key is defined in the configuration but cannot
  # be used.
  #json.add_error_key: false

 
#------------------------------- Redis output ---------------------------------
output.redis:
  # Boolean flag to enable or disable the output module.
  enabled: true

  # The list of Redis servers to connect to. If load balancing is enabled, the
  # events are distributed to the servers in the list. If one server becomes
  # unreachable, the events are distributed to the reachable servers only.
  hosts: ["10.10.10.20"]

  # The Redis port to use if hosts does not contain a port number. The default
  # is 6379.
  #port: 6379

  # The name of the Redis list or channel the events are published to. The
  # default is filebeat.
  #key: filebeat
  key: my-test

I've omitted most of the commented-out lines.

OK so you are using JSON decoding in filebeat.
What does a JSON document in Redis look like?
Use the Redis CLI to read one doc and post it here.
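For example, something along these lines (adjust the host and key to match your setup) should print the first document on the list without removing it:

redis-cli -h 10.10.10.20 LRANGE my-test 0 0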

I've changed "json.overwrite_keys: false" to true in the Filebeat config, and now the @timestamp in Elasticsearch matches the one in my log. I also removed the grok and date filters from the Logstash config, i.e. it now goes straight from the Redis input to the Elasticsearch output without any filters.
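For reference, the relevant prospector settings now look like this (paths and keys as in the config I posted above):

- input_type: log
  paths:
    - /var/log/my-log.json
  json.message_key: message
  json.keys_under_root: true
  # let values from the decoded JSON (including @timestamp) overwrite the fields Filebeat adds
  json.overwrite_keys: true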

I assume you were right when you said that no grok/date filtering is needed when the logs are already in a Logstash-compatible format.

Thank you for your help @guyboertje


You are welcome.
