In Logstash, replace Elasticsearch's @timestamp with the log's time

Hi,
I want to replace Elasticsearch's @timestamp with the time from my project's log file.

Filebeat, Logstash, and Elasticsearch are all version 6.2.

My log format:

2018-07-20 11:40:41.196 WARN 20671 --- [DiscoveryClient-InstanceInfoReplicator-0] com.zaxxer.hikari.pool.PoolBase ...

Logstash config:

input {
  kafka {
        bootstrap_servers => "kafka0:19092,kafka1:19093,kafka2:19094"
        topics => [ "crm-dev","app-dev","server-dev" ]
        codec => "json"
        group_id => "es"
        consumer_threads => 2
  }
}

filter {

  grok {
        match => [ "message" , "(?<customer_time>20%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME})\s+%{LOGLEVEL:level}" ]
  }

  date {
        match => ["customer_time", "yyyy-MM-dd HH:mm:ss.SSS"]
        target =>  "@timestamp"
       }

  mutate {
    remove_field => ["[beat][name]","[beat][version]","@version","customer_time","offset"]
  }

}

output {

   stdout { codec => rubydebug }
  elasticsearch {
    codec => plain{ charset => "UTF-8" }
    hosts => "http://es1.com:9200"
    index => "%{[fields][log_topic]}-%{+YYYY.MM.dd}"
  }
}

I also referred to some posts and the official examples:


but I still can't get what I want!

The result in Elasticsearch:

{
  "_index": "crm-dev-2018.07.19",
  "_type": "doc",
  "_id": "KGUSsWQB69AjfzryUz31",
  "_version": 1,
  "_score": 1,
  "_source": {
    "fields": {
      "log_topic": "crm-dev"
    },
    "beat": {
      "hostname": "88-193"
    },
    "source": "/data/logs/server/crm-dev/stdout.log",
    "message": "2018-07-19 13:11:10.740 WARN 32360 --- [DiscoveryClient-InstanceInfoReplicator-0] com.zaxxer.hikari.pool.PoolBase : HikariPool-1 - Failed to validate connection com.mysql.jdbc.JDBC4Connection@309b88d3 (No operations allowed after connection closed.)",
    "@timestamp": "2018-07-19T05:11:10.740Z",
    "level": "WARN",
    "prospector": {
      "type": "log"
    }
  }
}

@timestamp is still the default value from ES, not the time from the log.

In the JSON output, "customer_time" doesn't appear to have been created at all. Is that the full JSON output? Don't you get any "_grokparsefailure" or other tags?

The thing is, the default target of the "date" plugin is already '@timestamp'. The date filter matches against the "customer_time" field, so if grok has not created that field properly beforehand, the date filter cannot parse the real time.

So, I'm assuming that this is your main problem:

If you look at LOGLEVEL:level, that field is created in the JSON output, but nothing happens with "customer_time". Try to capture everything as %{YOURPARSE:variable_name}, or create multiple fields such as YEAR:year, MONTHNUM:monthnum, etc., and then, after the grok filter, use a mutate to add a new field that contains them all. I'd prefer the first option, though, so you handle the information more efficiently and don't end up with duplicated data.

Hope it helps. I've only been working with the ELK stack for a year, so maybe someone else can help you better with things I don't know that would make replacing your @timestamp easier, but if you fix that "customer_time", I'm sure it will work.

Thank you for the reply!

I didn't get "_grokparsefailure" or any other tags.

What do you mean by "the full JSON output"?
This is how I see my log in Kibana:

{
  "_index": "crm-dev-2018.07.20",
  "_type": "doc",
  "_id": "t3bvtmQB69AjfzrytqvA",
  "_version": 1,
  "_score": null,
  "_source": {
    "message": "2018-07-20 17:07:16.624 [DiscoveryClient-InstanceInfoReplicator-0] WARN  com.zaxxer.hikari.pool.PoolBase - HikariPool-1 - Failed to validate connection com.mysql.jdbc.JDBC4Connection@66514a71 (No operations allowed after connection closed.)",
    "beat": {
      "hostname": "88-193"
    },
    "@timestamp": "2018-07-20T09:07:22.340Z",
    "fields": {
      "log_topic": "crm-dev"
    },
    "source": "/data/logs/server/crm-dev/stdout.log",
    "prospector": {
      "type": "log"
    }
  },
  "fields": {
    "@timestamp": [
      "2018-07-20T09:07:22.340Z"
    ]
  },
  "sort": [
    1532077642340
  ]
}

I have changed my Logstash config as you suggested:

filter {

  grok {
        match => [ "message" , "(20%{YEAR:year}-%{MONTHNUM:monthnum}-%{MONTHDAY:monthday} %{TIME:time})\s+%{LOGLEVEL:level}" ]
  }

  mutate {
        add_field => { "customer_time" => "%{year}-%{monthnum}-%{monthday} %{time}" }
  }

  date {
        match => [ "customer_time", "yyyy-MM-dd HH:mm:ss.SSS" ]
        target => "@timestamp"
  }

  mutate {
        remove_field => ["[beat][name]","[beat][version]","@version","customer_time","offset"]
  }

}

It doesn't work, and I get some errors:

    "beat": {
      "hostname": "88-193"
    },
    "tags": [
      "_grokparsefailure",
      "_dateparsefailure"
    ],
    "@timestamp": "2018-07-20T09:20:33.620Z",

Do not remove customer_time until you have successfully parsed it. Your grok pattern starts with a literal 20 that sits outside the %{YEAR:year} capture, so it will not be in customer_time. Try

match => [ "customer_time", "yy-MM-dd HH:mm:ss.SSS" ]
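
To see why the year ends up two digits short, the capture can be reproduced with a plain Ruby regex. This is only a sketch, not grok itself: it inlines what I believe the stock YEAR pattern expands to, (?>\d\d){1,2}, and uses simplified stand-ins for MONTHNUM, MONTHDAY and TIME, so treat those expansions as assumptions.

```ruby
# Assumption: grok's YEAR is (?>\d\d){1,2}; the other patterns are simplified stand-ins.
line = '2018-07-20 11:40:41.196 WARN 20671 --- [DiscoveryClient-InstanceInfoReplicator-0] ...'
m = /20(?<year>(?>\d\d){1,2})-(?<monthnum>\d\d)-(?<monthday>\d\d) (?<time>\S+)/.match(line)

# The literal "20" sits outside the named group, so only "18" is captured as the year.
customer_time = "#{m[:year]}-#{m[:monthnum]}-#{m[:monthday]} #{m[:time]}"
puts customer_time
```

A value shaped like this lines up with a yy-MM-dd pattern rather than yyyy-MM-dd.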

Thank you for your reply!

I don't think that's the reason. I think the value has already been parsed into @timestamp by

  date {
        match => [ "customer_time", "yyyy-MM-dd HH:mm:ss.SSS" ]
        target =>  "@timestamp"
       }

Thank you again!
I think maybe the mistake was mine!
My Logstash config may be right after all:

  grok {
        match => [ "message" , "(?<customer_time>20%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME})\s+%{LOGLEVEL:level}" ]
  }

  date {
        match => ["customer_time", "yyyy-MM-dd HH:mm:ss.SSS"]
        target =>  "@timestamp"
       }

  mutate {
    remove_field => ["[beat][name]","[beat][version]","@version","customer_time","offset"]
  }

Both "customer_time" and "@timestamp" did change.

My servers are in China.

In Elasticsearch, "@timestamp" is 8 hours behind the time in "message":

"@timestamp": "2018-07-23T01:12:02.682Z",
"fields": {
"log_topic": "crm-dev"
},
"level": "DEBUG",
"message": "2018-07-23 09:12:02.682 DEBUG [ForkJoinPool.commonPool-worker-2] c.b.c.m.CrmClientSalesMapper.selectListByClientId - <== Total: 0",

In Kibana, "@timestamp" is the same as the time in "message":

@timestamp:July 23rd 2018, 07:57:55.792 fields.log_topic:crm-dev level:INFO message:2018-07-23 07:57:55.792 INFO [AsyncResolver-bootstrap-executor-0] c.n.d.shared.resolver.aws.ConfigClusterResolver - Resolving eureka endpoints via configuration

But as a result, logs with a time before 8 o'clock are written to the previous day's Elasticsearch index!

How can I write logs to the right Elasticsearch index according to the log's time?

Elasticsearch always saves timestamps as UTC. By default Kibana uses the browser's timezone. The timezone option in the date filter allows you to indicate what timezone the log file uses.
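
As a rough illustration in plain Ruby (not Logstash itself), and assuming the log lines are written in China Standard Time (UTC+8), the time in the log and the stored UTC value are the same instant:

```ruby
require 'time'

# Assumption: the log file uses UTC+8, so that offset is appended before parsing.
local = Time.strptime('2018-07-23 09:12:02.682 +0800', '%Y-%m-%d %H:%M:%S.%L %z')

# Elasticsearch stores the instant in UTC, hence the apparent 8-hour difference.
stored = local.utc.iso8601(3)
puts stored
```

In the date filter, the timezone option takes a zone name; for these logs that would presumably be "Asia/Shanghai".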

Thank you very much for the reply!

I think I was heading in the wrong direction from the start! I don't need to change the "@timestamp" value to China time.

I just want each daily index to follow the day in the project's log.

Because my server is in China, logs before 8 o'clock are written to the previous day's index, which is not what I want.

I realized my mistake from this thread:
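
To make the rollover concrete, here is a small Ruby sketch, outside of Logstash, using the 07:57 log line shown earlier. It assumes that %{+YYYY.MM.dd} in the index name is formatted from the UTC @timestamp, which is what the observed behaviour suggests.

```ruby
# 2018-07-23 07:57:55 in UTC+8 is still the evening of July 22nd in UTC.
log_time = Time.new(2018, 7, 23, 7, 57, 55, '+08:00')

utc_day   = log_time.getutc.strftime('%Y.%m.%d')  # day a UTC-based index name would use
local_day = log_time.strftime('%Y.%m.%d')         # day written in the log line itself

puts "crm-dev-#{utc_day}"
puts "crm-dev-#{local_day}"
```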

https://discuss.elastic.co/t/how-to-set-timestamp-timezone/28401

My solution uses the "date" and "ruby" filters:


filter {

  grok {
        match => [ "message" , "(20%{YEAR:year}-%{MONTHNUM:month}-%{MONTHDAY:day} %{TIME:time})\s+%{LOGLEVEL:level}" ]
  }

  mutate {
       add_field => [ "log_time","20%{year}-%{month}-%{day} %{time}" ]
  }
  date {
        match => [ "log_time","yyyy-MM-dd HH:mm:ss.SSS" ]
        target => "@timestamp"
       }
  # Build the index day from @timestamp converted to the Logstash host's local time
  ruby {
        code => [ "event.set('index_day', event.get('@timestamp').time.localtime.strftime('%Y.%m.%d'))" ]
       }

  mutate {
       remove_field => ["[beat][name]","[beat][version]","@version","offset","tmptime","log_time"]
  }

}

output {

  stdout { codec => rubydebug }

  elasticsearch {
    codec => plain{ charset => "UTF-8" }
    hosts => "http://es1.com:9200"
    index => "%{[fields][log_topic]}-%{index_day}"
  }
}
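
One caveat with the ruby filter above: localtime follows the timezone of the machine running Logstash, so the index day shifts if that host is not set to China time. A hedged variant pins the offset explicitly. The sketch below is plain Ruby with a hypothetical helper, index_day, which is not part of Logstash. Inside the ruby filter, the same getlocal('+08:00') call could presumably replace localtime, but I have not verified that on 6.2.

```ruby
# Hypothetical helper: format the index day in a fixed offset instead of the host's timezone.
# Assumption: the logs are written in UTC+8.
def index_day(utc_time, offset = '+08:00')
  utc_time.getlocal(offset).strftime('%Y.%m.%d')
end

# 2018-07-22T23:57:55Z is 07:57:55 on July 23rd in UTC+8.
puts index_day(Time.utc(2018, 7, 22, 23, 57, 55))
```

Pinning the offset keeps the index name stable even if Logstash later moves to a host running in UTC.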