That would explain most of it...
We can fix timezones later...
I still do not see the user getting parsed ... show me everything
Right, but how sure of the logic are you?
Show me everything.... (anonymized) I may not get back to this until later
Hi @stephenb,
I thought I would be able to add attachments to a reply, but it seems I can't.
I've uploaded the files to my OneDrive.
Here is the link (read only, but you should be able to download them):
https://usherbrooke-my.sharepoint.com/:f:/g/personal/quiy2001_usherbrooke_ca/EsdHUJkmQ-VInIbPnoXjLzkBVLgcpM-MeLIeSewhW2WiDA?e=ROCKvU
When looking at the 10-pipeline.conf file, the logic for this particular data starts at line 535. Everything above that is not related to this data.
Please let me know whether that works or not.
Thanks
Yanick
So I just ran
fsci-secure.log -> Filebeat (Your System Module with Auth) -> Logstash (your conf) -> Elasticsearch
To be transparent: in your logstash conf I commented out the APACHE processing (because you must have custom patterns somewhere) and I commented out the file output section that writes to the root directory.
But the output section looks like this... which is the most important part.
(#SC = Stephen Comment)
if "fileonly" not in [tags] {
  stdout {}
  elasticsearch {
    hosts => [ "http://localhost:9200" ]
    #SC hosts => [ "${coordinator_node_0}", "${coordinator_node_1}" ]
    #SC ssl => true
    #SC cacert => "/etc/logstash/certs/UdeS-Chain-Base64.crt"
    #SC sniffing => false
    manage_template => false
    #index => "%{[@metadata][target_index]}-%{+YYYY.MM.dd}"
    index => "%{[@metadata][target_index]}-%{[@metadata][index_time]}"
    pipeline => "%{[@metadata][target_pipeline]}"
    #pipeline => "filebeat-8.6.2-system-auth-pipeline"
    #SC user => "logstash_user"
    #SC password => "${logstash_user_pwd}"
  }
}
Here is what I found... first, you need to set this setting (there is a really long conversation hidden here), otherwise logstash will create an event.original field which can mess up the ingest pipeline parsing.
Set this in your logstash.yml:
pipeline.ecs_compatibility: disabled
Even after doing all this, only some of the users are parsed, and they are probably not the ones you think.
I made sure all 10 log lines were unique with respect to time and process ids; here is my test set:
Sep 14 18:00:11 dinf-miro sshd[121966]: Received disconnect from 10.3.1.2 port 33678:11: disconnected by user
Sep 14 18:00:12 dinf-miro sshd[121967]: Disconnected from user myuser 10.3.1.2 port 33678
Sep 14 18:00:13 dinf-miro sshd[121888]: pam_unix(sshd:session): session closed for user myuser
Sep 14 18:00:14 dinf-miro systemd-logind[1091]: Session 142 logged out. Waiting for processes to exit.
Sep 14 18:00:15 dinf-miro systemd-logind[1091]: Removed session 142.
Sep 14 18:00:16 dinf-miro sshd[122208]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=10.3.1.2 user=myuser
Sep 14 18:00:17 dinf-miro sshd[122209]: pam_sss(sshd:auth): authentication success; logname= uid=0 euid=0 tty=ssh ruser= rhost=10.3.1.2 user=myuser
Sep 14 18:00:18 dinf-miro sshd[122210]: Accepted password for myuser from 10.3.1.2 port 33732 ssh2
Sep 14 18:00:19 dinf-miro sshd[122211]: pam_unix(sshd:session): session opened for user myuser by (uid=0)
Sep 14 18:00:20 dinf-miro systemd-logind[1091]: New session 143 of user myuser.
Now you can see by the output that only the lines with for myuser get parsed and the ones with user=myuser do not... that is not to say you cannot parse them, just that they are not parsed by the current pipeline.
Only the logs with seconds :13, :18 and :19 get the user parsed... not the other ones.
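If you do want those, one option is a small extra grok on the Logstash side. This is just a minimal sketch, not something from the module, and the [user][name] target is only an example:
filter {
  grok {
    # Pull the value after "user=" out of the pam_unix / pam_sss style lines
    match => { "message" => "\buser=%{USERNAME:[user][name]}" }
    # Do not tag _grokparsefailure on lines that have no user= in them
    tag_on_failure => []
  }
}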
Now then I ran
fsci-secure.log -> Filebeat (Filestream Input with Pipeline) -> Logstash (your conf) -> Elasticsearch
Same exact data set.
Disabled the system module.
With this as the filestream input:
# Auth LOGS
- type: filestream
  # Change to true to enable this input configuration.
  enabled: true
  pipeline: filebeat-8.6.2-system-auth-pipeline
  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /Users/sbrown/workspace/sample-data/discuss/syslog-pipeline/fsci-secure.log
  index: 'filebeat-auth-8.6.2-sys-linux'
And I got the exact same result; the same records were parsed in exactly the same way...
So tell me what you can reproduce....
Hi @stephenb,
First, I already have the parameter pipeline.ecs_compatibility: disabled set in my logstash.yml file.
On each of my pipelines, I always make a copy of [message] into [event][original] using mutate.
However, I'm currently struggling with [event][original]. There is a very strange behaviour, and since this is not part of my current request, I will just replace [event][original] with [event][toto], which doesn't cause me any errors, and I can see the original message field:
mutate {
  copy => {
    "message" => "[event][toto]"
  }
}
filestream input:
Using this input line from the syslog server:
Sep 15 11:56:18 dinf-miro sshd[164528]: Accepted password for myuser from 10.3.1.2 port 39100 ssh2
The result after it has been ingested will be:
GET filebeat-fsci2-8.6.2-sys-linux-2023.09.15/_doc/Dh-OmYoBiKBXXA2etYxs
{
"_index": "filebeat-fsci2-8.6.2-sys-linux-2023.09.15",
"_id": "Dh-OmYoBiKBXXA2etYxs",
"_version": 1,
"_seq_no": 5858,
"_primary_term": 1,
"found": true,
"_source": {
"agent": {
"name": "syslogf.domain.com",
"id": "7f080758-7e4f-4d73-9109-2a05b9e1a0b7",
"type": "filebeat",
"ephemeral_id": "058777ab-a825-4901-9380-8d51203b8986",
"version": "8.6.2"
},
"process": {
"name": "sshd",
"pid": 164528
},
"log": {
"file": {
"path": "/var/log/facultes/FSCI/fsci-secure.log"
},
"offset": 14600
},
"source": {
"port": 39100,
"ip": "10.3.1.2"
},
"Logstash.Hostname": "logstash-dev03.domain.com",
"tags": [
"audit",
"syslog",
"FSCI",
"auth",
"fileoutput",
"beats_input_codec_plain_applied"
],
"input": {
"type": "filestream"
},
"@timestamp": "2023-09-15T11:56:18.000Z",
"system": {
"auth": {
"ssh": {
"method": "password",
"event": "Accepted"
}
}
},
"ecs": {
"version": "8.0.0"
},
"related": {
"hosts": [
"dinf-miro"
],
"ip": [
"10.3.1.2"
],
"user": [
"myuser"
]
},
"host": {
"hostname": "dinf-miro",
"name": "syslogf.domain.com"
},
"@version": "1",
"event": {
"toto": "Sep 15 11:56:18 dinf-miro sshd[164528]: Accepted password for myuser from 10.3.1.2 port 39100 ssh2",
"ingested": "2023-09-15T15:56:21.222470803Z",
"kind": "event",
"action": "ssh_login",
"type": [
"info"
],
"category": [
"authentication",
"session"
],
"outcome": "success"
},
"user": {
"name": "myuser"
}
}
}
Note that the field [message] has been removed (not sure why). Also, note that the field [event][toto] is present and contains the value of the original message field. We can also see that the fields have been properly parsed!
I think the issue I had before was the - parsers section I added to my filebeat config file. The only issue I have now, and I don't know why, is the timezone. You can see the timezone is correct in this output:
"@timestamp": "2023-09-15T11:56:18.000Z"
But in Kibana, it is four hours behind real time.
When the - parsers option was in filebeat.yml, the timestamp was correctly parsed.
module system
With the module system enabled (and filestream disabled), using the following input line:
Sep 15 12:27:17 dinf-miro sshd[165567]: Accepted password for myuser from 10.3.1.2 port 39260 ssh2
The result after it has been ingested will be:
GET filebeat-fsci-8.6.2-sys-linux-2023.09.15/_doc/K4ermYoBiKBXXA2eD1mo
{
"_index": "filebeat-fsci-8.6.2-sys-linux-2023.09.15",
"_id": "K4ermYoBiKBXXA2eD1mo",
"_version": 1,
"_seq_no": 3730,
"_primary_term": 1,
"found": true,
"_source": {
"agent": {
"name": "syslogf.sti.usherbrooke.ca",
"id": "7f080758-7e4f-4d73-9109-2a05b9e1a0b7",
"type": "filebeat",
"ephemeral_id": "6bb67c55-5cc3-4514-b0f6-cdddd4528ad4",
"version": "8.6.2"
},
"process": {
"name": "sshd",
"pid": 165567
},
"log": {
"file": {
"path": "/var/log/facultes/FSCI/fsci-secure.log"
},
"offset": 18261
},
"source": {
"port": 39260,
"ip": "10.32.104.212"
},
"fileset": {
"name": "auth"
},
"Logstash.Hostname": "logstash-dev03.elk.usherbrooke.ca",
"tags": [
"audit",
"syslog",
"fileoutput",
"beats_input_codec_plain_applied"
],
"input": {
"type": "log"
},
"@timestamp": "2023-09-15T12:27:17.000-04:00",
"system": {
"auth": {
"ssh": {
"method": "password",
"event": "Accepted"
}
}
},
"ecs": {
"version": "8.0.0"
},
"related": {
"hosts": [
"dinf-miro"
],
"ip": [
"10.3.1.2"
],
"user": [
"myuser"
]
},
"service": {
"type": "system"
},
"host": {
"hostname": "dinf-miro",
"name": "syslogf.domain.com"
},
"@version": "1",
"event": {
"toto": "Sep 15 12:27:17 dinf-miro sshd[165567]: Accepted password for myuser from 10.3.1.2 port 39260 ssh2",
"ingested": "2023-09-15T16:27:19.334642719Z",
"timezone": "-04:00",
"kind": "event",
"module": "system",
"action": "ssh_login",
"type": [
"info"
],
"category": [
"authentication",
"session"
],
"dataset": "system.auth",
"outcome": "success"
},
"user": {
"name": "myuser"
}
}
}
This result is almost the same as with the filestream input!
I will still need to play with the logstash config to make the parsing better. For example, solving that issue we were talking about:
Now you can see by the output that only the lines with for myuser get parsed and the ones with user=myuser do not... that is not to say you cannot parse them, just that they are not parsed by the current pipeline
There is also one field missing from the filestream input compared to the module, but I can handle it in the filebeat.yml:
"fileset": {
"name": "auth"
},
So in summary, I think the pipeline wasn't working well because of the - parsers section in my filebeat.yml. The issue was the guy between the chair and the keyboard!
Regarding the timezone, do you have an explanation? From the system module there is a field [event][timezone] = "-04:00", which is not present with the filestream input. I will try to add it to see if it changes something.
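(As a quick test, I may just hard-code it in my logstash filter with something like this; the hard-coded offset is obviously not a real fix:)
mutate {
  add_field => { "[event][timezone]" => "-04:00" }
}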
Thank you again for all your time figuring out what's going on in this thread!
Regards,
Yanick
Hi @yquirion
I am a little confused why, after the very detailed answer I gave, you did not confirm whether you could reproduce the same. At this point, I am going to assume you could.
In general, if you are going to try to recreate a module with plain inputs, you will need to look at the ingest pipeline and whatever else the module is doing. You can see this under the module directory, in this case:
cd ./module/system/auth/
So first, the timezone.
This is simple: the module adds the add_locale processor, which adds the timezone, and then the pipeline uses that to adjust the syslog timestamp... the filestream input does not know anything about that.
You can do the same thing yourself; the simple fix is below.
- type: filestream
  # Change to true to enable this input configuration.
  enabled: true
  pipeline: filebeat-8.6.2-system-auth-pipeline
  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /Users/sbrown/workspace/sample-data/discuss/syslog-pipeline/fsci-secure.log
  index: 'filebeat-auth-8.6.2-sys-linux'
  tags: "preserve_original_event"
  processors:
    - add_locale: ~
How did I know this? I went and looked at that module in detail.
From the filebeat directory (this is where the module definitions live; if you want a plain input to work like a module, you need to look in these directories and understand what the module is doing):
cd ./module/system/auth/
cat config/auth.yml
And if you are going to use the ingest pipeline "outside" the module, you really need to look at it and understand it.
You can see here that the date processor uses the timezone:
{
"date": {
"target_field": "@timestamp",
"formats": [
"MMM d HH:mm:ss",
"MMM dd HH:mm:ss",
"ISO8601"
],
"timezone": "{{{ event.timezone }}}",
"if": "ctx.event?.timezone != null",
"field": "system.auth.timestamp",
"on_failure": [
{
"append": {
"value": "{{{ _ingest.on_failure_message }}}",
"field": "error.message"
}
}
]
}
}
For your next question about the message field: again, you just need to look at the ingest pipeline and follow the logic, and/or run _simulate?verbose=true to see what is happening....
When this message is sent through the pipeline it is "fully" parsed, and thus there is no message left; all the pertinent data ends up in fields.
Sep 15 11:56:18 dinf-miro sshd[164528]: Accepted password for myuser from 10.3.1.2 port 39100 ssh2
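For example, a minimal verbose simulate for that exact line looks something like this (I am only passing the raw message and the timezone here, which should be enough for this pipeline):
POST _ingest/pipeline/filebeat-8.6.2-system-auth-pipeline/_simulate?verbose=true
{
  "docs": [
    {
      "_source": {
        "event": {
          "timezone": "-04:00"
        },
        "message": "Sep 15 11:56:18 dinf-miro sshd[164528]: Accepted password for myuser from 10.3.1.2 port 39100 ssh2"
      }
    }
  ]
}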
Go look at the code and you should see...
1st) The message gets renamed to event.original
2nd) There is an initial syslog parse; the leftover content ends up in _temp.message
3rd) Then there are subsequent groks...
The first tries a bunch of combinations and, if nothing matches, it puts _temp.message back into message.
This is the grok:
{
"grok": {
"tag": "grok-specific-messages",
"field": "_temp.message",
"ignore_missing": true,
"patterns": [
"^%{DATA:system.auth.ssh.event} %{DATA:system.auth.ssh.method} for (invalid user)?%{DATA:user.name} from %{IPORHOST:source.ip} port %{NUMBER:source.port:long} ssh2(: %{GREEDYDATA:system.auth.ssh.signature})?",
"^%{DATA:system.auth.ssh.event} user %{DATA:user.name} from %{IPORHOST:source.ip}",
"^Did not receive identification string from %{IPORHOST:system.auth.ssh.dropped_ip}",
"^%{DATA:user.name} :( %{DATA:system.auth.sudo.error} ;)? TTY=%{DATA:system.auth.sudo.tty} ; PWD=%{DATA:system.auth.sudo.pwd} ; USER=%{DATA:system.auth.sudo.user} ; COMMAND=%{GREEDYDATA:system.auth.sudo.command}",
"^new group: name=%{DATA:group.name}, GID=%{NUMBER:group.id}",
"^new user: name=%{DATA:user.name}, UID=%{NUMBER:user.id}, GID=%{NUMBER:group.id}, home=%{DATA:system.auth.useradd.home}, shell=%{DATA:system.auth.useradd.shell}$"
],
"description": "Grok specific auth messages.",
"on_failure": [
{
"rename": {
"description": "Leave the unmatched content in message.",
"field": "_temp.message",
"target_field": "message"
}
}
]
}
}
But this message DOES in fact match the 2nd grok exactly, so it does not fail and put _temp.message back into message; that is the end of the processing... it is fully parsed and there is no need for a message field.
If it had NOT matched, it would have tried the next grok, which tries to decode pam messages; yours do not fit that pattern. Etc.
This is why, for some of the specific messages, there is no leftover message: it is fully consumed.
Also if you look at the bottom of the ingest pipeline you will see...
{
"remove": {
"ignore_missing": true,
"field": "event.original",
"if": "ctx?.tags == null || !(ctx.tags.contains('preserve_original_event'))",
"ignore_failure": true
}
}
So if you just add a tag to either the module or the input, that will preserve the event.original:
tags: "preserve_original_event"
All your event.original issues are solved, so open up those ingest pipelines and read the code; the processors are executed in order.
(Note: not every pipeline is guaranteed to have that logic, so you need to look.)
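A quick way to read the installed pipeline, without digging through the filebeat directory, is to pull it straight from Elasticsearch in Dev Tools:
GET _ingest/pipeline/filebeat-8.6.2-system-auth-pipeline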
Hope this helps
Hi @stephenb,
Thank you again for the detailed explanation.
On Friday, before the end of the day, I added this to the filebeat.yml:
processors:
  - add_locale: ~
And the timezone issue has been solved in Kibana! Seems you found the same!
Also, thanks for finding out the issue of the missing message field. However, I'd like to keep it, so at the beginning of my pipeline on my logstash server I added the following:
mutate {
  copy => {
    "message" => "[event][original]"
  }
}
But doing this causes me strange problems; on some logs, like the auth ones, I will have these fields added:
If I use the field [event][test] instead of the ECS field [event][original], it works without those messages.
After looking at the module/system/auth/ingest/pipeline.yml file, I was not sure what could be the problem, so I used _simulate like you showed me earlier (very useful by the way).
Here is the JSON I used to make the test:
POST _ingest/pipeline/filebeat-8.6.2-system-auth-pipeline/_simulate?verbose=true
{
"docs": [
{
"_source": {
"@timestamp": "2023-09-18T20:24:52.517Z",
"@metadata": {
"beat": "filebeat",
"type": "_doc",
"version": "8.6.2",
"pipeline": "filebeat-8.6.2-system-auth-pipeline",
"raw_index": "filebeat-fsci-8.6.2-sys-linux"
},
"event": {
"timezone": "-04:00",
"original": "Sep 18 16:24:51 dinf-miro systemd-logind[1091]: New session 360 of user myuser."
},
"log": {
"file": {
"path": "/var/log/facultes/FSCI/fsci-secure.log"
},
"offset": 9370
},
"message": "Sep 18 16:24:51 dinf-miro systemd-logind[1091]: New session 360 of user myuser.",
"tags": [
"syslog",
"FSCI",
"auth"
],
"input": {
"type": "filestream"
},
"fileset": {
"name": "auth"
},
"host": {
"name": "syslogf.domain.com"
},
"agent": {
"ephemeral_id": "da03a79d-3ed9-403b-bfee-7c914baf9108",
"id": "7f080758-7e4f-4d73-9109-2a05b9e1a0b7",
"name": "syslogf.domain.com",
"type": "filebeat",
"version": "8.6.2"
},
"ecs": {
"version": "8.0.0"
}
}
}
]
}
And the result was:
{
"docs": [
{
"processor_results": [
{
"processor_type": "set",
"status": "success",
"doc": {
"_index": "_index",
"_id": "_id",
"_version": "-3",
"_source": {
"input": {
"type": "filestream"
},
"agent": {
"name": "syslogf.domain.com",
"id": "7f080758-7e4f-4d73-9109-2a05b9e1a0b7",
"ephemeral_id": "da03a79d-3ed9-403b-bfee-7c914baf9108",
"type": "filebeat",
"version": "8.6.2"
},
"@timestamp": "2023-09-18T20:24:52.517Z",
"ecs": {
"version": "8.0.0"
},
"log": {
"offset": 9370,
"file": {
"path": "/var/log/facultes/FSCI/fsci-secure.log"
}
},
"@metadata": {
"pipeline": "filebeat-8.6.2-system-auth-pipeline",
"beat": "filebeat",
"raw_index": "filebeat-fsci-8.6.2-sys-linux",
"type": "_doc",
"version": "8.6.2"
},
"host": {
"name": "syslogf.domain.com"
},
"event": {
"ingested": "2023-09-18T20:50:53.770090689Z",
"original": "Sep 18 16:24:51 dinf-miro systemd-logind[1091]: New session 360 of user myuser.",
"timezone": "-04:00"
},
"message": "Sep 18 16:24:51 dinf-miro systemd-logind[1091]: New session 360 of user myuser.",
"fileset": {
"name": "auth"
},
"tags": [
"syslog",
"FSCI",
"auth"
]
},
"_ingest": {
"pipeline": "filebeat-8.6.2-system-auth-pipeline",
"timestamp": "2023-09-18T20:50:53.770090689Z"
}
}
},
{
"processor_type": "rename",
"status": "skipped",
"if": {
"condition": "ctx.event?.original == null",
"result": false
}
},
{
"processor_type": "grok",
"status": "success",
"description": "Grok the message header.",
"tag": "grok-message-header",
"doc": {
"_index": "_index",
"_id": "_id",
"_version": "-3",
"_source": {
"agent": {
"name": "syslogf.domain.com",
"id": "7f080758-7e4f-4d73-9109-2a05b9e1a0b7",
"ephemeral_id": "da03a79d-3ed9-403b-bfee-7c914baf9108",
"type": "filebeat",
"version": "8.6.2"
},
"process": {
"name": "systemd-logind",
"pid": 1091
},
"_temp": {
"message": "New session 360 of user myuser."
},
"log": {
"offset": 9370,
"file": {
"path": "/var/log/facultes/FSCI/fsci-secure.log"
}
},
"@metadata": {
"pipeline": "filebeat-8.6.2-system-auth-pipeline",
"beat": "filebeat",
"raw_index": "filebeat-fsci-8.6.2-sys-linux",
"type": "_doc",
"version": "8.6.2"
},
"message": "Sep 18 16:24:51 dinf-miro systemd-logind[1091]: New session 360 of user myuser.",
"fileset": {
"name": "auth"
},
"tags": [
"syslog",
"FSCI",
"auth"
],
"input": {
"type": "filestream"
},
"@timestamp": "2023-09-18T20:24:52.517Z",
"system": {
"auth": {
"timestamp": "Sep 18 16:24:51"
}
},
"ecs": {
"version": "8.0.0"
},
"host": {
"name": "syslogf.domain.com",
"hostname": "dinf-miro"
},
"event": {
"ingested": "2023-09-18T20:50:53.770090689Z",
"original": "Sep 18 16:24:51 dinf-miro systemd-logind[1091]: New session 360 of user myuser.",
"timezone": "-04:00"
}
},
"_ingest": {
"pipeline": "filebeat-8.6.2-system-auth-pipeline",
"timestamp": "2023-09-18T20:50:53.770090689Z"
}
}
},
{
"processor_type": "grok",
"status": "error",
"description": "Grok specific auth messages.",
"tag": "grok-specific-messages",
"error": {
"root_cause": [
{
"type": "illegal_argument_exception",
"reason": "Provided Grok expressions do not match field value: [New session 360 of user myuser.]"
}
],
"type": "illegal_argument_exception",
"reason": "Provided Grok expressions do not match field value: [New session 360 of user myuser.]"
}
},
{
"processor_type": "rename",
"status": "error",
"description": "Leave the unmatched content in message.",
"error": {
"root_cause": [
{
"type": "illegal_argument_exception",
"reason": "field [message] already exists"
}
],
"type": "illegal_argument_exception",
"reason": "field [message] already exists"
}
},
{
"processor_type": "set",
"status": "success",
"doc": {
"_index": "_index",
"_id": "_id",
"_version": "-3",
"_source": {
"agent": {
"name": "syslogf.domain.com",
"id": "7f080758-7e4f-4d73-9109-2a05b9e1a0b7",
"ephemeral_id": "da03a79d-3ed9-403b-bfee-7c914baf9108",
"type": "filebeat",
"version": "8.6.2"
},
"process": {
"name": "systemd-logind",
"pid": 1091
},
"_temp": {
"message": "New session 360 of user myuser."
},
"log": {
"offset": 9370,
"file": {
"path": "/var/log/facultes/FSCI/fsci-secure.log"
}
},
"@metadata": {
"pipeline": "filebeat-8.6.2-system-auth-pipeline",
"beat": "filebeat",
"raw_index": "filebeat-fsci-8.6.2-sys-linux",
"type": "_doc",
"version": "8.6.2"
},
"message": "Sep 18 16:24:51 dinf-miro systemd-logind[1091]: New session 360 of user myuser.",
"fileset": {
"name": "auth"
},
"error": {
"message": "field [message] already exists"
},
"tags": [
"syslog",
"FSCI",
"auth"
],
"input": {
"type": "filestream"
},
"@timestamp": "2023-09-18T20:24:52.517Z",
"system": {
"auth": {
"timestamp": "Sep 18 16:24:51"
}
},
"ecs": {
"version": "8.0.0"
},
"host": {
"name": "syslogf.domain.com",
"hostname": "dinf-miro"
},
"event": {
"ingested": "2023-09-18T20:50:53.770090689Z",
"original": "Sep 18 16:24:51 dinf-miro systemd-logind[1091]: New session 360 of user myuser.",
"timezone": "-04:00"
}
},
"_ingest": {
"pipeline": "filebeat-8.6.2-system-auth-pipeline",
"on_failure_pipeline": "filebeat-8.6.2-system-auth-pipeline",
"on_failure_message": "field [message] already exists",
"on_failure_processor_tag": null,
"timestamp": "2023-09-18T20:50:53.770090689Z",
"on_failure_processor_type": "rename"
}
}
}
]
}
]
}
From that result, we can see lots of errors. Some of them are because the grok patterns don't match the input data field:
{
"processor_type": "grok",
"status": "error",
"description": "Grok specific auth messages.",
"tag": "grok-specific-messages",
"error": {
"root_cause": [
{
"type": "illegal_argument_exception",
"reason": "Provided Grok expressions do not match field value: [New session 360 of user myuser.]"
}
],
"type": "illegal_argument_exception",
"reason": "Provided Grok expressions do not match field value: [New session 360 of user myuser.]"
}
},
Then, still from that grok processor, on failure it tries to rename the field _temp.message (which was created by the earlier grok) to message, but message already exists.
{
"processor_type": "rename",
"status": "error",
"description": "Leave the unmatched content in message.",
"error": {
"root_cause": [
{
"type": "illegal_argument_exception",
"reason": "field [message] already exists"
}
],
"type": "illegal_argument_exception",
"reason": "field [message] already exists"
}
},
Here is the code from the pipeline.yml:
- grok:
    description: Grok specific auth messages.
    tag: grok-specific-messages
    field: _temp.message
    ignore_missing: true
    patterns:
      - '^%{DATA:system.auth.ssh.event} %{DATA:system.auth.ssh.method} for (invalid user)?%{DATA:user.name} from %{IPORHOST:source.ip} port %{NUMBER:source.port:long} ssh2(: %{GREEDYDATA:system.auth.ssh.signature})?'
      - '^%{DATA:system.auth.ssh.event} user %{DATA:user.name} from %{IPORHOST:source.ip}'
      - '^Did not receive identification string from %{IPORHOST:system.auth.ssh.dropped_ip}'
      - '^%{DATA:user.name} :( %{DATA:system.auth.sudo.error} ;)? TTY=%{DATA:system.auth.sudo.tty} ; PWD=%{DATA:system.auth.sudo.pwd} ; USER=%{DATA:system.auth.sudo.user} ; COMMAND=%{GREEDYDATA:system.auth.sudo.command}'
      - '^new group: name=%{DATA:group.name}, GID=%{NUMBER:group.id}'
      - '^new user: name=%{DATA:user.name}, UID=%{NUMBER:user.id}, GID=%{NUMBER:group.id}, home=%{DATA:system.auth.useradd.home}, shell=%{DATA:system.auth.useradd.shell}$'
    on_failure:
      - rename:
          description: Leave the unmatched content in message.
          field: _temp.message
          target_field: message
So this is the reason why I got those two extra fields in Kibana.
I tried another test. From the input I used for the ingest pipeline simulation, I removed the [event][original] that had been added by the logstash pipeline:
"event": {
  "timezone": "-04:00"
},
And the result is different:
I still have the GROK parsing error:
{
"processor_type": "grok",
"status": "error",
"description": "Grok specific auth messages.",
"tag": "grok-specific-messages",
"error": {
"root_cause": [
{
"type": "illegal_argument_exception",
"reason": "Provided Grok expressions do not match field value: [New session 360 of user myuser.]"
}
],
"type": "illegal_argument_exception",
"reason": "Provided Grok expressions do not match field value: [New session 360 of user myuser.]"
}
},
But the rename is working:
{
"processor_type": "rename",
"status": "success",
"description": "Leave the unmatched content in message.",
"doc": {
"_index": "_index",
"_id": "_id",
"_version": "-3",
"_source": {
[...]
"message": "New session 360 of user myuser.",
Later there are two GROK parsing errors, but they end with a status of error_ignored:
"processor_type": "grok",
"status": "error_ignored",
"description": "Grok usernames from PAM messages.",
"tag": "grok-pam-users",
"if": {
"condition": "ctx.message != null && ctx.message != \"\"",
"result": true
},
"ignored_error": {
"error": {
"root_cause": [
{
"type": "illegal_argument_exception",
"reason": "Provided Grok expressions do not match field value: [New session 360 of user myuser.]"
}
],
"type": "illegal_argument_exception",
"reason": "Provided Grok expressions do not match field value: [New session 360 of user myuser.]"
}
},
But in that case, just because I removed the [event][original] field, I don't have the two extra fields in Kibana and everything looks fine.
So I will try to use another field name to keep the original message in, but that won't be the ECS standard.
Can you test at your end whether you get the same result?
Sorry again for the long reply!
Thanks
Yanick
Hi again @stephenb,
I just found out something else.
When I added the [event][original] field from logstash and simulated a server logon to generate some logs, most of the events ran into a GROK match failure. But one of them was working well:
POST _ingest/pipeline/filebeat-8.6.2-system-auth-pipeline/_simulate?verbose=true
{
"docs": [
{
"_source": {
"@timestamp": "2023-09-18T20:24:30.514Z",
"@metadata": {
"beat": "filebeat",
"type": "_doc",
"version": "8.6.2",
"pipeline": "filebeat-8.6.2-system-auth-pipeline",
"raw_index": "filebeat-fsci-8.6.2-sys-linux"
},
"log": {
"offset": 3441,
"file": {
"path": "/var/log/facultes/FSCI/fsci-secure.log"
}
},
"tags": [
"syslog",
"FSCI",
"auth",
"preserve_original_event"
],
"input": {
"type": "filestream"
},
"ecs": {
"version": "8.0.0"
},
"event": {
"timezone": "-04:00"
},
"message": "Sep 18 08:29:17 dinf-miro sshd[289831]: Accepted password for myuser from 10.3.1.2 port 34480 ssh2",
"fileset": {
"name": "auth"
},
"agent": {
"type": "filebeat",
"version": "8.6.2",
"ephemeral_id": "da03a79d-3ed9-403b-bfee-7c914baf9108",
"id": "7f080758-7e4f-4d73-9109-2a05b9e1a0b7",
"name": "syslogf.domain.com"
},
"host": {
"name": "syslogf.domain.com"
}
}
}
]
}
When running this through the ingest pipeline simulator, there was no error, except that my [event][original] was removed.
Looking at the Dev Tools output, I saw this:
{
"processor_type": "remove",
"status": "success",
"if": {
"condition": "ctx?.tags == null || !(ctx.tags.contains('preserve_original_event'))",
"result": true
},
So I added "preserve_original_event" to the tags in my filebeat.yml and it solved the issue:
{
"processor_type": "remove",
"status": "skipped",
"if": {
"condition": "ctx?.tags == null || !(ctx.tags.contains('preserve_original_event'))",
"result": false
}
}
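For reference, my filestream input in filebeat.yml now looks roughly like this (paths and options shortened):
- type: filestream
  enabled: true
  pipeline: filebeat-8.6.2-system-auth-pipeline
  paths:
    - /var/log/facultes/FSCI/fsci-secure.log
  index: 'filebeat-fsci-8.6.2-sys-linux'
  tags: ["syslog", "FSCI", "auth", "preserve_original_event"]
  processors:
    - add_locale: ~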
From the pipeline code:
- remove:
    field: event.original
    if: "ctx?.tags == null || !(ctx.tags.contains('preserve_original_event'))"
    ignore_failure: true
    ignore_missing: true
Just wondering why they are doing that!
Thanks
Because, at scale, saving and indexing event.original can add significant storage requirements, so we want to provide the option to drop it; for well-parsed data it may not provide much additional value.
We see this particularly with structured security events at scale.
Hi @stephenb,
Sorry for the late reply!
I just wanted to thank you for your help in this topic! I was able to make it work and the user is happy with the result!
Thanks again!
Yanick
This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.