One-time batch processing of a large number of files

I have ~50K files (~200 GB of logs) in a single directory.
I'm trying to parse them and add them to Elasticsearch.
My config:

input {
    file {
        path => "/mnt/storage/*.txt"
        sincedb_path => "/dev/null"
        codec => "json_lines"
        start_position => "beginning"
        ignore_older => 123123123131
    }
}

filter {
    useragent {
      source => "[req][headers][user-agent]"
      target => "user-agent"
    }
    mutate {
       rename => { "msg" => "message" }
       convert => { # unify some field types
        "level" => "string"
        "page" => "string"
         }
       remove_field => [ "[f1][f1nested]", "[f2][f2-nested][f2nested-nested]", "metadata" ] # removes obsolete fields, which should not even go to S3 or Elasticsearch
    }


    if [level] == "10" {
        mutate { update => { "level" => "trace" } }
    }
    if [level] == "20" {
        mutate { update => { "level" => "debug" } }
    }
    if [level] == "30" {
        mutate { update => { "level" => "info" } }
    }
    if [level] == "40" {
        mutate { update => { "level" => "warn" } }
    }
    if [level] == "50" {
        mutate { update => { "level" => "error" } }
    }
    if [level] == "60" {
        mutate { update => { "level" => "fatal" } }
    }

  clone { # clone to put the original in S3
    clones => ["details"]
  }

    if [type] != "details" { # simplify the record for Elasticsearch

      if [some][some][some-name] == "some" {
        drop { }
          }
      if  [some][some][some-name] == "some" {
        drop { }
          }
  
    ruby { # copy version into fileId when it is a string (Logstash 2.x event syntax)
        code => "event['fileId'] = event['version'] if event['version'].is_a?(String)"
    }


      mutate {
       # de-nest some fields like:
       rename => { 
          "[command][command]" => "myCommand"
          "[command][channel_id]" => "myChannelId"
          "[command][channel_name]" => "myChannelName"
          "[email][address]" => "email"
             ...... 
          }
       remove_field => [  "command"......, "version", .... ]
      }

    ruby { # blank out email when it is present but not a string
        code => "event['email'] = '' if event['email'] and not event['email'].is_a?(String)"
    }


    }
}

output {
  stdout { codec => dots }
  if [type] != "details" {
    # for debug
    # file {
    #   path => "_out/elastic.log"
    # }
    elasticsearch {
      hosts => "localhost:9200"
    }
  }
 # if [type] == "details" {
 #  s3 {
 #     ......
 #   }  
 # }
}

Running it as:

LS_HEAP_SIZE="15g" /opt/logstash/bin/logstash -f logstash.conf

After some time it crashes with this error:

Settings: Default pipeline workers: 8
Pipeline main started
java.lang.OutOfMemoryError: Java heap space
Dumping heap to /opt/logstash/heapdump.hprof ...
Unable to create /opt/logstash/heapdump.hprof: File exists
Pipeline main has been shutdown
stopping pipeline {:id=>"main"}
Error: Your application used more memory than the safety cap of 15G.

and logs have not even started being pushed into Elasticsearch.

With a single file everything works fine.

Where am I wrong? Is there any other way to parse big log files?

At a guess it's probably not handling the number of files; can you break things up a bit more on the input?
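For example (just one possible way to split it; the narrower globs below are made up), several file inputs each covering part of the directory instead of one catch-all pattern:

input {
    file { path => "/mnt/storage/a*.txt" }   # hypothetical narrower glob
    file { path => "/mnt/storage/b*.txt" }   # same options as before on each
}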

For test purposes, yes,
but big folders with a huge amount of logs are usual for us.

Which version of LS and the file input are you using? The most recent versions of the file input and filewatch are better at handling these scenarios. Please bear in mind, though, that the file input was designed for tailing files - it tries its best to work for the read-files case.

Logstash 2.3.0, just recently installed.
Since I only need the file input as an initial seed, I don't need to watch for file changes; my files are immutable.

Is there any other way to seed big data into LS? I tried to get the same data from the S3 input, but after 2.5 hours
it was still reading the file list:

S3 input: Found key {:key=>"full-log/ls.s3.ip-10-0-0-46.2016-01-14T23.07.part16152.txt", :level=>:debug, :file=>"logstash/inputs/s3.rb", :line=>"111", :method=>"list_new_files"}
S3 input: Adding to objects[] {:key=>"full-log/ls.s3.ip-10-0-0-46.2016-01-14T23.07.part16152.txt", :level=>:debug, :file=>"logstash/inputs/s3.rb", :line=>"116", :method=>"list_new_files"}

You should try the json codec. The file input is already line oriented; I think the json_lines line buffer is filling up with concatenated "lines" because it is looking for a "\n" that the file input has already stripped out.
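In your input block that would just be a change to the codec; roughly (keeping the rest of your file settings as they are):

    file {
        path => "/mnt/storage/*.txt"
        sincedb_path => "/dev/null"
        codec => "json"          # each line is one complete JSON document; no "\n" to wait for
        start_position => "beginning"
    }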

Wow!
Looks better now. Good to know about this behavior.

I also reduced the close_older value and max_open_files to cycle through the files faster.
The config is now:

input {
    file {
        path => "/mnt/storage/*.txt"
        sincedb_path => "/tmp/sincedb"
        codec => "json"
        start_position => "beginning"
        ignore_older => 864000000
        close_older => 2
        max_open_files => 10
   }
}
output {
        stdout { codec => dots }
}

Perfect. I am glad to see you using close_older and max_open_files. Did you read my blog post about these changes, "the evolving story of the file input"?

So, some background on the codec mismatch: in LS there are three types of sources in the inputs - 1) provides bytes, 2) provides lines, 3) provides a protocol string - and the various codecs each accept only one of these three. Unfortunately, we don't have a mechanism to detect when the input source and codec are mismatched. If we did, we could warn when the file input is used with json_lines.
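As a rough sketch of what matched pairs look like (the tcp port below is just a placeholder): file hands the codec whole lines with the newline already stripped, so a per-line codec like json fits, while a stream input such as tcp hands over raw bytes, which is what json_lines needs so it can find the "\n" delimiters itself.

input {
    file {
        path => "/mnt/storage/*.txt"
        codec => "json"           # the input emits lines; the codec parses each line as one JSON document
    }
    tcp {
        port => 5000              # placeholder port
        codec => "json_lines"     # the input emits raw bytes; the codec splits the stream on "\n"
    }
}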

Out of curiosity, how are your lines of JSON being generated? Nginx or Apache log formatting, perhaps? If so, be aware that this technique can generate invalid JSON - some user-supplied data can be incorrectly escaped (\xHH instead of \u00HH).
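A tiny made-up example of the difference: a strict JSON parser rejects the \xHH escape but accepts \u00HH.

    {"msg":"caf\xE9"}      <- invalid JSON escape
    {"msg":"caf\u00e9"}    <- valid JSON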

I hadn't seen your blog post yet - I just decided to use those settings myself.
I'll read it now.

Glad to help, happy stashing :smiley:

My logs were generated by the Logstash s3 output with codec => "json_lines".
It was a "hard copy" of all the logs that we tried to push into Elasticsearch.

Since our log records are very heterogeneous, many entries failed while being parsed in Elasticsearch.
To not lose anything, we stored them in S3 as well.
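For reference, the output that wrote those files looked roughly like this (the bucket name here is a placeholder; the prefix matches the keys shown above):

output {
    s3 {
        bucket => "our-log-bucket"     # placeholder
        prefix => "full-log/"
        codec => "json_lines"          # one JSON document per line in each object
    }
}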

Over the last few days I have added many normalization filters and tried to reparse the S3 logs back :slight_smile:

Ahhhhh OK.