I mean, define two paths, where the files (csv or log) in one path have 9 columns and the files in the other path have 11 columns, then apply two different grok patterns: one for the files with 9 columns and another for the files with 11 columns... all in the same configuration file, of course.
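Something like the sketch below is what I have in mind: tag each file input and route on the tag, so each grok only ever sees lines of the right shape and no _grokparsefailure juggling is needed. The 9cols/11cols directory names and the x_* field names are just placeholders I made up:

input {
  file {
    path => "/data/Logs/csvfiles/9cols/*.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    tags => ["9cols"]
  }
  file {
    path => "/data/Logs/csvfiles/11cols/*.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    tags => ["11cols"]
  }
}
filter {
  if "9cols" in [tags] {
    grok {
      match => { "message" => "^(?<date>[^;]+);(?<x_2>[^;]*);(?<x_3>[^;]*);(?<x_4>[^;]*);(?<x_7>[^;]*);(?<x_8>[^;]*);(?<x_9>[^;]*);(?<x_10>[^;]*);(?<response_time>[^;]*)$" }
    }
  } else {
    grok {
      match => { "message" => "^(?<date>[^;]+);(?<x_2>[^;]*);(?<x_3>[^;]*);(?<x_4>[^;]*);(?<x_5>[^;]*);(?<x_6>[^;]*);(?<x_7>[^;]*);(?<x_8>[^;]*);(?<x_9>[^;]*);(?<x_10>[^;]*);(?<response_time>[^;]*)$" }
    }
  }
}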
In my actual test, I tried to apply a single grok pattern to both and got parse failures (of course), so I added an if statement and built a second grok for the other layout. It almost works, except for the confusion with column names. In the example I tried two different files (9 and 11 columns, in the same path in this case). For convenience each file has only 1 row. Here it is:
input {
  file {
    path => "/data/Logs/csvfiles/allcols/*.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  grok {
    # 9-column layout; fields 5-8 reuse the 11-column names x_7..x_10 so both layouts
    # line up (only "date" and "response_time" are real names, the x_* ones are placeholders)
    match => { "message" => "^(?<date>[^;]+);(?<x_2>[^;]*);(?<x_3>[^;]*);(?<x_4>[^;]*);(?<x_7>[^;]*);(?<x_8>[^;]*);(?<x_9>[^;]*);(?<x_10>[^;]*);(?<response_time>[^;]*)$" }
    # the $ anchor matters: without it an 11-column line half-matches this pattern
    # and the if branch below never runs
  }
  if "_grokparsefailure" in [tags] {
    grok {
      # 11-column layout, tried only when the 9-column pattern fails
      match => { "message" => "^(?<date>[^;]+);(?<x_2>[^;]*);(?<x_3>[^;]*);(?<x_4>[^;]*);(?<x_5>[^;]*);(?<x_6>[^;]*);(?<x_7>[^;]*);(?<x_8>[^;]*);(?<x_9>[^;]*);(?<x_10>[^;]*);(?<response_time>[^;]*)$" }
      # drop the failure tag left by the first grok once this one matches
      remove_tag => ["_grokparsefailure"]
    }
  }
  ruby {
    # strip the trailing character from the raw date field
    code => "event.set('date', event.get('date')[0..-2])"
  }
  ruby {
    # derive host and app_name from the file name, assuming names like host_app.csv
    code => "
      event.set('filename', event.get('path').split('/').last)
      event.set('app_name', event.get('filename').split('_')[1])
      event.set('host', event.get('filename').split('_').first)
    "
  }
  mutate {
    remove_field => ["filename"]
    convert => {
      "x_10" => "float"
      "response_time" => "float"
    }
  }
  date {
    match => [ "date", "yyyy-MM-dd HH:mm:ss.SSS" ]
    timezone => "UTC"
    target => "date"
  }
}
output {
  elasticsearch {
    hosts => ["localhost"]
    index => "csv9and11-%{+YYYY-MM-dd}"
  }
  stdout { codec => rubydebug }
}
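By the way, since the files are plain semicolon-separated values, the csv filter might work here instead of grok, and the column list can be chosen by counting separators (an 11-column line contains exactly 10 semicolons). A rough sketch with the same placeholder column names:

filter {
  if [message] =~ /^(?:[^;]*;){10}[^;]*$/ {
    # 11 columns
    csv {
      separator => ";"
      columns => ["date","x_2","x_3","x_4","x_5","x_6","x_7","x_8","x_9","x_10","response_time"]
    }
  } else {
    # 9 columns
    csv {
      separator => ";"
      columns => ["date","x_2","x_3","x_4","x_7","x_8","x_9","x_10","response_time"]
    }
  }
}

Either way the field names are assigned explicitly per layout instead of depending on which grok failed.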