Timestamps are getting merged when ingesting old data using filebeat

vmstat Log File 1:

Fri Mar 17 15:00:37 CDT 2017
 kthr      memory            page            disk          faults      cpu
 r b w   swap  free  re  mf pi po fr de sr m0 m1 m2 m3   in   sy   cs us sy id
 3  0  0 113340 877904   0   0   0  0   0   0   0  90   0   0   0     0     0     0   6   2  91
 0  0  0 113340 877368   0   0   0  0   0   0   0   0   0   0   0     0     0   247   1   0  99
 2  0  0 113340 876360   0   0   0  0   0   0   0 160   0   0   0     0     0   395   2   1  97
 0  0  0 113340 877468   0   0   0  0   0   0   0  92   0   0   0     0     0  3731  11   8  81
Fri Mar 17 15:00:47 CDT 2017
 kthr      memory            page            disk          faults      cpu
 r b w   swap  free  re  mf pi po fr de sr m0 m1 m2 m3   in   sy   cs us sy id
 3  0  0 113340 878508   0   0   0  0   0   0   0  90   0   0   0     0     0     0   6   2  91
 0  0  0 113340 878076   0   0   0  0   0   0   0   0   0   0   0     0     0   257   0   1 100
 0  0  0 113340 878076   0   0   0  0   0   0   0  32   0   0   0     0     0   277   1   1  99
 0  0  0 113340 878092   0   0   0  0   0   0   0  36   0   0   0     0     0   264   0   0 100
Fri Mar 17 15:00:57 CDT 2017
 kthr      memory            page            disk          faults      cpu
 r b w   swap  free  re  mf pi po fr de sr m0 m1 m2 m3   in   sy   cs us sy id
 3  0  0 113340 877268   0   0   0  0   0   0   0  90   0   0   0     0     0     0   6   2  91
 0  0  0 113340 876928   0   0   0  0   0   0   0 292   0   0   0     0     0   271   0   1  99
 0  0  0 113340 876928   0   0   0  0   0   0   0   4   0   0   0     0     0   245   0   0 100
 0  0  0 113340 876936   0   0   0  0   0   0   0  40   0   0   0     0     0   268   0   1  99

vmstat Log File 2:

Wed Apr 19 00:30:36 EDT 2017
 kthr      memory            page            disk          faults      cpu
 r b w   swap  free  re  mf pi po fr de sr m0 m1 m2 m3   in   sy   cs us sy id
 4  0  0  20128 1173096   0   0   0  0   0   0   0  46   0   0   0     0     0     1   2   1  97
 0  0  0  20128 1172168   0   0   0  0   0   0   0   4   0   0   0     0     0  2478   2   1  97
 1  0  0  20128 1171452   0   0   0  0   0   0   0  44   0   0   0     0     0  1820   1   1  98
 1  0  0  20128 1171700   0   0   0  0   0   0   0 1408   0   0   0     0     0  1959   5   1  94
Wed Apr 19 00:40:41 EDT 2017
 kthr      memory            page            disk          faults      cpu
 r b w   swap  free  re  mf pi po fr de sr m0 m1 m2 m3   in   sy   cs us sy id
 3  0  0  20128 1164448   0   0   0  0   0   0   0  46   0   0   0     0     0     1   2   1  97
 0  0  0  20128 1162572   0   0   0  0   0   0   0 2196   0   0   0     0     0  3357   5   1  93
 0  2  0  20128 1162696   0   0   0  0   0   0   0 2752   0   0   0     0     0  3009   5   1  93
 3  0  0  20128 1160320   0   0   0  0   0   0   0 2056   0   0   0     0     0  2719   4   1  95
Wed Apr 19 00:50:47 EDT 2017
 kthr      memory            page            disk          faults      cpu
 r b w   swap  free  re  mf pi po fr de sr m0 m1 m2 m3   in   sy   cs us sy id
 3  0  0  20128 1461804   0   0   0  0   0   0   0  46   0   0   0     0     0     1   2   1  97
 0  0  0  20128 1461480   0   0   0  0   0   0   0   0   0   0   0     0     0  1228   0   0 100
 0  0  0  20128 1461604   0   0   0  0   0   0   0 848   0   0   0     0     0  1208   0   0  99
 0  0  0  20128 1462100   0   0   0  0   0   0   0   4   0   0   0     0     0  1188   0   0  99

We are using Filebeat 1.2.3 to ship logs to Logstash 5.2.1, and Logstash outputs to Elasticsearch 5.2.1. When I ingest just one of these logs it looks good, and day-to-day ingestion also looks fine. The problem appears when I ingest historical data: when I ingest the two logs above, the indices are not always created properly; sometimes they are created correctly and sometimes they are not. And if I ingest different types of logs (vmstat, netstat and iostat) all at the same time, timestamps get swapped between documents. Could anyone please help me out? Thanks.

Please format logs and config files using the </> button in the editor's toolbar.

I think Filebeat 1.2.3 is EOL and there have been a great many changes since then. Have you considered upgrading Filebeat?

I guess you are using some kind of multiline filter? Do you apply multiline in Filebeat or in Logstash? Normally it's recommended to apply multiline in Filebeat.

When I ingest the two logs above, the indices are not always created properly

Can you give example data with example index names being generated?

In the end it is Logstash that decides which index the event is indexed into. You extract the timestamp in Logstash. Is there some subtle difference that keeps Logstash from correctly parsing the timestamp out of the contents?

How do you determine the event's timestamp?

Hi @steffens, thanks for the quick reply. Our Filebeat runs on the client side and we are not facing any issues with Filebeat itself at the moment; if we run into a real problem we may upgrade it.
We are not using a multiline filter in either Filebeat or Logstash. Do I need a multiline filter in Filebeat?

I ingested these two logs.
-rwxr-x--- 1 xxxx xxxx 55629 Aug 23 11:17 vmstat.HF_12345.zlt10357.xxxx.201704200030.92663.log
-rwxr-x--- 1 xxxx xxxx 80217 Aug 21 09:51 vmstat.HF.host167.appuser1.201609130030.28224.log

filebeat.yml:

 prospectors:
    # Each - is a prospector. Below are the prospector specific configurations
    -
      # Paths that should be crawled and fetched. Glob based paths.
      # To fetch all ".log" files from a specific level of subdirectories
      # /var/log/*/*.log can be used.
      # For each file found under this path, a harvester is started.
      # Make sure no file is defined twice as this can lead to unexpected behaviour.
      paths:
        - /opt/app/xxxxx/data_logs/vmstat.*
      fields:
        MOTS_ID: 1234
      document_type: vmstat

Example data:

{
  "_index": "1234_xxxxx_vmstat-2017.04.20",
  "_type": "vmstat",
  "_id": "AV4P50EDyTd80Pm_-03t",
  "_score": null,
  "_source": {
    "kthr_w": 0,
    "cpu_id": 97,
    "cpu_us": 2,
    "user_name": "xxxxx",
    "source": "/opt/app/xxxxx/data_logs/vmstat.HF_12345.zlt10357.xxxxx.201704200030.92663.log",
    "type": "vmstat",
    "kthr_r": 3,
    "cpu_sy": 1,
    "page_mf": 0,
    "page_sr": 0,
    "@version": "1",
    "beat": {
      "hostname": "host2770",
      "name": "host2770"
    },
    "host": "host2770",
    "mem_swap": 24368,
    "faults_in": 0,
    "offset": 55241,
    "disk_m1": 0,
    "disk_m2": 0,
    "page_fr": 0,
    "disk_m0": 47,
    "faults_sy": 0,
    "input_type": "log",
    "count": 1,
    "disk_m3": 0,
    "page_de": 0,
    "message": " 3  0  0  24368 2035036   0   0   0  0   0   0   0  47   0   0   0     0     0     1   2   1  97",
    "faults_cs": 1,
    "tags": [
      "beats_input_codec_plain_applied",
      "no_date_line"
    ],
    "@timestamp": "2017-04-20T20:39:06.000Z",
    "page_pi": 0,
    "mem_usage": 0,
    "page_po": 0,
    "mem_free": 2035036,
    "page_re": 0,
    "fields": {
      "MOTS_ID": 1234
    },
    "kthr_b": 0,
    "cpu_usage": 3
  },
  "fields": {
    "@timestamp": [
      1492720746000
    ]
  },
  "sort": [
    1492720746000
  ]
}

Bad data (this event comes from the 201609130030 log file, yet it got the same 2017-04-20 timestamp and index):

{
  "_index": "1234_xxxxx_vmstat-2017.04.20",
  "_type": "vmstat",
  "_id": "AV4P50EDyTd80Pm_-03z",
  "_score": null,
  "_source": {
    "kthr_w": 0,
    "cpu_id": 100,
    "cpu_us": 0,
    "user_name": "appuser1",
    "source": "/opt/app/xxxxx/data_logs/vmstat.HF.host167.appuser1.201609130030.28224.log",
    "type": "vmstat",
    "kthr_r": 0,
    "cpu_sy": 0,
    "page_mf": 0,
    "page_sr": 0,
    "@version": "1",
    "beat": {
      "hostname": "host2770",
      "name": "host2770"
    },
    "host": "host2770",
    "mem_swap": 209192,
    "faults_in": 0,
    "offset": 59492,
    "disk_m1": 0,
    "disk_m2": 0,
    "page_fr": 0,
    "disk_m0": 36,
    "faults_sy": 0,
    "input_type": "log",
    "count": 1,
    "disk_m3": 0,
    "page_de": 0,
    "message": " 0  0  0 209192 4149088   0   0   0  0   0   0   0  36   0   0   0     0     0   637   0   0 100",
    "faults_cs": 637,
    "tags": [
      "beats_input_codec_plain_applied",
      "no_date_line"
    ],
    "@timestamp": "2017-04-20T20:39:06.000Z",
    "page_pi": 0,
    "mem_usage": 0,
    "page_po": 0,
    "mem_free": 4149088,
    "page_re": 0,
    "fields": {
      "MOTS_ID": 1234
    },
    "kthr_b": 0,
    "cpu_usage": 0
  },
  "fields": {
    "@timestamp": [
      1492720746000
    ]
  },
  "sort": [
    1492720746000
  ]
}

Timestamp code from logstash conf:

    grok {
      match => { "message" => "%{DATESTAMP_OTHER:time}" }
      tag_on_failure => [ "no_date_line" ]
    }
    # if we're processing the first line, remember the date
    if "no_date_line" not in [tags] {
      aggregate {
        task_id => "%{taskId}"
        code => "map['time'] = event.get('time')"
      }
    }
    # if we're processing the next lines, add the date
    else {
      aggregate {
        task_id => "%{taskId}"
        code => "event.set('time', map['time'])"
        map_action => "update"
        # timeout => 0
      }
    }
    # Parse the date
    date {
      match => [ "time", "EEE MMM dd HH:mm:ss z YYYY" ]
      remove_field => [ "time" ]
    }

Please let me know if any changes need to be made. Thanks.

This is supposed to be one event, right?

Fri Mar 17 15:00:37 CDT 2017
 kthr      memory            page            disk          faults      cpu
 r b w   swap  free  re  mf pi po fr de sr m0 m1 m2 m3   in   sy   cs us sy id
 3  0  0 113340 877904   0   0   0  0   0   0   0  90   0   0   0     0     0     0   6   2  91
 0  0  0 113340 877368   0   0   0  0   0   0   0   0   0   0   0     0     0   247   1   0  99
 2  0  0 113340 876360   0   0   0  0   0   0   0 160   0   0   0     0     0   395   2   1  97
 0  0  0 113340 877468   0   0   0  0   0   0   0  92   0   0   0     0     0  3731  11   8  81

This is clearly a task for multiline.

You've coded some kind of state machine that combines multiple lines into one event. This works fine if you have only one source. But since you have multiple sources whose content gets intermixed (e.g. two log files being published at the same time), lines end up mixed together. The solution is either to keep one state machine per source (you need to define some kind of 'private key' to identify the source) or, somewhat simpler, to use multiline so that one event sent to Logstash contains all the data. You will have to redo some of your processing in Logstash, but I think using multiline in Filebeat will simplify the task a little.
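A minimal sketch of what multiline could look like in the Filebeat 1.x prospector shown further up (the pattern is an assumption based on the date lines in your samples, e.g. "Fri Mar 17 15:00:37 CDT 2017"; adjust it to whatever reliably marks the first line of a block):

 prospectors:
    -
      paths:
        - /opt/app/xxxxx/data_logs/vmstat.*
      fields:
        MOTS_ID: 1234
      document_type: vmstat
      multiline:
        # A block starts with a weekday/date line; every following line
        # that does NOT match the pattern is appended to that event.
        pattern: '^[A-Z][a-z]{2} [A-Z][a-z]{2} '
        negate: true
        match: after

With this, each Filebeat event carries the date line plus the header lines and the sample lines, so Logstash never has to remember state across events.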

Btw. have you considered using metricbeat for collecting stats?

Yes, that is one event. I will try adding multiline in Filebeat. Could you please elaborate on the solution and give an example? I didn't understand what a state machine per source is, or what the private key is.

We have been using Filebeat for all the different types of logs, like vmstat, netstat, threaddump, threadgc etc. I am not sure whether my team has considered Metricbeat.

Uhm... I meant "primary key" (in the database sense of a primary key on data), not "private key". As Logstash receives events from multiple sources, you need to uniquely identify the source in order to do the processing correctly per source. This can be a combination of hostname and filename, for example. On network/IO errors, events might even become re-ordered. Mixing lines from multiple sources without any ordering guarantees is why your usage of aggregate breaks.

By state machine I mean your usage of map, event.set and event.get. Since filters execute per event, but your computation spans many events, you basically end up with a state machine holding intermediate state (in this case map['time']). You would actually need something like map[id]['time'] (whatever id is) in Logstash to maintain multiple concurrent maps/state machines. But that would require even more logic to deal with re-ordering and with cleaning up ids in the map that are no longer used.
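If you wanted to keep the aggregate approach anyway, the state would have to be keyed per source. A rough sketch (note: the taskId value below is hypothetical; your posted config never sets [taskId], so all events currently share one literal task id, and host plus source is only one possible way to identify a file uniquely):

    # Hypothetical: build a per-file key so each source gets its own aggregate map.
    mutate {
      add_field => { "taskId" => "%{host}_%{source}" }
    }
    if "no_date_line" not in [tags] {
      aggregate {
        task_id => "%{taskId}"
        code => "map['time'] = event.get('time')"
      }
    } else {
      aggregate {
        task_id => "%{taskId}"
        code => "event.set('time', map['time'])"
        map_action => "update"
      }
    }

Even then you would still be exposed to the re-ordering problem described above, which is why I would prefer multiline.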

By using multiline support in Filebeat you won't need the aggregate filter, so you won't end up with a hand-written state machine in Logstash. Plus, potential retransmits of single lines on network errors will not break your processing.

@steffens, thanks. One small correction: it's not one event, it should be 4 events, and each event should have its own timestamp. I tried multiline but couldn't solve the problem. Could you please let me know whether there is another solution, given that there are 4 events? Thanks.

I did mean one single event from Filebeat to Logstash. The first line contains the timestamp, the second line (the CSV header) can likely be ignored, and all the other lines you will have to parse in Logstash :slight_smile:

I'm no expert in logstash, but I can give it a try :slight_smile:

First, using grok/dissect filters, I would try to do some initial processing:

  • parse timestamp into '@timestamp' field
  • ignore line 2
  • capture all other lines in the message field.

Next, the split filter can be used to split the message field into N events (one event per line). All other fields of the original event are duplicated onto each new event, so now you should have 4 events.

Finally, apply the csv filter to parse the fields. You will have to configure the column names yourself, but the original column names are not the most readable anyway. As an alternative to defining the columns yourself, you could capture the CSV header in another field and reconstruct a message including the CSV header after the split.
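Put together, a rough sketch of that filter chain (the grok pattern, the intermediate samples field and the column names are assumptions based on your sample events and would need testing against the real multiline messages):

    filter {
      # One multiline event per vmstat block: line 1 is the date,
      # lines 2-3 are the column headers, the rest are the sample rows.
      grok {
        match => { "message" => "(?m)^%{DATESTAMP_OTHER:time}[^\n]*\n[^\n]*\n[^\n]*\n(?<samples>.*)" }
      }
      date {
        match => [ "time", "EEE MMM dd HH:mm:ss z YYYY" ]
        remove_field => [ "time" ]
      }
      # One event per remaining sample line; the other fields (including
      # @timestamp) are copied onto each new event.
      split {
        field => "samples"
      }
      # Squeeze the variable-width padding down to single spaces so the
      # csv filter can use a plain space separator.
      mutate {
        gsub  => [ "samples", " +", " " ]
        strip => [ "samples" ]
      }
      csv {
        source    => "samples"
        separator => " "
        columns   => [ "kthr_r", "kthr_b", "kthr_w", "mem_swap", "mem_free",
                       "page_re", "page_mf", "page_pi", "page_po", "page_fr",
                       "page_de", "page_sr", "disk_m0", "disk_m1", "disk_m2",
                       "disk_m3", "faults_in", "faults_sy", "faults_cs",
                       "cpu_us", "cpu_sy", "cpu_id" ]
      }
    }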

You are doing quite a lot of work in Logstash here. If possible I'd try to simplify the setup as much as possible. Some of this information (like netstat) is already available via Metricbeat. I'd suggest evaluating Metricbeat for some of your use cases. Feel free to ask in the forum about Beats replacements/support for netstat/vmstat/... .

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.