I have ~50K log files (~200 GB) in a single directory and I'm trying to parse them and index them into Elasticsearch.
My config:
input {
  file {
    path => "/mnt/storage/*.txt"
    sincedb_path => "/dev/null"
    codec => "json_lines"
    start_position => "beginning"
    ignore_older => 123123123131
  }
}
filter {
  useragent {
    source => "[req][headers][user-agent]"
    target => "user-agent"
  }
  mutate {
    rename => { "msg" => "message" }
    convert => { # unify some field types
      "level" => "string"
      "page" => "string"
    }
    # remove obsolete fields that should not go to S3 or Elasticsearch at all
    remove_field => [ "[f1][f1nested]", "[f2][f2-nested][f2nested-nested]", "metadata" ]
  }
  if [level] == "10" {
    mutate {
      update => { "level" => "trace" }
    }
  }
  if [level] == "20" {
    mutate {
      update => { "level" => "debug" }
    }
  }
  if [level] == "30" {
    mutate {
      update => { "level" => "info" }
    }
  }
  if [level] == "40" {
    mutate {
      update => { "level" => "warn" }
    }
  }
  if [level] == "50" {
    mutate {
      update => { "level" => "error" }
    }
  }
  if [level] == "60" {
    mutate {
      update => { "level" => "fatal" }
    }
  }
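  # Note: if the logstash-filter-translate plugin is available, the six
  # conditionals above could probably be collapsed into a single filter.
  # A rough sketch, untested here (option names as in the 2.x-era translate plugin):
  #
  # translate {
  #   field       => "level"
  #   destination => "level"
  #   override    => true
  #   dictionary  => {
  #     "10" => "trace"
  #     "20" => "debug"
  #     "30" => "info"
  #     "40" => "warn"
  #     "50" => "error"
  #     "60" => "fatal"
  #   }
  # }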
  clone { # clone so the original can go to S3
    clones => ["details"]
  }
  if [type] != "details" { # simplify the record for Elasticsearch
    if [some][some][some-name] == "some" {
      drop { }
    }
    if [some][some][some-name] == "some" {
      drop { }
    }
    ruby {
      code => "event['fileId'] = event['version'] if event['version'].is_a?(String)"
    }
    mutate {
      # de-nest some fields, e.g.:
      rename => {
        "[command][command]" => "myCommand"
        "[command][channel_id]" => "myChannelId"
        "[command][channel_name]" => "myChannelName"
        "[email][address]" => "email"
        ......
      }
      remove_field => [ "command"......, "version", .... ]
    }
    ruby {
      code => "event['email'] = '' if event['email'] and not event['email'].is_a?(String)"
    }
  }
}
output {
  stdout { codec => dots }
  if [type] != "details" {
    # for debugging:
    # file {
    #   path => "_out/elastic.log"
    # }
    elasticsearch {
      hosts => "localhost:9200"
    }
  }
  # if [type] == "details" {
  #   s3 {
  #     ......
  #   }
  # }
}
I'm running it as:
LS_HEAP_SIZE="15g" /opt/logstash/bin/logstash -f logstash.conf
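(For reference, Logstash 2.x also accepts the pipeline worker count and batch size on the command line via -w/--pipeline-workers and -b/--pipeline-batch-size; a sketch with placeholder values, which I have not tuned:)
LS_HEAP_SIZE="15g" /opt/logstash/bin/logstash -f logstash.conf -w 4 -b 64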
After some time it crashes with this error:
Settings: Default pipeline workers: 8
Pipeline main started
java.lang.OutOfMemoryError: Java heap space
Dumping heap to /opt/logstash/heapdump.hprof ...
Unable to create /opt/logstash/heapdump.hprof: File exists
Pipeline main has been shutdown
stopping pipeline {:id=>"main"}
Error: Your application used more memory than the safety cap of 15G.
The logs never even start getting pushed into Elasticsearch. With a single file everything works fine.
Where am I going wrong? Is there any other way to parse big log files like this?