Using Logstash & Elasticsearch
Input files
cars.csv
ent_sort,Date,vehicle,
enter,2020-04-28 05:04:02,c1
sortie,2020-04-28 10:04:02,c1
enter,2020-04-28 05:04:02,c3
sortie,2020-04-28 10:04:02,c2
enter,2020-04-28 05:04:02,c2
sortie,2020-04-28 12:04:02,c3
cars - Copia.csv
ent_sort,Date,vehicle,
enter,2020-04-29 05:04:02,c1
sortie,2020-04-29 10:04:02,c1
enter,2020-04-29 05:04:02,c4
sortie,2020-04-29 10:04:02,c5
enter,2020-04-29 05:04:02,c5
sortie,2020-04-29 12:04:02,c4
cars - Copia - Copia.csv
ent_sort,Date,vehicle,
enter,2020-04-30 05:04:02,c1
sortie,2020-04-30 10:04:02,c1
enter,2020-04-30 05:04:02,c4
sortie,2020-04-30 10:04:02,c5
enter,2020-04-30 05:04:02,c5
sortie,2020-04-30 12:04:02,c4
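In each file, an enter row marks a vehicle arriving and a sortie row ("sortie" is French for "exit") marks it leaving; the pipelines below pair the two rows per file and per vehicle. For instance, in cars.csv, c1 enters at 05:04:02 and leaves at 10:04:02, a parking duration of 5 hours, i.e. 5 × 3600 = 18000 seconds.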
Logstash pipeline
input {
file {
path => "Z:/Downloads/logstash-7.6.0/bin/*.csv"
sincedb_path => "NUL"
start_position => "beginning"
}
}
filter {
csv {
autodetect_column_names => true
source => "message"
skip_header => true
}
mutate {
strip => ["Date", "vehicle", "ent_sort"]
add_field => { "key" => "%{path}%{vehicle}" }
}
if [ent_sort] == "enter" {
aggregate {
task_id => "%{key}"
code => "map['date_in'] = event.get('Date'); event.cancel()"
map_action => "create"
}
}
if [ent_sort] == "sortie" {
aggregate {
task_id => "%{key}"
code => "event.set('date_out', event.get('Date')); event.set('date_in', map['date_in']);"
map_action => "update"
end_of_task => true
timeout => 5
}
}
date {
match => [ "date_in", "yyyy-MM-dd HH:mm:ss" ]
target => "date_in"
timezone => "Europe/Paris" # optional: without it, dates are parsed as UTC
}
date {
match => [ "date_out", "yyyy-MM-dd HH:mm:ss"]
target => "date_out"
timezone => "Europe/Paris" # optional: without it, dates are parsed as UTC
}
ruby {
code => "event.set('duration',event.get('date_out') - event.get('date_in'))"
remove_field => [ "Date", "message", "sequence", "ent_sort", "key", "@version", "@timestamp" ]
}
}
output {
stdout { codec => rubydebug }
elasticsearch {
hosts => [ "https://..." ]
index => "vehicle"
ilm_enabled => false
user => "elastic"
password => "..."
}
}
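The two aggregate filters correlate each enter with its matching sortie: the enter event stores its Date in the map and is cancelled, and the sortie event picks date_in back up and becomes the single output document. For c1 in cars.csv, the indexed document should look roughly like this (host field omitted, values illustrative; the timestamps appear in UTC because Logstash stores dates internally as UTC):
{
        "path" => "Z:/Downloads/logstash-7.6.0/bin/cars.csv",
     "vehicle" => "c1",
     "date_in" => 2020-04-28T03:04:02.000Z,
    "date_out" => 2020-04-28T08:04:02.000Z,
    "duration" => 18000.0
}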
Delete the destination index vehicle before running Logstash.
The pipeline must be run with pipeline.workers set to 1, because the aggregate filter relies on events being processed in order by a single worker.
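For instance (the Elasticsearch host, the credentials, and the vehicles.conf file name are placeholders):
# drop the index from the previous run
curl -u elastic -X DELETE "https://your-es-host:9200/vehicle"
# -w 1 is shorthand for --pipeline.workers 1
bin/logstash -f vehicles.conf -w 1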
Kibana visualizations
All the following visualizations are Table visualizations; the second one corresponds to the aggregation query sketched after this list.
Sum of duration per file
Sum of duration per file and per vehicle
Sum of duration per vehicle, overall
Number of parkings per vehicle, overall
Number of parkings per file
Number of parkings per file per vehicle
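For reference, "Sum of duration per file and per vehicle" amounts to a Sum metric on duration with two Terms row splits, i.e. the following query (assuming the default dynamic mapping, which gives the path and vehicle text fields a .keyword sub-field):
GET vehicle/_search
{
  "size": 0,
  "aggs": {
    "per_file": {
      "terms": { "field": "path.keyword" },
      "aggs": {
        "per_vehicle": {
          "terms": { "field": "vehicle.keyword" },
          "aggs": {
            "total_duration": { "sum": { "field": "duration" } }
          }
        }
      }
    }
  }
}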
Logstash-only solution
input {
file {
path => "Z:/Downloads/logstash-7.6.0/bin/*.csv"
sincedb_path => "NUL"
start_position => "beginning"
}
}
filter {
csv {
autodetect_column_names => true
source => "message"
skip_header => true
}
mutate {
strip => ["Date", "vehicle", "ent_sort"]
add_field => { "key" => "%{path}%{vehicle}" }
add_field => { "globalkey" => "globalkey" }
}
if [ent_sort] == "enter" {
aggregate {
task_id => "%{key}"
code => "map['date_in'] = event.get('Date'); event.cancel()"
map_action => "create"
}
}
if [ent_sort] == "sortie" {
aggregate {
task_id => "%{key}"
code => "event.set('date_out', event.get('Date')); event.set('date_in', map['date_in']);"
map_action => "update"
end_of_task => true
timeout => 5
}
}
date {
match => [ "date_in", "yyyy-MM-dd HH:mm:ss" ]
target => "date_in"
timezone => "Europe/Paris" # optional: without it, dates are parsed as UTC
}
date {
match => [ "date_out", "yyyy-MM-dd HH:mm:ss"]
target => "date_out"
timezone => "Europe/Paris" # optional: without it, dates are parsed as UTC
}
ruby {
code => "event.set('duration',event.get('date_out') - event.get('date_in'))"
remove_field => [ "Date", "message", "sequence", "ent_sort", "@version", "@timestamp" ]
}
aggregate {
task_id => "%{globalkey}"
code => "
file = event.get('path')
vehicle = event.get('vehicle')
duration = event.get('duration')
map['totalDurationAllFiles'] ||= 0
logger.info('map[totalDurationAllFiles] += duration')
map['totalDurationAllFiles'] += duration
map[file] = { } unless map.has_key?(file)
map[file][vehicle] ||= 0
map[file][vehicle] += duration
map[file]['totalDurationPerFile'] ||= 0
logger.info('map[file][totalDurationPerFile] += duration')
map[file]['totalDurationPerFile'] += duration
event.cancel()
"
push_previous_map_as_event => true
timeout => 30
}
mutate {
remove_field => [ "@version", "@timestamp" ]
}
}
output {
stdout { codec => rubydebug }
}
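Because task_id is the constant globalkey, every event lands in one shared map; event.cancel() drops the individual per-stay events, and since push_previous_map_as_event is true, the map itself is pushed as a new event once the 30-second timeout elapses after the last line is read. That single event is the result shown below.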
Result (in stdout):
{
"Z:/Downloads/logstash-7.6.0/bin/cars - Copia.csv" => {
"c4" => 25200.0,
"c1" => 18000.0,
"totalDurationPerFile" => 61200.0,
"c5" => 18000.0
},
"Z:/Downloads/logstash-7.6.0/bin/cars - Copia - Copia.csv" => {
"c4" => 25200.0,
"c1" => 18000.0,
"totalDurationPerFile" => 61200.0,
"c5" => 18000.0
},
"Z:/Downloads/logstash-7.6.0/bin/cars.csv" => {
"c2" => 18000.0,
"c1" => 18000.0,
"totalDurationPerFile" => 61200.0,
"c3" => 25200.0
},
"totalDurationAllFiles" => 183600.0
}
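Sanity check: in cars.csv, c1 and c2 each stay 5 h (18000 s) and c3 stays 7 h (25200 s), so every file totals 61200 s (17 h), and the three files give 3 × 61200 = 183600 s (51 h), which matches totalDurationAllFiles.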