I collect VPN logs through Logstash and index them in Elasticsearch, but I'm having the following problem:
For each unique VPN connection (represented by TunnelID), there should be only one tunnel-up event and one tunnel-down event. However, sometimes the firewall duplicates the logs, so it can happen that a VPN connection has more than two assigned logs, as in the example below:
@timestamp ConnectionDuration Fortinet Action TunnelID
Nov 19, 2023 @ 06:56:35.552 993 tunnel-down 759607548
Nov 19, 2023 @ 06:56:35.552 993 tunnel-down 759607548
Nov 19, 2023 @ 06:56:35.551 993 tunnel-down 759607548
Nov 19, 2023 @ 06:40:02.412 - tunnel-up 759607548
Nov 19, 2023 @ 06:40:01.417 - tunnel-up 759607548
Nov 19, 2023 @ 06:06:00.507 24410 tunnel-down 759607532
Nov 19, 2023 @ 06:06:00.507 24410 tunnel-down 759607532
Nov 18, 2023 @ 23:19:13.928 0 tunnel-up 759607532
Nov 18, 2023 @ 23:19:09.898 0 tunnel-up 759607532
Nov 18, 2023 @ 23:19:08.841 13268 tunnel-down 759607512
Nov 18, 2023 @ 23:19:08.841 13268 tunnel-down 759607512
Nov 18, 2023 @ 19:38:06.661 0 tunnel-up 759607512
Nov 18, 2023 @ 19:37:59.677 0 tunnel-up 759607512
Can I use Logstash to avoid these "duplicates" so that they reach Elasticsearch as in the example below (for each unique TunnelID, only one tunnel-up event and one tunnel-down event)?
@timestamp ConnectionDuration Fortinet Action TunnelID
Nov 19, 2023 @ 06:56:35.551 993 tunnel-down 759607548
Nov 19, 2023 @ 06:40:01.417 - tunnel-up 759607548
Nov 19, 2023 @ 06:06:00.507 24410 tunnel-down 759607532
Nov 18, 2023 @ 23:19:09.898 0 tunnel-up 759607532
Nov 18, 2023 @ 23:19:08.841 13268 tunnel-down 759607512
Nov 18, 2023 @ 19:37:59.677 0 tunnel-up 759607512
You could use a fingerprint filter to generate a document_id, so duplicated log entries will be overwritten. Combine the date, tunnel id, and action to create the fingerprint.
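A minimal sketch of that idea, assuming the action field is called tunnel_action and a local Elasticsearch output (the field names, hosts, and index are assumptions; adjust them to your pipeline):

```
filter {
  # Build a dedup key from date, tunnel id, and action, then hash it.
  # Field names are assumptions; use whatever your Fortinet parsing produces.
  mutate {
    add_field => { "[@metadata][dedup_key]" => "%{@timestamp}-%{TunnelID}-%{tunnel_action}" }
  }
  fingerprint {
    source => ["[@metadata][dedup_key]"]
    target => "[@metadata][fingerprint]"
    method => "SHA256"
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "fortinet-vpn"
    # Duplicate events get the same _id, so the later copy overwrites the earlier one.
    document_id => "%{[@metadata][fingerprint]}"
  }
}
```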
Your source in the fingerprint filter is an array; if you want to use both fields as the source for the fingerprint, you also need to set concatenate_sources to true.
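For example (a hedged sketch; the field names are assumptions): with an array source and concatenate_sources left at its default, each field is hashed separately and only the last hash ends up in the target.

```
fingerprint {
  source => ["TunnelID", "tunnel_action"]  # assumed field names
  concatenate_sources => true              # hash the concatenation of both fields
  target => "[@metadata][fingerprint]"
  method => "SHA256"
}
```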
I, like Leandro, am puzzled about why you are using aggregate, but to answer the question: when aggregate creates the event (push_map_as_event_on_timeout => true), it builds the [@timestamp] field from the map['@timestamp'] entry, and that needs to be a LogStash::Timestamp, not a string. So change the code that populates map['@timestamp'] accordingly.
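A hedged sketch of what that could look like (the task_id, timeout, and field names are assumptions, not your original pipeline): store the event's own LogStash::Timestamp object in the map rather than a formatted string.

```
aggregate {
  task_id => "%{TunnelID}"
  code => "
    # event.get('@timestamp') already returns a LogStash::Timestamp,
    # so the event pushed on timeout gets a valid @timestamp
    map['@timestamp'] ||= event.get('@timestamp')
    map['TunnelID'] ||= event.get('TunnelID')
  "
  push_map_as_event_on_timeout => true
  timeout => 3600
}
```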
It was my mistake; I was reusing a pipeline that had some similar data. The tips from the gentlemen helped me and the problem is solved, thank you very much!