Error parsing csv when files are being added

I'm copying log files generated on dba server on my linux machine (where logstash instance is) with crontab job every day at 06:30. Files are during generation being written each minute, which means that if generation is 30 minutes long, it will generate 30 files. Once in a while I get error and some files are missing in ES index. It happens around 06:30 when I start the copy operation with crontab. Firstly I thought it was some problem relate to NFS, but I guess it's the problem because Logstash read lines faster then all lines are being written into file.

Here's my conf:

#live import from files
input{
    file{
        id => "import_local_files"
        path => "/elasticsearch/logs/365_logs_*"
        start_position => "beginning"
        sincedb_path => "/etc/logstash/sincedb_local"
        file_completed_action => "log"
        file_completed_log_path => "/etc/logstash/local_completed"
    }
}

filter {
   csv {
        ... # bunch of columns defined
        }
    }

    grok {
      match => [
         "book_date", "(?<index_year>%{YEAR})"
      ]
    }

    mutate {
        add_field => {
            "[@metadata][index_year]" => "%{index_year}"
        }
    }

    mutate {
        remove_field => [ "index_year", "@version", "message", "@timestamp", "host", "path"]
    }
}
output{
    elasticsearch {
        hosts => ["https://localhost:9200"]
        user => "XXXXX"
        password => "XXXXXX"
        ssl => true
        #ssl_certificate_verification => false
        cacert => '/etc/logstash/RootCAv4.cer.pem'
        index => "logs_%{[@metadata][index_year]}"
        routing => "%{account_id}"
        document_id => "%{order_id}"
    }
stdout { codec => rubydebug }
}

I'm running Logstash all the time with systemd and "waiting" for new files -> pushing them to ES (near real time is desirable). Once and then, I see errors because logstash is not reading files properly (first error occurs in 06:30:18, and crontab is starting at 06:30) so I'm guessing it's because files are not completely copied.

[2019-11-30T06:30:18,690][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>"00", :_index=>"logs_9500", :_type=>"_doc", :routing=>"%{account_id}"}, #LogStash::Event:0x3f361281], :response=>{"index"=>{"_index"=>"logs_9500", "_type"=>"_doc", "_id"=>"00", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [book_date] of type [date] in document with id '00'. Preview of field's value: '95000'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"failed to parse date field [95000] with format [strict_year_month_day]", "caused_by"=>{"type"=>"date_time_parse_exception", "reason"=>"Text '95000' could not be parsed at index 0"}}}}}}

There's no problem with files and filters in logstash, beacuse when I run another logstash job couple of minutes later (after all files are copied and not been changed), all documents are stored in ES (each time).

Is there any recommended approach for this, because even since_db file and my folder looks the same (same number of lines).. maybe I can add some another parameter to this. Can i force logstash to wait for XX seconds after file appears (because we create new file each minute) to start reading from it? Why LS can't process file if he sees it has been changed in size after some time? Is there any fix for Logstash while reading files that are not closed (not being changed in size / lines).

my folder:

-rw-r--r--. 1 database dba       23001 Nov 30 00:06 365_logs_2019-11-30.00:06.log
-rw-r--r--. 1 database dba     3215012 Nov 30 00:08 365_logs_2019-11-30.00:07.log
-rw-r--r--. 1 database dba     3224950 Nov 30 00:08 365_logs_2019-11-30.00:08.log
-rw-r--r--. 1 database dba     3187276 Nov 30 00:09 365_logs_2019-11-30.00:09.log
-rw-r--r--. 1 database dba     3095002 Nov 30 00:10 365_logs_2019-11-30.00:10.log
-rw-r--r--. 1 database dba     3176401 Nov 30 00:11 365_logs_2019-11-30.00:11.log
-rw-r--r--. 1 database dba     3173608 Nov 30 00:12 365_logs_2019-11-30.00:12.log
-rw-r--r--. 1 database dba     3055775 Nov 30 00:13 365_logs_2019-11-30.00:13.log
-rw-r--r--. 1 database dba     3174441 Nov 30 00:14 365_logs_2019-11-30.00:14.log
-rw-r--r--. 1 database dba     3019499 Nov 30 00:15 365_logs_2019-11-30.00:15.log
-rw-r--r--. 1 database dba     3143938 Nov 30 00:16 365_logs_2019-11-30.00:16.log
-rw-r--r--. 1 database dba     3258992 Nov 30 00:17 365_logs_2019-11-30.00:17.log
-rw-r--r--. 1 database dba     2996001 Nov 30 00:18 365_logs_2019-11-30.00:18.log
-rw-r--r--. 1 database dba     3073479 Nov 30 00:20 365_logs_2019-11-30.00:19.log
-rw-r--r--. 1 database dba     3116066 Nov 30 00:20 365_logs_2019-11-30.00:20.log
-rw-r--r--. 1 database dba     3045814 Nov 30 00:21 365_logs_2019-11-30.00:21.log
-rw-r--r--. 1 database dba     3201033 Nov 30 00:22 365_logs_2019-11-30.00:22.log
-rw-r--r--. 1 database dba     3065749 Nov 30 00:23 365_logs_2019-11-30.00:23.log
-rw-r--r--. 1 database dba     2907330 Nov 30 00:24 365_logs_2019-11-30.00:24.log
-rw-r--r--. 1 database dba     3084664 Nov 30 00:26 365_logs_2019-11-30.00:25.log
-rw-r--r--. 1 database dba     2796782 Nov 30 00:27 365_logs_2019-11-30.00:26.log
-rw-r--r--. 1 database dba     2846058 Nov 30 00:27 365_logs_2019-11-30.00:27.log
-rw-r--r--. 1 database dba     2898921 Nov 30 00:28 365_logs_2019-11-30.00:28.log
-rw-r--r--. 1 database dba     2986992 Nov 30 00:29 365_logs_2019-11-30.00:29.log
-rw-r--r--. 1 database dba     2968226 Nov 30 00:30 365_logs_2019-11-30.00:30.log
-rw-r--r--. 1 database dba     3575251 Nov 30 00:31 365_logs_2019-11-30.00:31.log

sincedb file:

6442507375 0 66305 23001 1575092104.275624 /elasticsearch/logs/365_logs_2019-11-30.00:06.log
6442507377 0 66305 3215012 1575092114.392039 /elasticsearch/logs/365_logs_2019-11-30.00:07.log
6442507378 0 66305 3224950 1575092124.6893718 /elasticsearch/logs/365_logs_2019-11-30.00:08.log
6442507380 0 66305 3187276 1575092133.685711 /elasticsearch/logs/365_logs_2019-11-30.00:09.log
6442507382 0 66305 3095002 1575092143.5489252 /elasticsearch/logs/365_logs_2019-11-30.00:10.log
6442507383 0 66305 3176401 1575092153.341393 /elasticsearch/logs/365_logs_2019-11-30.00:11.log
6442507384 0 66305 3173608 1575092164.973843 /elasticsearch/logs/365_logs_2019-11-30.00:12.log
6442507385 0 66305 3055775 1575092172.5179899 /elasticsearch/logs/365_logs_2019-11-30.00:13.log
6442507386 0 66305 3174441 1575092183.323487 /elasticsearch/logs/365_logs_2019-11-30.00:14.log
6442507387 0 66305 3019499 1575092191.834973 /elasticsearch/logs/365_logs_2019-11-30.00:15.log
6442507388 0 66305 3143938 1575092200.976298 /elasticsearch/logs/365_logs_2019-11-30.00:16.log
6442507389 0 66305 3258992 1575092210.893957 /elasticsearch/logs/365_logs_2019-11-30.00:17.log
6442507390 0 66305 2996001 1575092220.4364972 /elasticsearch/logs/365_logs_2019-11-30.00:18.log
6442507391 0 66305 3073479 1575092229.5110168 /elasticsearch/logs/365_logs_2019-11-30.00:19.log
6442507392 0 66305 3116066 1575092241.122341 /elasticsearch/logs/365_logs_2019-11-30.00:20.log
6442507393 0 66305 3045814 1575092248.858644 /elasticsearch/logs/365_logs_2019-11-30.00:21.log
6442507394 0 66305 3201033 1575092258.71389 /elasticsearch/logs/365_logs_2019-11-30.00:22.log
6442507395 0 66305 3065749 1575092267.679543 /elasticsearch/logs/365_logs_2019-11-30.00:23.log
6442507396 0 66305 2907330 1575092276.6309352 /elasticsearch/logs/365_logs_2019-11-30.00:24.log
6442507397 0 66305 3084664 1575092286.4521968 /elasticsearch/logs/365_logs_2019-11-30.00:25.log
6442507400 0 66305 2796782 1575092294.5444448 /elasticsearch/logs/365_logs_2019-11-30.00:26.log
6442507408 0 66305 2846058 1575092303.4759889 /elasticsearch/logs/365_logs_2019-11-30.00:27.log
6442507410 0 66305 2898921 1575092312.528367 /elasticsearch/logs/365_logs_2019-11-30.00:28.log
6442507411 0 66305 2986992 1575092321.4431732 /elasticsearch/logs/365_logs_2019-11-30.00:29.log
6442507414 0 66305 2968226 1575092330.679982 /elasticsearch/logs/365_logs_2019-11-30.00:30.log
6442507416 0 66305 3575251 1575092341.254118 /elasticsearch/logs/365_logs_2019-11-30.00:31.log

Persistent queue might be helpful here?

Ok, how about next question? Can I define for logstash to wait on files that are being written to. E.g. file is created and it's changing in size (number of lines) for a 20 seconds. Can Logstash wait for N seconds before start of reading file. If file isn't changed for 15 seconds for example, then start reading, if it is changing in size, wait.. If you know what I mean :slight_smile: