Hello, I would like to know whether there is any filter that can put a delay between events before sending them to Elasticsearch. For example, I have logs in Elasticsearch that are being ingested in the same second, and I would like each document to have a 30 second delay before being indexed. Is there any way to do this?
There is nothing built into Elasticsearch to delay ingest. Most folks want the exact opposite: to index events as quickly as possible.
You would need to account for this on the client / log shipping side.
There is a throttle filter in Logstash that will perhaps suit your needs.
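A rough sketch of a throttle filter, in case it helps. One caveat: as far as I know, the throttle filter does not hold events back. It only tags events that exceed a rate, so that you can drop or route them. The key field (deviceHostName), the count, the periods and the tag name below are illustrative assumptions, not values taken from a real pipeline.

```conf
filter {
  throttle {
    # Count events per host; deviceHostName is an assumed key field.
    key => "%{deviceHostName}"
    # Let the first event in each period through untagged,
    # tag every further event with the same key.
    after_count => 1
    # Length of the counting window, in seconds.
    period => "10"
    # Maximum age of a counter slot, in seconds.
    max_age => 20
    add_tag => "throttled"
  }
  # Tagged events are dropped here; they are not delayed.
  if "throttled" in [tags] {
    drop { }
  }
}
```

If the goal is to alert on every event, dropping the tagged ones is probably not what you want, so treat this mainly as an illustration of what the filter does.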
Try the sleep plugin, or you might use Ruby code and the sleep method to build something custom.
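If you go the custom route, a minimal sketch with the ruby filter could look like this. The 10 second value is only an example, and it assumes a single pipeline worker; with several workers each one sleeps independently, so events can still leave at the same time.

```conf
filter {
  ruby {
    # Block the worker thread for 10 seconds on each event (example value).
    code => "sleep(10)"
  }
}
```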
I tried using the sleep filter, but the documents were still indexed in the same second and millisecond. I believe this is interfering with my alerts: I noticed that when multiple logs arrive in the exact same second, it doesn't alert on all of them, only on one.
Hello, could you give me an example of how to use the throttle plugin to do this? If several logs arrive in exactly the same second and millisecond, I would like it to put some time between them before sending them to Elasticsearch, so that each document waits at least 10 seconds before being indexed.
How did you use the sleep filter?
Also, did you set pipeline.workers to 1? By default Logstash uses one worker per CPU core, so if your Logstash server has more than one CPU, it will process multiple events at the same time. To try to achieve what you want, you would need to use just one worker.
Try setting pipeline.workers to 1 and use the following sleep filter:
sleep {
# Sleep 10 seconds for every event.
time => "10"
}
This would make Logstash emit an event and then sleep for 10 seconds, but it can hurt your ingestion rate.
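For context, here is a minimal sketch of where that filter sits, together with the settings it relies on. Those settings live outside the pipeline file (logstash.yml, pipelines.yml, or the -w and -b command-line flags). The batch size of 1 is my own assumption rather than something confirmed here: as far as I understand, a worker filters a whole batch and then hands it to the outputs together, so with a larger batch the sleeps add up but the batch may still be indexed at once.

```conf
# Assumed settings, defined outside this file (for example in logstash.yml):
#   pipeline.workers: 1
#   pipeline.batch.size: 1
filter {
  sleep {
    # Sleep 10 seconds for every event.
    time => "10"
  }
}
```

Also keep in mind that sleeping does not change the event's @timestamp, which is assigned when the event is created unless something like a date filter overrides it.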
I'm curious, why do you want to do that? If this is affecting your alerts, you should try to fix how you are alerting, not change the source events.
My pipeline.workers is set to 3 at the moment, and I added the sleep filter the same way you mentioned. I want to do this because I'm using Elastic Security's detection and alerting feature to alert on some specific queries against an index, but some logs arrive in exactly the same second and millisecond, and only one of the logs that arrive at the same time triggers an alert. For example, 8 events arrived at the same time, but a notification was issued for only 1 of them (I'm using a webhook as the notification method). I don't know whether this is a bug where logs arriving at the same time prevent the webhook from being sent, or something I'm doing wrong.
Here are my pipeline settings at the moment.
In the pipeline I'm using 2 inputs and 2 outputs for different indices.
Can you share an example of those events and what triggers your alert?
I don't think you will be able to add a delay between your events, and you should not do that anyway: you would be changing the time of the event, which can be misleading in some cases.
That's what triggers my alert.
And here is a sample of those events:
{
"_index": "xxx-workloadsecurity-2022.09",
"_type": "_doc",
"_id": "vhH4LxxxMBGR_KuvzuGFWN",
"_version": 1,
"_score": null,
"fields": {
"deviceCustomNumber1.keyword": [
"266"
],
"deviceEventClassId.keyword": [
"4000000"
],
"cefVersion.keyword": [
"0"
],
"severity.keyword": [
"6"
],
"hostname": [
"xxxxxxxxxxxxxxx"
],
"TrendMicroDsTenantId.keyword": [
"xxxx"
],
"syslog_timestamp": [
"2022-09-12T04:24:04"
],
"deviceVersion.keyword": [
"xx.x.xxx"
],
"data_e_hora_do_evento": [
"2022-09-12T01:24:04.000Z"
],
"TrendMicroDsFileSHA1.keyword": [
"FB0F6C30839XXXXXXXXXXXXXXXXXXXx"
],
"deviceHostName": [
"example"
],
"deviceAction": [
"Quarantine"
],
"TrendMicroDsTenantId": [
"xxxxxx"
],
"result.keyword": [
"Quarantined"
],
"@version.keyword": [
"1"
],
"name.keyword": [
"TROJ_FRS.0NA103BF22"
],
"deviceProduct.keyword": [
"Deep Security Agent"
],
"deviceEventClassId": [
"4000000"
],
"tags": [
"workloadsecurity",
],
"client": [
"xxxx"
],
"port": [
xxxx
],
"filePath.keyword": [
"/local/xxxxxxxx/xxxxxxxxxxxx/cxxxxxxxx.mp4"
],
"message.keyword": [
"Realtime"
],
"cefVersion": [
"0"
],
"name": [
"TROJ_FRS.0NA103BF22"
],
"deviceCustomNumber1": [
"26"
],
"hostname.keyword": [
"xxxxxxxxxxxxxxxxxxxxxxxxx"
],
"deviceCustomNumber2": [
"3404"
],
"cliente.keyword": [
"xxxxxxxx"
],
"deviceVendor": [
"Trend Micro"
],
"tags.keyword": [
"workloadsecurity",
],
"syslog": [
"471 <134>2022-09-12T04:24:04Z xxxx"
],
"deviceCustomNumber2Label": [
"Quarantine File Size"
],
"result": [
"Quarantined"
],
"syslog_timestamp.keyword": [
"2022-09-12T04:24:04"
],
"deviceVendor.keyword": [
"Trend Micro"
],
"received_at.keyword": [
"2022-09-12T04:24:04Z"
],
"@version": [
"1"
],
"TrendMicroDsTenant.keyword": [
"599615299724"
],
"deviceProduct": [
"Deep Security Agent"
],
"deviceCustomNumber1Label": [
"Host ID"
],
"TrendMicroDsFileSHA1": [
"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
],
"deviceAction.keyword": [
"Quarantine"
],
"severity": [
"6"
],
"deviceHostName.keyword": [
"example"
],
"TrendMicroDsTenant": [
"xxxxxxxxxx"
],
"filePath": [
"/local/xxxxxxxxx/xxxxxxxx/xxx.mp4"
],
"deviceCustomNumber2Label.keyword": [
"Quarantine File Size"
],
"deviceCustomNumber1Label.keyword": [
"Host ID"
],
"deviceVersion": [
"50.0.1395"
],
"message": [
"Realtime"
],
"@timestamp": [
"2022-09-12T04:32:11.809Z"
],
"syslog.keyword": [
"471 <134>2022-09-12T04:24:04Z xxxx"
],
"received_at": [
"2022-09-12T04:24:04Z"
],
"deviceCustomNumber2.keyword": [
"3404"
]
},
"highlight": {
"deviceAction.keyword": [
"@kibana-highlighted-field@Quarantine@/kibana-highlighted-field@"
],
"deviceAction": [
"@kibana-highlighted-field@Quarantine@/kibana-highlighted-field@"
],
"deviceCustomNumber2Label": [
"@kibana-highlighted-field@Quarantine@/kibana-highlighted-field@ File Size"
]
},
"sort": [
1662957131809
]
}
And what is the action of this alert?
Did you set the Actions Frequency to "On each rule execution"?
But I'm not sure whether the Detection module will send an alert for each match. I'm not using it yet because of some limitations for my use case, so I use an external tool.
Another thing is that your filter queries for several events that may happen at the same time.
For example, the times logged by your antivirus software for Quarantine and Delete could be the same, because the precision is only in milliseconds, if I'm not wrong.
Maybe you will need to create a rule for each one of those values.
Yes, it is set to "each rule detection". I'm thinking that's the problem: it's not alerting on all matches.
About the alert schedule: it was previously every 2 minutes, and I set it to 0 seconds to see if there was any difference. Can you tell me which external tool you use at the moment?
I'm using ElastAlert2 for some alerts, but we are migrating everything to Kibana Alerts and the Detection Rules.
Since there are some limitations with both Kibana Alerts and Detection Rules, some alerts will be triggered by ElastAlert until Elastic improves the detection/alerting system.
I would suggest that you try to create a different rule for every value you are filtering.
For example a rule only for Delete, another one only for Quarantine etc.
See if it works as you expect.