Ingest not reading csv input as a single line


#1

Hi, I am trying to process a csv file which has multiple lines like the following, using an ingest pipeline.

Archive,Major Interval,DEV1,f328c29c-c695-11e5-addb-cc355a180000,CDBJVM,907047a8-5201-0000-0080-8e881e33280d,GetAdditionalAttributes,68aa61b2-5201-0000-0080-9c722b3eca55,,,,,2017-07-25,20:50:49.537407,2017-07-26 01:50:49.5374,2017-07-25,20:54:50.061352,2017-07-26 01:54:50.0613,0,0,0,0,0,0,0,0,9675,240516344,0,0,0,0,0,2,0,0,0,0,0,0,0,Anonymous

POST _ingest/pipeline/_simulate 
{
"pipeline": {
"description": "Ingest flow statistics",
"processors": [
  {
    "grok": {
      "field": "message",
      "patterns": [
       "%{DATA},%{DATA},%{DATA:broker},%{DATA},%{DATA:egname},%{DATA},%{DATA:flowname},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA:enddate},%{DATA:endtime},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA:cpu_time},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA:input_messages},%{GREEDYDATA:extra}"
      ]
    }
  },
  {
    "convert": {
      "field": "cpu_time",
      "type": "integer"
    }
  },
  {
    "convert": {
      "field": "input_messages",
      "type": "integer"
    }
  },
   {
    "set": {
      "field": "timestamp",
       "value": "{{enddate}} {{endtime}}"
    }
  },
  {
    "date": {
      "field": "timestamp",
      "formats": [
        "yyyy-MM-dd HH:mm:ss.SSSSSS"
      ],
      "timezone": "America/Chicago"
    }
  },
  {
    "date_index_name": {
      "field": "@timestamp",
      "index_name_format": "yyyy.MM.dd",
      "index_name_prefix": "logstash-egstats-",
      "date_rounding": "d"
    }
  },
  {
    "remove": {
      "field": "timestamp"
    }
  },
  {
    "remove": {
      "field": "enddate"
    }
  },
  {
    "remove": {
      "field": "endtime"
    }
  },
  {
    "remove": {
      "field": "extra"
    }
  },
  {
    "set": {
      "field": "_type",
      "value": "egstats"
    }
  }
],
"on_failure": [
  {
    "set": {
      "field": "_index",
      "value": "failed-{{ _type }}"
    }
  },
  {
    "set": {
      "field": "error",
      "value": "{{ _ingest.on_failure_message }}"
    }
  }
]
},
"docs" : [
{
  "_score": 1,
  "_source": {
    "input_type": "log",
    "message": "Archive,Major Interval,DEV1,f328c29c-c695-11e5-addb-cc355a180000,CDBJVM,907047a8-5201-0000-0080-8e881e33280d,GetAdditionalAttributes,68aa61b2-5201-0000-0080-9c722b3eca55,,,,,2017-07-25,20:50:49.537407,2017-07-26 01:50:49.5374,2017-07-25,20:54:50.061352,2017-07-26 01:54:50.0613,0,0,0,0,0,0,0,0,9675,240516344,0,0,0,0,0,2,0,0,0,0,0,0,0,Anonymous"   
  }
}
]
}

This works fine. But when I create the pipeline and feed the csv file to it using filebeat, I get an error. I'm not sure why filebeat or my pipeline is not treating each record as a single line.
Any help is appreciated.
Here's the error I get:

error:Provided Grok expressions do not match field value: [0,0,0,Anonymous]

Also attaching the filebeat config file

filebeat:
  prospectors:
    - input_type: log
      paths:
        - /tmp/*.csv
      encoding: plain
      fields_under_root: false
      exclude_lines: ["^Record"]
      scan_frequency: 10s
      harvester_buffer_size: 16384
      max_bytes: 10485760
      multiline.pattern: '^Anonymous'
      multiline.negate: true
      multiline.match: before
      pipeline: egstats

I also tried without the multiline options but that didn't help.


#2

Guys, any ideas? I tried different options in my filebeat conf too, but nothing seems to work.


(David Pilato) #3

I don't know. Maybe ask the question in the #beats:filebeat forum?


#4

Thanks, David. I moved it to the forum you suggested. Hopefully I'll get some answers there.


(Steffen Siering) #5

Do you have some more sample logs? Just from configs and actual error message I can't tell much.

With your on_failure handler, you still have the original message? A message like 0,0,0,Anonymous will not match the grok pattern, as fields seem to be missing.

How/when are you writing the CSV file? The multiline support has a configurable timeout (default 5s). With applications that buffer output without a flush timeout (or with one that is too big), partial lines might get pushed by filebeat. You can disable the timeout by setting multiline.timeout: 0.


#6

Hi Steffen, sorry for the delayed response. I have been trying to put out some fires at work. Here's a sample of the log:

Record Type,Record Code,Broker Name,Broker UUID,EG Name,EG UUID,Message Flow Name,Message Flow UUID,Application Name,Application UUID,Library Name,Library UUID,Record Start Date,Record Start Time,Record GMT Start Timestamp,Record End Date,Record End Time,Record GMT End Timestamp,Total Elapsed Time,Average Elapsed Time,Maximum Elapsed Time,Minimum Elapsed Time,Total CPU Time,Average CPU Time,Maximum CPU Time,Minimum CPU Time,CPU Time Waiting for Input Messages,Elapsed Time Waiting for Input Messages,Total Number of Input Messages,Total Size of Input Messages,Average Size of Input Messages,Maximum Size of Input Messages,Minimum Size of Input Messages,Number of Threads in Pool,Time Maximum Number of Threads reached,Total Number of MQ Errors,Total Number of Messages with Errors,Total Number of Errors Processing Messages,Total Number of Time Outs Waiting for Replies to Aggregate Messages,Total Number of Commits,Total Number of Backouts,Accounting Origin
Archive,Major Interval,DEV1,f328c29c-c695-11e5-addb-cc355a180000,CDBJVM,907047a8-5201-0000-0080-8e881e33280d,GetAdditionalAttributes,68aa61b2-5201-0000-0080-9c722b3eca55,,,,,2017-07-25,20:50:49.537407,2017-07-26 01:50:49.5374,2017-07-25,20:54:50.061352,2017-07-26 01:54:50.0613,0,0,0,0,0,0,0,0,9675,240516344,0,0,0,0,0,2,0,0,0,0,0,0,0,Anonymous

Now my understanding is that filebeat should treat each record as a single line. The message in the failure index is 0,0,0,Anonymous, so it appears it's breaking the message. The csv log is written once every hour, and I am running an inotifywait script to remove all the double quotes from the file as soon as it is touched. Could that be a reason?


(Steffen Siering) #7

So you are editing the file in place, while another process is trying to read the contents? This indeed sounds like a potential race leading to invalid data.


#8

Probably. So, how do I work around this? My app generates these csv files with double quotes around each element. I am using inotifywait to remove these double quotes. Is there a way I can do this with filebeat itself?

"Record Type","Record Code","Broker Name","Broker UUID","EG Name","EG UUID","Message Flow Name","Message Flow UUID","Application Name","Application UUID","Library Name","Library UUID","Record Start Date","Record Start Time","Record GMT Start Timestamp","Record End Date","Record End Time","Record GMT End Timestamp","Total Elapsed Time","Average Elapsed Time","Maximum Elapsed Time","Minimum Elapsed Time","Total CPU Time","Average CPU Time","Maximum CPU Time","Minimum CPU Time","CPU Time Waiting for Input Messages","Elapsed Time Waiting for Input Messages","Total Number of Input Messages","Total Size of Input Messages","Average Size of Input Messages","Maximum Size of Input Messages","Minimum Size of Input Messages","Number of Threads in Pool","Time Maximum Number of Threads reached","Total Number of MQ Errors","Total Number of Messages with Errors","Total Number of Errors Processing Messages","Total Number of Time Outs Waiting for Replies to Aggregate Messages","Total Number of Commits","Total Number of Backouts","Accounting Origin"
"Archive","Major Interval","PROD1","03e4ab7e-486d-11e6-b474-a4790e420000","CISPS","dde1e7ea-5501-0000-0080-d792e1016297","EventNotification","ae04837c-3601-0000-0080-f64363966ca7","","","","","2017-08-06","01:14:47.611553","2017-08-06 06:14:47.6115","2017-08-06","02:10:28.012165","2017-08-06 07:10:28.0121","0","0","0","0","0","0","0","0","120797","3340281569","0","0","0","0","0","5","0","0","0","0","0","0","0","Anonymous"

This is a sample of the original file before inotifywait modifies it.


(Steffen Siering) #9

No, filebeat cannot modify the lines. You can try to remove the quotes in your ingest pipeline using the script filter, right before the grok, or adapt the grok pattern. Alternatively, have your tool write to another file or push to some central logstash instance.


#10

Hi Steffen,

Can you give me an example of how to remove the quotes using the script filter, please? I tried a few ways but haven't had a lot of luck. One more thing I tried was to point filebeat to a different location and copy the modified files to that location manually. Even then, I saw the message being broken.

thanks.


(Steffen Siering) #11

Painless is somewhat inspired by Groovy, and you can use the Java APIs on common types. For strings you can use replace.

In this sample I'm using a mix of both solutions:

  1. Account for " in the grok pattern
  2. Use script to remove all " from the extra field, as the grok pattern captures the 'rest' of the input in extra

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Ingest flow statistics",
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "\"%{DATA}\",\"%{DATA}\",\"%{DATA:broker}\",\"%{DATA}\",\"%{DATA:egname}\",\"%{DATA}\",\"%{DATA:flowname}\",\"%{DATA}\",\"%{DATA}\",\"%{DATA}\",\"%{DATA}\",\"%{DATA}\",\"%{DATA}\",\"%{DATA}\",\"%{DATA}\",\"%{DATA:enddate}\",\"%{DATA:endtime}\",\"%{DATA}\",\"%{DATA}\",\"%{DATA}\",\"%{DATA}\",\"%{DATA}\",\"%{DATA}\",\"%{DATA:cpu_time}\",\"%{DATA}\",\"%{DATA}\",\"%{DATA}\",\"%{DATA}\",\"%{DATA:input_messages}\",%{GREEDYDATA:extra}"
          ]
        }
      },
      {
        "script": {
          "lang": "painless",
          "inline": "ctx.extra = ctx.extra.replace(\"\\\"\", \"\")"
        }
      }
    ]
  },
  "docs" : [
    {
      "_score": 1,
      "_source": {
        "input_type": "log",
        "message": "\"Archive\",\"Major Interval\",\"DEV1\",\"f328c29c-c695-11e5-addb-cc355a180000\",\"CDBJVM\",\"907047a8-5201-0000-0080-8e881e33280d\",\"GetAdditionalAttributes\",\"68aa61b2-5201-0000-0080-9c722b3eca55\",\"\",\"\",\"\",\"\",\"2017-07-25\",\"20:50:49.537407\",\"2017-07-26 01:50:49.5374\",\"2017-07-25\",\"20:54:50.061352\",\"2017-07-26 01:54:50.0613\",\"0\",\"0\",\"0\",\"0\",\"0\",\"0\",\"0\",\"0\",\"9675\",\"240516344\",\"0\",\"0\",\"0\",\"0\",\"0\",\"2\",\"0\",\"0\",\"0\",\"0\",\"0\",\"0\",\"0\",\"Anonymous\""
      }
    }
  ]
}

One has to be somewhat careful with escaping, but it works:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_type": "_type",
        "_id": "_id",
        "_source": {
          "cpu_time": "0",
          "input_messages": "0",
          "input_type": "log",
          "endtime": "20:54:50.061352",
          "message": """"Archive","Major Interval","DEV1","f328c29c-c695-11e5-addb-cc355a180000","CDBJVM","907047a8-5201-0000-0080-8e881e33280d","GetAdditionalAttributes","68aa61b2-5201-0000-0080-9c722b3eca55","","","","","2017-07-25","20:50:49.537407","2017-07-26 01:50:49.5374","2017-07-25","20:54:50.061352","2017-07-26 01:54:50.0613","0","0","0","0","0","0","0","0","9675","240516344","0","0","0","0","0","2","0","0","0","0","0","0","0","Anonymous"""",
          "broker": "DEV1",
          "enddate": "2017-07-25",
          "egname": "CDBJVM",
          "extra": "0,0,0,0,2,0,0,0,0,0,0,0,Anonymous",
          "flowname": "GetAdditionalAttributes"
        },
        "_ingest": {
          "timestamp": "2017-08-09T11:28:13.160Z"
        }
      }
    }
  ]
}

Alternatively you can run the script filter on message, before doing the grok.
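
A minimal sketch of that alternative, assuming the same quoted sample line as above and the unquoted grok pattern from the first post. This is only the request body for POST _ingest/pipeline/_simulate, and I have not run it against a live node. Note that the script overwrites message itself, so the original quoted line is not kept in the document.

```json
{
  "pipeline": {
    "description": "Ingest flow statistics (sketch: strip quotes from message, then grok)",
    "processors": [
      {
        "script": {
          "lang": "painless",
          "inline": "ctx.message = ctx.message.replace(\"\\\"\", \"\")"
        }
      },
      {
        "grok": {
          "field": "message",
          "patterns": [
            "%{DATA},%{DATA},%{DATA:broker},%{DATA},%{DATA:egname},%{DATA},%{DATA:flowname},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA:enddate},%{DATA:endtime},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA:cpu_time},%{DATA},%{DATA},%{DATA},%{DATA},%{DATA:input_messages},%{GREEDYDATA:extra}"
          ]
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "input_type": "log",
        "message": "\"Archive\",\"Major Interval\",\"DEV1\",\"f328c29c-c695-11e5-addb-cc355a180000\",\"CDBJVM\",\"907047a8-5201-0000-0080-8e881e33280d\",\"GetAdditionalAttributes\",\"68aa61b2-5201-0000-0080-9c722b3eca55\",\"\",\"\",\"\",\"\",\"2017-07-25\",\"20:50:49.537407\",\"2017-07-26 01:50:49.5374\",\"2017-07-25\",\"20:54:50.061352\",\"2017-07-26 01:54:50.0613\",\"0\",\"0\",\"0\",\"0\",\"0\",\"0\",\"0\",\"0\",\"9675\",\"240516344\",\"0\",\"0\",\"0\",\"0\",\"0\",\"2\",\"0\",\"0\",\"0\",\"0\",\"0\",\"0\",\"0\",\"Anonymous\""
      }
    }
  ]
}
```

With the quotes stripped first, the rest of the original pipeline (convert, date, remove, and so on) should be able to stay as it was, since the grok sees the same unquoted layout as before.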


(Steffen Siering) #12

About the broken messages:
Did you try to disable the multiline.timeout as I proposed?

How exactly does your script work? Does the application writing the logs employ buffering when writing content? For how long is the file 'alive' when writing the log?

Do you use network shares or something alike?


#13

I tried setting multiline.timeout: 0 to disable the timeout, and I also tried the script processor. I also added multiline patterns to treat the multiline input as a single line, but I still see my message being broken up, which causes the grok expression error.
We are trying to do this with ingest instead of logstash to avoid having a middleman. I will eventually switch to logstash if this doesn't work.
So, my app writes these stats to the csv files every hour, and it looks like it keeps a constant handle on the files. So I would say the file is always alive. We are not using any network shares. I am writing this to the tmp directory. I have also stopped my inotifywait script from changing the files. So now only my app and filebeat have a handle on the files.

FYI, this setup used to work correctly with logstash.


(Steffen Siering) #14

Which logstash config have you used?

Did you try with the live system only? Remote 'debugging' via discuss is somewhat difficult at times :) It would be very helpful to isolate the log messages which get split up, ideally from a static log file which is no longer being written to:

  1. point filebeat to a static log file (from a day/time range for which ES has reported an error)
  2. check ES for errors still occurring with the static log file
  3. if any errors did occur due to lines being split, try to isolate the split log lines plus a few lines (3 or 5) before and after the event for testing, and check again whether the lines are still split (using console output)

Note: delete the registry file between tests.

With a small test sample, I can test locally with filebeat and an ingest node.

