Filebeat to Logstash - Error parsing csv

I have recently added Filebeat to our ELK stack to read logs and send the data to Logstash. Every so often, Logstash prints an "Error parsing csv" warning. The result is corrupt data in the database, because the CSV fields end up misaligned.

Before adding Filebeat, we were reading the logs using Logstash. It was slower and used more CPU, but we did not have these errors.

In the errors below, you can see the "Illegal quoting in line 1" part. The error always occurs in the middle of a quoted string whose opening quote is missing, because for some reason Logstash appears to receive the message starting partway through the record.

Thinking this is related to inode reuse, I have tried numerous Filebeat options like clean_* and close_* as shown below. Nothing has worked so far.

Here are some sample errors from Logstash:

[2018-04-11T08:15:17,885][WARN ][logstash.filters.csv     ] Error parsing csv {:field=>"message", :source=>"lity\",10.4.2.65,64490,72.21.91.29,80,04/11/2018 08:14:41.715232,04/11/2018 08:14:41.767610,52,1418,\"HTTP\",606,980,980,606,4,3,3,4,1,1,1,3,3,3,0,0,0,0,0,0,0,0,0,0,0,5.000000,5.000000,5.000000,,,,,,,5.000000,5.000000,5.000000,0,13", :exception=>#<CSV::MalformedCSVError: Illegal quoting in line 1.>}
[2018-04-11T08:18:14,697][WARN ][logstash.filters.csv     ] Error parsing csv {:field=>"message", :source=>" Quality\",10.4.2.65,64570,64.4.54.254,443,04/11/2018 08:16:55.704939,04/11/2018 08:18:04.403762,68698,1454,\"HTTPS\",5806,5294,5294,5806,13,12,12,13,2011,4078,3561,109,268,228,0,0,0,0,0,0,0,0,0,0,0,5.000000,5.000000,5.000000,,,,,,,5.000000,5.000000,5.000000,0,13", :exception=>#<CSV::MalformedCSVError: Illegal quoting in line 1.>}
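
For what it's worth, the parse error itself seems to be purely a consequence of the truncated message: pasting one of the partial records above into a throwaway pipeline like the one below (stdin in, stdout out, nothing from our real setup) produces the same MalformedCSVError warning, because the record now appears to start in the middle of a quoted field.

input { stdin {} }
filter { csv { separator => "," } }
output { stdout { codec => rubydebug } }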

Filebeat is configured to watch around 10 different files; however, it is only this one harvester that we are having a problem with. The files it reads have unique, timestamped names, and a new one is generated every 5 seconds.

Here is a sample from this file:

04/11/2018 08:23:57,1,"Savvius worst of the worst",topn,all,1,"All Savvius",0,"","App Latency",10.4.2.65,64806,72.21.81.200,443,04/11/2018 08:23:10.917145,04/11/2018 08:23:10.946133,28,1454,"HTTPS",2630,22769,22769,2630,28,24,24,28,0,0,0,4,4,4,0,0,0,0,0,0,0,0,0,0,0,5.000000,5.000000,5.000000,,,,,,,5.000000,5.000000,5.000000,0,13
04/11/2018 08:23:57,1,"Savvius worst of the worst",topn,all,1,"All Savvius",0,"","App Latency",10.4.2.89,49201,10.8.1.56,9200,04/11/2018 08:23:46.618455,04/11/2018 08:23:53.625176,7006,1400,"TCP",78,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,5.000000,5.000000,5.000000,,,,,,,5.000000,5.000000,5.000000,14,13
04/11/2018 08:23:57,1,"Savvius worst of the worst",topn,all,1,"All Savvius",0,"","Net Latency",10.4.2.65,64806,72.21.81.200,443,04/11/2018 08:23:10.917145,04/11/2018 08:23:10.946133,28,1454,"HTTPS",2630,22769,22769,2630,28,24,24,28,0,0,0,4,4,4,0,0,0,0,0,0,0,0,0,0,0,5.000000,5.000000,5.000000,,,,,,,5.000000,5.000000,5.000000,0,13

The Filebeat config for this file looks like this:

- type: log
  paths:
    - "/var/lib/omni/data/streaming_analytics_conversations_*.csv"
  tags: [ "sv_all" ]
  ignore_older: 1m
  clean_removed: true
  scan_frequency: 5s
  close_eof: true

I really want to make Filebeat work, because it uses much less CPU to detect and read the files.

I am sure somebody will ask for the Logstash config. Here it is:

filter {
        if "sv_all" in [tags] {
                mutate { replace => { "type" => "sv_all" }}
                csv {
                        columns => ["interval_ts","stream_id","stream_name","view_mode","state","network_id","network_name","application_id","application_name","stats_type",
                      "client_addr","client_port","server_addr","server_port","earliest_ts","latest_ts","duration","protospec_id","protospec",
                      "bytes_client","bytes_server","bytes_ingress","bytes_egress","packets_client","packets_server","packets_ingress",
                      "packets_egress","avg_app_latency","max_app_latency","app_latency_score","avg_net_latency","max_net_latency",
                      "net_latency_score","quality_counts","tcp_reset_conn_rejected_count","tcp_reset_conn_lost_count","tcp_retransmission_generic_count",
                      "tcp_retransmission_excessive_count","tcp_retransmission_fast_3ack_count","tcp_retransmission_fast_count","tcp_retransmission_slow_count",
                      "tcp_window_low_count","tcp_window_zero_count","tcp_repeated_syn_count","avg_tcp_quality","max_tcp_quality",
                      "quality_score","jitter_average","jitter_worst","packets_dropped","avg_voip_quality","max_voip_quality",
                      "low_mos_score","avg_conversation_quality","max_conversation_quality","conversation_quality_score",
                      "network_id_client","network_id_server"]
                        separator => ","
                        skip_empty_columns => true
                }
                if ([interval_ts] == "interval_ts") {
                        drop {}
                }
                date {
                        locale => "en"
                        match => ["interval_ts", "MM/dd/YYYY HH:mm:ss"]
                        target => "@timestamp"
                }
                grok {
                        match => [ "server_addr", "^%{IP:server_addr}$" ]
                        overwrite => [ "server_addr" ]
                        add_tag => "gotIP"
                }
                if ([server_addr] and [client_addr]) {
                        if ([client_addr]  =~ /\./) {
                                mutate {
                                        add_field => { "myflow" => "%{client_addr}:%{client_port} - %{server_addr}:%{server_port}" }
                                }
                        } else {
                                mutate {
                                        add_field => { "myflow" => "[%{client_addr}]:%{client_port} - [%{server_addr}]:%{server_port}" }
                                }
                        }
                }

                if "gotIP" in [tags] {
                        mutate {
                                remove_tag => [ 'gotIP' ]
                        }
                        if [server_addr] !~ /^(10\.|192\.168\.|169\.254\.|172\.(1[6-9]|2[0-9]|3[01])|[a-fA-F0-9]{2}:)/ {
                                geoip {
                                        source => "server_addr"
                                        target => "geoip"
                                        database => "/var/lib/omni/MaxMind-DB.mmdb"
                                        add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
                                        add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
                                }
                                mutate {
                                        convert => {
                                                "[geoip][coordinates]" => "float"
                                        }
                                }
                                dns {
                                        reverse => [ "server_addr" ]
                                        action => [ "replace" ]
                                        failed_cache_size => 1000
                                        failed_cache_ttl => 3600
                                        hit_cache_size => 1000
                                        hit_cache_ttl => 3600
                                }
                        }
                }
        }
}
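
As a stopgap while I track this down, I am considering putting a guard like the one below ahead of the csv filter, so that any event which does not begin with the expected timestamp is dropped instead of ending up as a misaligned row in the database. This is only a sketch I have not deployed yet:

filter {
        if "sv_all" in [tags] {
                # Drop events that do not start with "MM/dd/yyyy HH:mm:ss," -
                # i.e. messages that were picked up partway through a record.
                if [message] !~ /^\d{2}\/\d{2}\/\d{4} \d{2}:\d{2}:\d{2},/ {
                        drop {}
                }
        }
}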

I have now configured Filebeat to save its output to a file, and in that output I can see messages starting at the wrong offset.
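
For reference, the file output was enabled with something along these lines (the path here is illustrative):

output.file:
  # illustrative path; Filebeat writes filebeat, filebeat.1, ... into it
  path: "/tmp/filebeat"
  filename: filebeat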

In this event, the message starts with a date, as they all should:

filebeat.1:{"@timestamp":"2018-04-11T16:01:18.000Z","beat":{"hostname":"ultra"},"message":"04/11/2018 09:01:17,1,"Savvius worst of the worst",topn,all,1,"All Savvius",0,"","App Latency",10.8.1.40,54958,10.4.2.38,8443,04/11/2018 09:00:16.844862,04/11/2018 09:01:09.700353,52855,1400,"TCP",34247,2076979,0,0,404,1471,0,0,107,1137,880,10,39,32,0,0,0,0,0,0,0,0,0,0,0,5.000000,5.000000,5.000000,,,,,,,5.000000,5.000000,5.000000,13,14","tags":["sv_all"],"type":"log"}

In this event, the message starts in the middle of a quoted string, which is wrong:

filebeat.1:{"@timestamp":"2018-04-11T16:01:12.997Z","beat":{"hostname":"ultra"},"message":"ll Savvius",0,"","Conversation Quality",10.8.1.40,54958,10.4.2.38,8443,04/11/2018 09:00:16.844862,04/11/2018 09:01:09.659118,52814,1400,"TCP",34183,2076979,0,0,403,1471,0,0,107,1137,880,10,39,32,0,0,0,0,0,0,0,0,0,0,0,5.000000,5.000000,5.000000,,,,,,,5.000000,5.000000,5.000000,13,14","tags":["sv_all"],"type":"log"}

This leads me to believe the problem is Filebeat, not Logstash.

It does sound like an inode reuse issue. Can you tell us more about how and when the streaming_analytics_conversations_*.csv files are deleted/rotated?

If you enable debug logging, you should be able to see whether Filebeat thinks the file is new or whether it thinks it is resuming a pre-existing file.

logging.level: debug
logging.selectors: [prospector, harvester]

Thanks for responding, Andrew. The csv files are created in the same directory, and a new file is created every 5 seconds. The number of files to keep is currently set to 30; when that limit is reached, the oldest file is removed before the new one is created. When the limit is set to 10, the problem is worse.

Here is my current Filebeat config. With the increase to 30 files and this config, we did not see the problem for a few days, but it has just happened again.

- type: log
  paths:
    - "/var/lib/omni/data/streaming_analytics_conversations_*.csv"
  tags: [ "sv_all" ]
  exclude_lines: [ '^interval_ts' ]
  encoding: plain
  clean_removed: true
  clean_inactive: 1m
  ignore_older: 30s
  close_older: 30s
  close_timeout: 30s
  scan_frequency: 5s
  close_eof: true

Before we started using Filebeat, we were using Logstash to read the files. We had a similar problem then, which we fixed by hashing the inode together with the file path and name. I wonder if anyone has a patch to do that for Filebeat?
