Trouble with aggregate filter?

Hi, I have a CSV file containing 1000 rows. It looks like this:

ent_sort , Date , vehicle
enter, 2020-04-28 05:04:02 , c1
sortie , 2020-04-28 10:04:02 , c1
enter , 2020-04-28 05:04:02 , c3
sortie , 2020-04-28 10:04:02 , c2
enter , 2020-04-28 05:04:02 , c2
sortie , 2020-04-28 12:04:02 , c3
...........
I will try to explain my problem, and I really appreciate any help.
I want to transform or group the data like this:
vehicle,date_in,date_out
c1,2020-04-28 05:04:02,2020-04-28 10:04:02
c2,2020-04-28 05:04:02,2020-04-28 10:04:02
......

My goal is to calculate the time difference for every vehicle, so I need to reshape the data like this and then use a ruby filter to compute the difference.
I tried this aggregate filter configuration, but it doesn't work.

if [ent_sort] == "enter" {
  aggregate {
    task_id => "%{vehicle}"
    code => "map['Date_of_entry'] = event.get('Date')"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "vehicle"
    timeout => 3
  }
} else if [ent_sort] == "sortie" {
  aggregate {
    task_id => "%{vehicle}"
    code => "map['date_end'] = event.get('Date')"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "vehicle"
    timeout => 3
  }
}

Please, any ideas?

The aggregate filter can work, but it has some limits.
It requires forcing a single pipeline worker.

On which version are you? Do you want to send the data to Elasticsearch?

I use version 7.6.0. Yes, I want to send the data to Elasticsearch; I have two outputs (elasticsearch and file), and I added these two lines to logstash.yml:
pipeline.workers: 1
pipeline.java_execution: false

if [ent_sort] == "enter" {
  aggregate {
    task_id => "%{vehicle}"
    # store the entry date in the map keyed by vehicle
    code => "map['date_in'] = event.get('Date');"
    map_action => "create"
  }
}

if [ent_sort] == "sortie" {
  aggregate {
    task_id => "%{vehicle}"
    # copy both dates onto the exit event and close the task
    code => "event.set('date_out', event.get('Date')); event.set('date_in', map['date_in']);"
    map_action => "update"
    end_of_task => true
    timeout => 5
  }
}

If you confirm this works, we can proceed further to write the Ruby code to calculate the time difference.

First, I want to say thank you so much for your help. I tested it and it works! I really appreciate it, thank you again!

Perfect, we could even drop the Date field.
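
For instance, a minimal sketch (the ruby filter further down achieves the same through its remove_field option):

mutate {
  # the original Date column is redundant once date_in/date_out exist
  remove_field => [ "Date" ]
}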

After the code above, let's add the following to convert the date strings in actual dates:

date {
  match => [ "date_in", "yyyy-MM-dd HH:mm:ss" ]
  target => "date_in"
  timezone => "Europe/Paris" # this is not mandatory, but otherwise dates are considered UTC
}
date {
  match => [ "date_out", "yyyy-MM-dd HH:mm:ss"]
  target => "date_out"
  timezone => "Europe/Paris" # this is not mandatory, but otherwise dates are considered UTC
}

Afterwards, to compute the difference:

ruby {
  code => "event.set('duration', event.get('date_out') - event.get('date_in'))"
  remove_field => [ "Date" ]
}

The result should be in seconds.
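
If you prefer minutes, for instance for a visualization, here is a minimal sketch; the duration_minutes field name is hypothetical:

ruby {
  # duration is in seconds; divide by 60.0 to get fractional minutes
  code => "event.set('duration_minutes', event.get('duration') / 60.0)"
}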

OK, there is a problem with the date format.

Hmm, your new problem is different and can be handled in Elasticsearch rather than in Logstash (well, we could do it in Logstash, but it is not necessary).

Really, I'm sorry if I'm bothering you. If I understood correctly, we can do that either in Elasticsearch or in Logstash. How can I correct this code in Logstash, or do you mean using a query in Elasticsearch?

I've managed to test this locally.

Replace the generator input with your existing CSV file.

Logstash pipeline

input {
  generator {
    lines => [
      'ent_sort,Date,vehicle',
      'enter,2020-04-28 05:04:02,c1',
      'sortie,2020-04-28 10:04:02,c1',
      'enter,2020-04-28 05:04:02,c3',
      'sortie,2020-04-28 10:04:02,c2',
      'enter,2020-04-28 05:04:02,c2',
      'sortie,2020-04-28 12:04:02,c3'
    ]
    count => 1
  }
}

filter {
  csv {
    autodetect_column_names => true
    source => "message"
  }
  mutate {
    # the sample data has spaces around the separators
    strip => ["Date", "vehicle", "ent_sort"]
  }
  if [ent_sort] == "enter" {
    aggregate {
      task_id => "%{vehicle}"
      # store the entry date and drop the "enter" event
      code => "map['date_in'] = event.get('Date'); event.cancel()"
      map_action => "create"
    }
  }
  if [ent_sort] == "sortie" {
    aggregate {
      task_id => "%{vehicle}"
      # enrich the "sortie" event with both dates and close the task
      code => "event.set('date_out', event.get('Date')); event.set('date_in', map['date_in']);"
      map_action => "update"
      end_of_task => true
      timeout => 5
    }
  }
  date {
    match => [ "date_in", "yyyy-MM-dd HH:mm:ss" ]
    target => "date_in"
    timezone => "Europe/Paris" # not mandatory, but otherwise dates are considered UTC
  }
  date {
    match => [ "date_out", "yyyy-MM-dd HH:mm:ss" ]
    target => "date_out"
    timezone => "Europe/Paris" # not mandatory, but otherwise dates are considered UTC
  }
  ruby {
    code => "event.set('duration', event.get('date_out') - event.get('date_in'))"
    remove_field => [ "Date", "message", "sequence", "@timestamp", "host" ]
  }
}

To be run with pipeline.workers set to 1.
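
For instance, assuming the pipeline above is saved as pipeline.conf (a hypothetical file name):

bin/logstash -f pipeline.conf -w 1

The -w 1 flag is equivalent to setting pipeline.workers: 1 in logstash.yml.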

You will end up with these documents:

{
      "@version" => "1",
      "ent_sort" => "sortie",
      "date_out" => 2020-04-28T08:04:02.000Z,
      "duration" => 18000.0,
       "date_in" => 2020-04-28T03:04:02.000Z,
       "vehicle" => "c1"
}
{
      "@version" => "1",
      "ent_sort" => "sortie",
      "date_out" => 2020-04-28T08:04:02.000Z,
      "duration" => 18000.0,
       "date_in" => 2020-04-28T03:04:02.000Z,
       "vehicle" => "c2"
}
{
      "@version" => "1",
      "ent_sort" => "sortie",
      "date_out" => 2020-04-28T10:04:02.000Z,
      "duration" => 25200.0,
       "date_in" => 2020-04-28T03:04:02.000Z,
       "vehicle" => "c3"
}

Kibana visualization

You can use TSVB (Time Series Visual Builder) to display:

  • The total number of minutes per car
  • The number of parking events per car

Replace the generator input with your file input @Emna1.
I used a generator just to test this.

Yes, I've shared above how to calculate:

  • the number of times a vehicle enters and leaves
  • the total duration per vehicle

It can be done in Elasticsearch if you build the visualization I've shared in the screenshot.
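
If you prefer to query these numbers directly rather than through a visualization, here is a minimal sketch of the Elasticsearch side; it assumes the index is named vehicle (as used later in this thread) and that the vehicle field has the default keyword sub-field:

GET vehicle/_search
{
  "size": 0,
  "aggs": {
    "per_vehicle": {
      "terms": { "field": "vehicle.keyword" },
      "aggs": {
        "total_duration_seconds": { "sum": { "field": "duration" } }
      }
    }
  }
}

Each bucket's doc_count gives the number of enter/leave pairs for that vehicle, and total_duration_seconds its total duration.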

OK, thank you so much. I will try to create this visualization immediately. I really appreciate your help @Luca_Belluccini, thank you again!


No problem!

Did you encounter any problems building the visualization above?

Do you have the CSV file name on each document in Elasticsearch?

If you have it, it should be possible to do what you need.

Can you execute the command GET nameoftheindex/_search and share the response here?

Hello @Emna1

The resulting document shows there are still problems in the pipeline.

Would it be possible to share the full pipeline you are using in Logstash?

Regarding computing a sum of sums in Logstash: it is not possible unless we use the pipeline-to-pipeline pattern.
You cannot have multiple aggregate filters that push their map as an event on timeout (which is what implementing this would require) in the same pipeline.

If we manage to write to Elasticsearch the duration of every enter/exit of a car, the sum per file and total sum per car are feasible.
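
To illustrate the pipeline-to-pipeline pattern, here is a minimal sketch with hypothetical pipeline ids and file names: pipelines.yml declares two single-worker pipelines; the first computes the per-parking durations and forwards its events, and the second is then free to run its own aggregate filter over them.

# pipelines.yml (hypothetical ids and paths)
- pipeline.id: durations
  pipeline.workers: 1
  path.config: "durations.conf"
- pipeline.id: totals
  pipeline.workers: 1
  path.config: "totals.conf"

# at the end of durations.conf, forward events to the second pipeline
output {
  pipeline { send_to => ["totals"] }
}

# at the top of totals.conf, receive them
input {
  pipeline { address => "totals" }
}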

"if we use the pipeline to pipeline pattern.
If we manage to write to Elasticsearch the duration of every enter/exit of a car, the sum per file and total sum per car are feasible."

Can you explain more, please?

Using Logstash & Elasticsearch

Input files

cars.csv

ent_sort,Date,vehicle,
enter,2020-04-28 05:04:02,c1
sortie,2020-04-28 10:04:02,c1
enter,2020-04-28 05:04:02,c3
sortie,2020-04-28 10:04:02,c2
enter,2020-04-28 05:04:02,c2
sortie,2020-04-28 12:04:02,c3

cars - Copia.csv

ent_sort,Date,vehicle,
enter,2020-04-29 05:04:02,c1
sortie,2020-04-29 10:04:02,c1
enter,2020-04-29 05:04:02,c4
sortie,2020-04-29 10:04:02,c5
enter,2020-04-29 05:04:02,c5
sortie,2020-04-29 12:04:02,c4

cars - Copia - Copia.csv

ent_sort,Date,vehicle,
enter,2020-04-30 05:04:02,c1
sortie,2020-04-30 10:04:02,c1
enter,2020-04-30 05:04:02,c4
sortie,2020-04-30 10:04:02,c5
enter,2020-04-30 05:04:02,c5
sortie,2020-04-30 12:04:02,c4

Logstash pipeline

input {
  file {
    path => "Z:/Downloads/logstash-7.6.0/bin/*.csv"
    sincedb_path => "NUL"
    start_position => "beginning"
  }
}

filter {
  csv {
    autodetect_column_names => true
    source => "message"
    skip_header => true
  }
  mutate {
    strip => ["Date", "vehicle", "ent_sort"]
    # the aggregation key combines the file path and the vehicle
    add_field => { "key" => "%{path}%{vehicle}" }
  }
  if [ent_sort] == "enter" {
    aggregate {
      task_id => "%{key}"
      # store the entry date and drop the "enter" event
      code => "map['date_in'] = event.get('Date'); event.cancel()"
      map_action => "create"
    }
  }
  if [ent_sort] == "sortie" {
    aggregate {
      task_id => "%{key}"
      # enrich the "sortie" event with both dates and close the task
      code => "event.set('date_out', event.get('Date')); event.set('date_in', map['date_in']);"
      map_action => "update"
      end_of_task => true
      timeout => 5
    }
  }
  date {
    match => [ "date_in", "yyyy-MM-dd HH:mm:ss" ]
    target => "date_in"
    timezone => "Europe/Paris" # not mandatory, but otherwise dates are considered UTC
  }
  date {
    match => [ "date_out", "yyyy-MM-dd HH:mm:ss" ]
    target => "date_out"
    timezone => "Europe/Paris" # not mandatory, but otherwise dates are considered UTC
  }
  ruby {
    code => "event.set('duration', event.get('date_out') - event.get('date_in'))"
    remove_field => [ "Date", "message", "sequence", "ent_sort", "key", "@version", "@timestamp" ]
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => [ "https:/..." ]
    index => "vehicle"
    ilm_enabled => false
    user => "elastic"
    password => "..."
  }
}

Delete the destination index vehicle prior to running Logstash.

It must be run with pipeline.workers set to 1.

Kibana visualizations

All the following visualizations are Table visualizations:

  • Sum of duration per file
  • Sum of duration per file and per vehicle
  • Sum of duration per vehicle, overall
  • Number of parking events per vehicle, overall
  • Number of parking events per file
  • Number of parking events per file and per vehicle

Logstash-only solution

input {
  file {
    path => "Z:/Downloads/logstash-7.6.0/bin/*.csv"
    sincedb_path => "NUL"
    start_position => "beginning"
  }
}

filter {
  csv {
    autodetect_column_names => true
    source => "message"
    skip_header => true
  }
  mutate {
    strip => ["Date", "vehicle", "ent_sort"]
    # per-file, per-vehicle key for the first aggregation
    add_field => { "key" => "%{path}%{vehicle}" }
    # single shared key so the final aggregation sees every event
    add_field => { "globalkey" => "globalkey" }
  }
  if [ent_sort] == "enter" {
    aggregate {
      task_id => "%{key}"
      code => "map['date_in'] = event.get('Date'); event.cancel()"
      map_action => "create"
    }
  }
  if [ent_sort] == "sortie" {
    aggregate {
      task_id => "%{key}"
      code => "event.set('date_out', event.get('Date')); event.set('date_in', map['date_in']);"
      map_action => "update"
      end_of_task => true
      timeout => 5
    }
  }
  date {
    match => [ "date_in", "yyyy-MM-dd HH:mm:ss" ]
    target => "date_in"
    timezone => "Europe/Paris" # not mandatory, but otherwise dates are considered UTC
  }
  date {
    match => [ "date_out", "yyyy-MM-dd HH:mm:ss" ]
    target => "date_out"
    timezone => "Europe/Paris" # not mandatory, but otherwise dates are considered UTC
  }
  ruby {
    code => "event.set('duration', event.get('date_out') - event.get('date_in'))"
    remove_field => [ "Date", "message", "sequence", "ent_sort", "@version", "@timestamp" ]
  }
  aggregate {
    task_id => "%{globalkey}"
    code => "
      file = event.get('path')
      vehicle = event.get('vehicle')
      duration = event.get('duration')
      # accumulate the grand total across all files
      map['totalDurationAllFiles'] ||= 0
      map['totalDurationAllFiles'] += duration
      # accumulate per file and per vehicle
      map[file] = {} unless map.has_key?(file)
      map[file][vehicle] ||= 0
      map[file][vehicle] += duration
      map[file]['totalDurationPerFile'] ||= 0
      map[file]['totalDurationPerFile'] += duration
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 30
  }
  mutate {
    remove_field => [ "@version", "@timestamp" ]
  }
}

output {
  stdout { codec => rubydebug }
}

Result (in stdout):

{
            "Z:/Downloads/logstash-7.6.0/bin/cars - Copia.csv" => {
                          "c4" => 25200.0,
                          "c1" => 18000.0,
        "totalDurationPerFile" => 61200.0,
                          "c5" => 18000.0
    },
    "Z:/Downloads/logstash-7.6.0/bin/cars - Copia - Copia.csv" => {
                          "c4" => 25200.0,
                          "c1" => 18000.0,
        "totalDurationPerFile" => 61200.0,
                          "c5" => 18000.0
    },
                    "Z:/Downloads/logstash-7.6.0/bin/cars.csv" => {
                          "c2" => 18000.0,
                          "c1" => 18000.0,
        "totalDurationPerFile" => 61200.0,
                          "c3" => 25200.0
    },
                                       "totalDurationAllFiles" => 183600.0
}
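
As a sanity check on these numbers: each file contains three parkings of 18000.0, 18000.0 and 25200.0 seconds, so 18000.0 + 18000.0 + 25200.0 = 61200.0 per file, and 3 × 61200.0 = 183600.0 seconds across all files.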
