Filebeat modules vs filestream input

That would explain most of it... :slight_smile:

We can fix timezones later...

I still do not see the user getting parsed ... show me everything

right but how sure of the logic are you :wink:

Show me everything.... (anonymized) I may not get back to this until later

Hi @stephenb,

I thought I would be able to add attachments to a reply, but it seems I can't.

I've uploaded the files into my Onedrive.

Here is the link (read only, but you should be able to download them):
https://usherbrooke-my.sharepoint.com/:f:/g/personal/quiy2001_usherbrooke_ca/EsdHUJkmQ-VInIbPnoXjLzkBVLgcpM-MeLIeSewhW2WiDA?e=ROCKvU

When looking at the 10-pipeline.conf file, the logic for this particular data starts at line 535. Everything above that is not related to this data.

Please let me know whether that works or not.

Thanks
Yanick

So I just ran

fsci-secure.log -> Filebeat (Your System Module with Auth) -> Logstash (your conf) -> Elasticsearch

To be transparent, in your Logstash conf I commented out the APACHE processing (because you must have custom patterns somewhere) and I commented out the file output section that writes to the root directory.

But the output section looks like this... which is the most important part.
#SC is StephenComment

if "fileonly" not in [tags] {
    stdout {}
    elasticsearch {
      hosts => [ "http://localhost:9200" ]
#SC      hosts => [ "${coordinator_node_0}", "${coordinator_node_1}" ]
#SC      ssl => true
#SC      cacert => "/etc/logstash/certs/UdeS-Chain-Base64.crt"
#SC      sniffing => false

      manage_template => false

      #index => "%{[@metadata][target_index]}-%{+YYYY.MM.dd}"
      index => "%{[@metadata][target_index]}-%{[@metadata][index_time]}"
      pipeline => "%{[@metadata][target_pipeline]}"
      #pipeline => "filebeat-8.6.2-system-auth-pipeline"

#SC        user => "logstash_user"
#SC        password => "${logstash_user_pwd}"
    }
  }
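
For context, the index and pipeline in that output are driven by [@metadata] fields, so somewhere earlier in the filter section something has to populate them. This is a purely hypothetical sketch (the index and pipeline names are the ones from this thread; the conditional and placement are made up):

filter {
  # Hypothetical sketch only: populate the [@metadata] fields that the
  # elasticsearch output above relies on.
  if "auth" in [tags] {
    mutate {
      add_field => {
        "[@metadata][target_index]"    => "filebeat-auth-8.6.2-sys-linux"
        "[@metadata][target_pipeline]" => "filebeat-8.6.2-system-auth-pipeline"
      }
    }
  }
}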

Here is what I found... first you need to set this setting (there is a really long conversation hidden here), otherwise Logstash will create an event.original field, which can mess up the ingest pipeline parsing.

Set this in your logstash.yml:
pipeline.ecs_compatibility: disabled

Even after doing all this, only some of the users are parsed, and they are probably not the ones you think.

I made sure all 10 log lines were unique with times and process IDs. Here is my test set:

Sep 14 18:00:11 dinf-miro sshd[121966]: Received disconnect from 10.3.1.2 port 33678:11: disconnected by user
Sep 14 18:00:12 dinf-miro sshd[121967]: Disconnected from user myuser 10.3.1.2 port 33678
Sep 14 18:00:13 dinf-miro sshd[121888]: pam_unix(sshd:session): session closed for user myuser
Sep 14 18:00:14 dinf-miro systemd-logind[1091]: Session 142 logged out. Waiting for processes to exit.
Sep 14 18:00:15 dinf-miro systemd-logind[1091]: Removed session 142.
Sep 14 18:00:16 dinf-miro sshd[122208]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=10.3.1.2  user=myuser
Sep 14 18:00:17 dinf-miro sshd[122209]: pam_sss(sshd:auth): authentication success; logname= uid=0 euid=0 tty=ssh ruser= rhost=10.3.1.2 user=myuser
Sep 14 18:00:18 dinf-miro sshd[122210]: Accepted password for myuser from 10.3.1.2 port 33732 ssh2
Sep 14 18:00:19 dinf-miro sshd[122211]: pam_unix(sshd:session): session opened for user myuser by (uid=0)
Sep 14 18:00:20 dinf-miro systemd-logind[1091]: New session 143 of user myuser.

Now you can see from the output that only the lines with "for myuser" get parsed and the ones with "user=myuser" do not... that is not to say you cannot parse them, just that they are not parsed by the current pipeline.

Only the logs with seconds :13, :18 and :19 get the user parsed.... not the other ones
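
If you ever do want to pull the user out of the "user=myuser" lines, a quick way to prototype a pattern is the _simulate API. This is only an illustrative sketch with a made-up grok pattern, not something the shipped pipeline does:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "description": "Hypothetical extra pattern for the PAM-style user= lines",
          "field": "message",
          "patterns": [
            "rhost=%{IPORHOST:source.ip}\\s+user=%{USERNAME:user.name}"
          ]
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "Sep 14 18:00:16 dinf-miro sshd[122208]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=10.3.1.2  user=myuser"
      }
    }
  ]
}

In the simulate response you should see source.ip and user.name extracted from that PAM failure line; something along those lines could be added (in Logstash or a custom pipeline) if you need those users.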

Now then I ran

fsci-secure.log -> Filebeat (Filestream Input with Pipeline) -> Logstash (your conf) -> Elasticsearch

Same Exact Data Set
Disabled system module

with this as the filestream input:

 # Auth LOGS
- type: filestream

  # Change to true to enable this input configuration.
  enabled: true

  pipeline: filebeat-8.6.2-system-auth-pipeline

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /Users/sbrown/workspace/sample-data/discuss/syslog-pipeline/fsci-secure.log

  index: 'filebeat-auth-8.6.2-sys-linux'

And I got the exact same result: the same records were parsed in exactly the same way...

So tell me what you can reproduce....

Hi @stephenb,

First, I already have the parameter pipeline.ecs_compatibility: disabled set in my logstash.yml file.

On each of my pipelines, I always make a copy of [message] into [event][original] using mutate.

However, I'm currently struggling with [event][original]. There is a very strange behaviour, and since it is not part of my current request, I will just replace [event][original] with [event][toto], which doesn't cause me any errors, and I can see the original message field:

  mutate {
    copy => {
      "message" => "[event][toto]"
    }
  }

filestream input:

Using this input line from the syslog server:

Sep 15 11:56:18 dinf-miro sshd[164528]: Accepted password for myuser from 10.3.1.2 port 39100 ssh2

The result after it has been ingested will be:
GET filebeat-fsci2-8.6.2-sys-linux-2023.09.15/_doc/Dh-OmYoBiKBXXA2etYxs

{
  "_index": "filebeat-fsci2-8.6.2-sys-linux-2023.09.15",
  "_id": "Dh-OmYoBiKBXXA2etYxs",
  "_version": 1,
  "_seq_no": 5858,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "agent": {
      "name": "syslogf.domain.com",
      "id": "7f080758-7e4f-4d73-9109-2a05b9e1a0b7",
      "type": "filebeat",
      "ephemeral_id": "058777ab-a825-4901-9380-8d51203b8986",
      "version": "8.6.2"
    },
    "process": {
      "name": "sshd",
      "pid": 164528
    },
    "log": {
      "file": {
        "path": "/var/log/facultes/FSCI/fsci-secure.log"
      },
      "offset": 14600
    },
    "source": {
      "port": 39100,
      "ip": "10.3.1.2"
    },
    "Logstash.Hostname": "logstash-dev03.domain.com",
    "tags": [
      "audit",
      "syslog",
      "FSCI",
      "auth",
      "fileoutput",
      "beats_input_codec_plain_applied"
    ],
    "input": {
      "type": "filestream"
    },
    "@timestamp": "2023-09-15T11:56:18.000Z",
    "system": {
      "auth": {
        "ssh": {
          "method": "password",
          "event": "Accepted"
        }
      }
    },
    "ecs": {
      "version": "8.0.0"
    },
    "related": {
      "hosts": [
        "dinf-miro"
      ],
      "ip": [
        "10.3.1.2"
      ],
      "user": [
        "myuser"
      ]
    },
    "host": {
      "hostname": "dinf-miro",
      "name": "syslogf.domain.com"
    },
    "@version": "1",
    "event": {
      "toto": "Sep 15 11:56:18 dinf-miro sshd[164528]: Accepted password for myuser from 10.3.1.2 port 39100 ssh2",
      "ingested": "2023-09-15T15:56:21.222470803Z",
      "kind": "event",
      "action": "ssh_login",
      "type": [
        "info"
      ],
      "category": [
        "authentication",
        "session"
      ],
      "outcome": "success"
    },
    "user": {
      "name": "myuser"
    }
  }
}

Note that the field [message] has been removed (not sure why). Also, note that the field [event][toto] is present and contains the original message value. We can also see that the fields have been properly parsed!

I think the issue I had before was the "- parsers" option I added to my Filebeat config file. The only issue I have now, and I don't know why, is the timezone. You can see the timezone is correct in this output:

"@timestamp": "2023-09-15T11:56:18.000Z"

But in Kibana, it is four hours behind real time:

When the - parsers option was in filebeat.yml, the timestamp was correctly parsed.

module system
With the module system enabled (and filestream disabled), using the following input line:

Sep 15 12:27:17 dinf-miro sshd[165567]: Accepted password for myuser from 10.3.1.2 port 39260 ssh2

The result after it has been ingested will be:

GET filebeat-fsci-8.6.2-sys-linux-2023.09.15/_doc/K4ermYoBiKBXXA2eD1mo
{
  "_index": "filebeat-fsci-8.6.2-sys-linux-2023.09.15",
  "_id": "K4ermYoBiKBXXA2eD1mo",
  "_version": 1,
  "_seq_no": 3730,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "agent": {
      "name": "syslogf.sti.usherbrooke.ca",
      "id": "7f080758-7e4f-4d73-9109-2a05b9e1a0b7",
      "type": "filebeat",
      "ephemeral_id": "6bb67c55-5cc3-4514-b0f6-cdddd4528ad4",
      "version": "8.6.2"
    },
    "process": {
      "name": "sshd",
      "pid": 165567
    },
    "log": {
      "file": {
        "path": "/var/log/facultes/FSCI/fsci-secure.log"
      },
      "offset": 18261
    },
    "source": {
      "port": 39260,
      "ip": "10.32.104.212"
    },
    "fileset": {
      "name": "auth"
    },
    "Logstash.Hostname": "logstash-dev03.elk.usherbrooke.ca",
    "tags": [
      "audit",
      "syslog",
      "fileoutput",
      "beats_input_codec_plain_applied"
    ],
    "input": {
      "type": "log"
    },
    "@timestamp": "2023-09-15T12:27:17.000-04:00",
    "system": {
      "auth": {
        "ssh": {
          "method": "password",
          "event": "Accepted"
        }
      }
    },
    "ecs": {
      "version": "8.0.0"
    },
    "related": {
      "hosts": [
        "dinf-miro"
      ],
      "ip": [
        "10.3.1.2"
      ],
      "user": [
        "myuser"
      ]
    },
    "service": {
      "type": "system"
    },
    "host": {
      "hostname": "dinf-miro",
      "name": "syslogf.domain.com"
    },
    "@version": "1",
    "event": {
      "toto": "Sep 15 12:27:17 dinf-miro sshd[165567]: Accepted password for myuser from 10.3.1.2 port 39260 ssh2",
      "ingested": "2023-09-15T16:27:19.334642719Z",
      "timezone": "-04:00",
      "kind": "event",
      "module": "system",
      "action": "ssh_login",
      "type": [
        "info"
      ],
      "category": [
        "authentication",
        "session"
      ],
      "dataset": "system.auth",
      "outcome": "success"
    },
    "user": {
      "name": "myuser"
    }
  }
}

This result is almost the same as the filestream one!

I will still need to play with the Logstash config to make the parsing better. For example, solving that issue we were talking about:

Now you can see from the output that only the lines with "for myuser" get parsed and the ones with "user=myuser" do not... that is not to say you cannot parse them, just that they are not parsed by the current pipeline.

There is also one field missing from the filestream output compared to the module, but I can handle it in filebeat.yml:

    "fileset": {
      "name": "auth"
    },
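
For example, something like this under the filestream input should add it back. This is just a sketch using the add_fields processor; I have not fully tested it yet:

  # Sketch only: add the fileset.name field the module normally sets
  processors:
    - add_fields:
        target: fileset
        fields:
          name: auth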

So to sum up, I think the pipeline wasn't working well because of the - parsers option in filebeat.yml. The issue was the guy between the chair and the keyboard!

Regarding the timezone, do you have an explanation? With the system module there is a field [event][timezone] = "-04:00", which is not present with the filestream. I will try to add it to see if it changes anything.

Thank you again for all your time figuring out what's going on in this thread!

Regards,
Yanick

Hi @yquirion

I am a little confused why, after the very detailed answer I gave, you did not confirm whether you could reproduce the same results.

At this point, I am going to assume you could.

In general, if you are going to try to recreate a module with plain inputs, you will need to look at the ingest pipeline and whatever else the module is doing. You can see this under the module directory, in this case:

cd ./module/system/auth/

So first, the timezone.

This is simple: the module adds the add_locale processor, which adds the timezone, and then the pipeline uses that to adjust the syslog timestamp... The filestream input does not know anything about that. You can add the same processor yourself.

So this is the simple fix, below:

- type: filestream

  # Change to true to enable this input configuration.
  enabled: true

  pipeline: filebeat-8.6.2-system-auth-pipeline

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /Users/sbrown/workspace/sample-data/discuss/syslog-pipeline/fsci-secure.log

  index: 'filebeat-auth-8.6.2-sys-linux'

  tags: "preserve_original_event"

  processors:
  - add_locale: ~

How did I know this? I went and looked at that module in detail, from the filebeat directory. This is where the definition of the module lives; if you want a plain input to work like a module, you need to look in these directories and understand what the module is doing:

cd ./module/system/auth/
cat config/auth.yml

And look at the ingest pipeline, which, if you are going to use it "outside" the module, you really need to read and understand.
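
You can also pull the installed pipeline straight out of Elasticsearch from Dev Tools (assuming Filebeat setup has already loaded it under this name):

GET _ingest/pipeline/filebeat-8.6.2-system-auth-pipeline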

You can see here that the date processor uses the timezone:

   {
      "date": {
        "target_field": "@timestamp",
        "formats": [
          "MMM  d HH:mm:ss",
          "MMM dd HH:mm:ss",
          "ISO8601"
        ],
        "timezone": "{{{ event.timezone }}}",
        "if": "ctx.event?.timezone != null",
        "field": "system.auth.timestamp",
        "on_failure": [
          {
            "append": {
              "value": "{{{ _ingest.on_failure_message }}}",
              "field": "error.message"
            }
          }
        ]
      }
    }
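
If you want to see that conversion in isolation, here is a minimal _simulate sketch with just that date processor and a hand-built doc (the timestamp and offset are taken from your data; without event.timezone the condition is false and the syslog time is treated as UTC):

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "date": {
          "description": "Convert the syslog timestamp using the offset that add_locale provides",
          "field": "system.auth.timestamp",
          "target_field": "@timestamp",
          "formats": ["MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601"],
          "timezone": "{{{ event.timezone }}}",
          "if": "ctx.event?.timezone != null"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "event": { "timezone": "-04:00" },
        "system": { "auth": { "timestamp": "Sep 15 11:56:18" } }
      }
    }
  ]
}

With the -04:00 offset the resulting @timestamp should carry that offset (i.e. 11:56:18-04:00, which is 15:56:18 UTC), matching what you saw from the module output.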

For your next question, about the message field: again, you just need to look at the ingest pipeline and follow the logic, and/or run _simulate?verbose=true to see what is happening...

When this message is sent through the pipeline it is "fully" parsed, and thus there is no message left; all the pertinent data ends up in fields.

Sep 15 11:56:18 dinf-miro sshd[164528]: Accepted password for myuser from 10.3.1.2 port 39100 ssh2

Go look at the code and you should see...

1st) The message gets renamed to event.original
2nd) There is an initial syslog parse; the leftover content ends up in _temp.message
3rd) Then there are subsequent groks...

The first of these tries a bunch of patterns, and if nothing matches it puts _temp.message back into message.

This grok

{
      "grok": {
        "tag": "grok-specific-messages",
        "field": "_temp.message",
        "ignore_missing": true,
        "patterns": [
          "^%{DATA:system.auth.ssh.event} %{DATA:system.auth.ssh.method} for (invalid user)?%{DATA:user.name} from %{IPORHOST:source.ip} port %{NUMBER:source.port:long} ssh2(: %{GREEDYDATA:system.auth.ssh.signature})?",
          "^%{DATA:system.auth.ssh.event} user %{DATA:user.name} from %{IPORHOST:source.ip}",
          "^Did not receive identification string from %{IPORHOST:system.auth.ssh.dropped_ip}",
          "^%{DATA:user.name} :( %{DATA:system.auth.sudo.error} ;)? TTY=%{DATA:system.auth.sudo.tty} ; PWD=%{DATA:system.auth.sudo.pwd} ; USER=%{DATA:system.auth.sudo.user} ; COMMAND=%{GREEDYDATA:system.auth.sudo.command}",
          "^new group: name=%{DATA:group.name}, GID=%{NUMBER:group.id}",
          "^new user: name=%{DATA:user.name}, UID=%{NUMBER:user.id}, GID=%{NUMBER:group.id}, home=%{DATA:system.auth.useradd.home}, shell=%{DATA:system.auth.useradd.shell}$"
        ],
        "description": "Grok specific auth messages.",
        "on_failure": [
          {
            "rename": {
              "description": "Leave the unmatched content in message.",
              "field": "_temp.message",
              "target_field": "message"
            }
          }
        ]
      }
    }

But this message DOES in fact match the 2nd grok exactly, so it does not fail and rename _temp.message back into message. That is the end of the processing... it is fully parsed and there is no need for a message field.

If it had NOT matched, it would have tried the next grok, which tries to decode PAM messages; your messages do not fit that pattern.

etc.

This is why, for some of the specific messages, there is no leftover message field: the message is fully consumed.

Also if you look at the bottom of the ingest pipeline you will see...

{
      "remove": {
        "ignore_missing": true,
        "field": "event.original",
        "if": "ctx?.tags == null || !(ctx.tags.contains('preserve_original_event'))",
        "ignore_failure": true
      }
    }

So if you just add a tag to either the module or the input, that will preserve event.original:

tags: "preserve_original_event"

All your event.original issues are solved, so open up those ingest pipelines and read the code :slight_smile: the processors are executed in order.

(Note: not every pipeline is guaranteed to have that logic, so you need to look.)

Hope this helps


Hi @stephenb,

Thank you again for the detailed explanation.

On Friday, before the end of the day, I added this to filebeat.yml:

processors:
  - add_locale: ~

And the timezone issue has been solved in Kibana! It seems you found the same thing!

Also, thanks for finding the cause of the missing message field. However, I'd like to keep it. So at the beginning of my pipeline on my Logstash server I added the following:

  mutate {
    copy => {
      "message" => "[event][original]"
    }
  }

But doing this causes strange problems; on some logs, like the auth ones, I get these extra fields added:

If I use the field [event][test] instead of the ECS field [event][original], it works without those messages.

After looking at the module/system/auth/ingest/pipeline.yml file I was not sure what could be the problem, so I used _simulate like you showed me earlier (very useful, by the way).

Here is the JSON I used to make the test:

POST _ingest/pipeline/filebeat-8.6.2-system-auth-pipeline/_simulate?verbose=true
{
  "docs": [
    {
      "_source": {
        "@timestamp": "2023-09-18T20:24:52.517Z",
        "@metadata": {
          "beat": "filebeat",
          "type": "_doc",
          "version": "8.6.2",
          "pipeline": "filebeat-8.6.2-system-auth-pipeline",
          "raw_index": "filebeat-fsci-8.6.2-sys-linux"
        },
        "event": {
          "timezone": "-04:00",
          "original": "Sep 18 16:24:51 dinf-miro systemd-logind[1091]: New session 360 of user myuser."
        },
        "log": {
          "file": {
            "path": "/var/log/facultes/FSCI/fsci-secure.log"
          },
          "offset": 9370
        },
        "message": "Sep 18 16:24:51 dinf-miro systemd-logind[1091]: New session 360 of user myuser.",
        "tags": [
          "syslog",
          "FSCI",
          "auth"
        ],
        "input": {
          "type": "filestream"
        },
        "fileset": {
          "name": "auth"
        },
        "host": {
          "name": "syslogf.domain.com"
        },
        "agent": {
          "ephemeral_id": "da03a79d-3ed9-403b-bfee-7c914baf9108",
          "id": "7f080758-7e4f-4d73-9109-2a05b9e1a0b7",
          "name": "syslogf.domain.com",
          "type": "filebeat",
          "version": "8.6.2"
        },
        "ecs": {
          "version": "8.0.0"
        }
      }
    }
  ]
}

And the result was:

{
  "docs": [
    {
      "processor_results": [
        {
          "processor_type": "set",
          "status": "success",
          "doc": {
            "_index": "_index",
            "_id": "_id",
            "_version": "-3",
            "_source": {
              "input": {
                "type": "filestream"
              },
              "agent": {
                "name": "syslogf.domain.com",
                "id": "7f080758-7e4f-4d73-9109-2a05b9e1a0b7",
                "ephemeral_id": "da03a79d-3ed9-403b-bfee-7c914baf9108",
                "type": "filebeat",
                "version": "8.6.2"
              },
              "@timestamp": "2023-09-18T20:24:52.517Z",
              "ecs": {
                "version": "8.0.0"
              },
              "log": {
                "offset": 9370,
                "file": {
                  "path": "/var/log/facultes/FSCI/fsci-secure.log"
                }
              },
              "@metadata": {
                "pipeline": "filebeat-8.6.2-system-auth-pipeline",
                "beat": "filebeat",
                "raw_index": "filebeat-fsci-8.6.2-sys-linux",
                "type": "_doc",
                "version": "8.6.2"
              },
              "host": {
                "name": "syslogf.domain.com"
              },
              "event": {
                "ingested": "2023-09-18T20:50:53.770090689Z",
                "original": "Sep 18 16:24:51 dinf-miro systemd-logind[1091]: New session 360 of user myuser.",
                "timezone": "-04:00"
              },
              "message": "Sep 18 16:24:51 dinf-miro systemd-logind[1091]: New session 360 of user myuser.",
              "fileset": {
                "name": "auth"
              },
              "tags": [
                "syslog",
                "FSCI",
                "auth"
              ]
            },
            "_ingest": {
              "pipeline": "filebeat-8.6.2-system-auth-pipeline",
              "timestamp": "2023-09-18T20:50:53.770090689Z"
            }
          }
        },
        {
          "processor_type": "rename",
          "status": "skipped",
          "if": {
            "condition": "ctx.event?.original == null",
            "result": false
          }
        },
        {
          "processor_type": "grok",
          "status": "success",
          "description": "Grok the message header.",
          "tag": "grok-message-header",
          "doc": {
            "_index": "_index",
            "_id": "_id",
            "_version": "-3",
            "_source": {
              "agent": {
                "name": "syslogf.domain.com",
                "id": "7f080758-7e4f-4d73-9109-2a05b9e1a0b7",
                "ephemeral_id": "da03a79d-3ed9-403b-bfee-7c914baf9108",
                "type": "filebeat",
                "version": "8.6.2"
              },
              "process": {
                "name": "systemd-logind",
                "pid": 1091
              },
              "_temp": {
                "message": "New session 360 of user myuser."
              },
              "log": {
                "offset": 9370,
                "file": {
                  "path": "/var/log/facultes/FSCI/fsci-secure.log"
                }
              },
              "@metadata": {
                "pipeline": "filebeat-8.6.2-system-auth-pipeline",
                "beat": "filebeat",
                "raw_index": "filebeat-fsci-8.6.2-sys-linux",
                "type": "_doc",
                "version": "8.6.2"
              },
              "message": "Sep 18 16:24:51 dinf-miro systemd-logind[1091]: New session 360 of user myuser.",
              "fileset": {
                "name": "auth"
              },
              "tags": [
                "syslog",
                "FSCI",
                "auth"
              ],
              "input": {
                "type": "filestream"
              },
              "@timestamp": "2023-09-18T20:24:52.517Z",
              "system": {
                "auth": {
                  "timestamp": "Sep 18 16:24:51"
                }
              },
              "ecs": {
                "version": "8.0.0"
              },
              "host": {
                "name": "syslogf.domain.com",
                "hostname": "dinf-miro"
              },
              "event": {
                "ingested": "2023-09-18T20:50:53.770090689Z",
                "original": "Sep 18 16:24:51 dinf-miro systemd-logind[1091]: New session 360 of user myuser.",
                "timezone": "-04:00"
              }
            },
            "_ingest": {
              "pipeline": "filebeat-8.6.2-system-auth-pipeline",
              "timestamp": "2023-09-18T20:50:53.770090689Z"
            }
          }
        },
        {
          "processor_type": "grok",
          "status": "error",
          "description": "Grok specific auth messages.",
          "tag": "grok-specific-messages",
          "error": {
            "root_cause": [
              {
                "type": "illegal_argument_exception",
                "reason": "Provided Grok expressions do not match field value: [New session 360 of user myuser.]"
              }
            ],
            "type": "illegal_argument_exception",
            "reason": "Provided Grok expressions do not match field value: [New session 360 of user myuser.]"
          }
        },
        {
          "processor_type": "rename",
          "status": "error",
          "description": "Leave the unmatched content in message.",
          "error": {
            "root_cause": [
              {
                "type": "illegal_argument_exception",
                "reason": "field [message] already exists"
              }
            ],
            "type": "illegal_argument_exception",
            "reason": "field [message] already exists"
          }
        },
        {
          "processor_type": "set",
          "status": "success",
          "doc": {
            "_index": "_index",
            "_id": "_id",
            "_version": "-3",
            "_source": {
              "agent": {
                "name": "syslogf.domain.com",
                "id": "7f080758-7e4f-4d73-9109-2a05b9e1a0b7",
                "ephemeral_id": "da03a79d-3ed9-403b-bfee-7c914baf9108",
                "type": "filebeat",
                "version": "8.6.2"
              },
              "process": {
                "name": "systemd-logind",
                "pid": 1091
              },
              "_temp": {
                "message": "New session 360 of user myuser."
              },
              "log": {
                "offset": 9370,
                "file": {
                  "path": "/var/log/facultes/FSCI/fsci-secure.log"
                }
              },
              "@metadata": {
                "pipeline": "filebeat-8.6.2-system-auth-pipeline",
                "beat": "filebeat",
                "raw_index": "filebeat-fsci-8.6.2-sys-linux",
                "type": "_doc",
                "version": "8.6.2"
              },
              "message": "Sep 18 16:24:51 dinf-miro systemd-logind[1091]: New session 360 of user myuser.",
              "fileset": {
                "name": "auth"
              },
              "error": {
                "message": "field [message] already exists"
              },
              "tags": [
                "syslog",
                "FSCI",
                "auth"
              ],
              "input": {
                "type": "filestream"
              },
              "@timestamp": "2023-09-18T20:24:52.517Z",
              "system": {
                "auth": {
                  "timestamp": "Sep 18 16:24:51"
                }
              },
              "ecs": {
                "version": "8.0.0"
              },
              "host": {
                "name": "syslogf.domain.com",
                "hostname": "dinf-miro"
              },
              "event": {
                "ingested": "2023-09-18T20:50:53.770090689Z",
                "original": "Sep 18 16:24:51 dinf-miro systemd-logind[1091]: New session 360 of user myuser.",
                "timezone": "-04:00"
              }
            },
            "_ingest": {
              "pipeline": "filebeat-8.6.2-system-auth-pipeline",
              "on_failure_pipeline": "filebeat-8.6.2-system-auth-pipeline",
              "on_failure_message": "field [message] already exists",
              "on_failure_processor_tag": null,
              "timestamp": "2023-09-18T20:50:53.770090689Z",
              "on_failure_processor_type": "rename"
            }
          }
        }
      ]
    }
  ]
}

From that result, we can see several errors. Some of them occur because the grok patterns don't match the input field value:

        {
          "processor_type": "grok",
          "status": "error",
          "description": "Grok specific auth messages.",
          "tag": "grok-specific-messages",
          "error": {
            "root_cause": [
              {
                "type": "illegal_argument_exception",
                "reason": "Provided Grok expressions do not match field value: [New session 360 of user myuser.]"
              }
            ],
            "type": "illegal_argument_exception",
            "reason": "Provided Grok expressions do not match field value: [New session 360 of user myuser.]"
          }
        },

Then, still from that grok processor, on failure it tries to rename the field _temp.message (which was created by the previous grok) to message, but message already exists.

        {
          "processor_type": "rename",
          "status": "error",
          "description": "Leave the unmatched content in message.",
          "error": {
            "root_cause": [
              {
                "type": "illegal_argument_exception",
                "reason": "field [message] already exists"
              }
            ],
            "type": "illegal_argument_exception",
            "reason": "field [message] already exists"
          }
        },

Here is the code from pipeline.yml:

  - grok:
      description: Grok specific auth messages.
      tag: grok-specific-messages
      field: _temp.message
      ignore_missing: true
      patterns:
        - '^%{DATA:system.auth.ssh.event} %{DATA:system.auth.ssh.method} for (invalid user)?%{DATA:user.name} from %{IPORHOST:source.ip} port %{NUMBER:source.port:long} ssh2(: %{GREEDYDATA:system.auth.ssh.signature})?'
        - '^%{DATA:system.auth.ssh.event} user %{DATA:user.name} from %{IPORHOST:source.ip}'
        - '^Did not receive identification string from %{IPORHOST:system.auth.ssh.dropped_ip}'
        - '^%{DATA:user.name} :( %{DATA:system.auth.sudo.error} ;)? TTY=%{DATA:system.auth.sudo.tty} ; PWD=%{DATA:system.auth.sudo.pwd} ; USER=%{DATA:system.auth.sudo.user} ; COMMAND=%{GREEDYDATA:system.auth.sudo.command}'
        - '^new group: name=%{DATA:group.name}, GID=%{NUMBER:group.id}'
        - '^new user: name=%{DATA:user.name}, UID=%{NUMBER:user.id}, GID=%{NUMBER:group.id}, home=%{DATA:system.auth.useradd.home}, shell=%{DATA:system.auth.useradd.shell}$'
      on_failure:
        - rename:
            description: Leave the unmatched content in message.
            field: _temp.message
            target_field: message

So this is the reason why I got those two extra fields in Kibana.

I tried another test. From the input I used for the ingest pipeline simulation, I removed the [event][original] that had been added by the Logstash pipeline:

        "event": {
          "timezone": "-04:00",
        },

And the result is different:

I still have the GROK parsing error:

        {
          "processor_type": "grok",
          "status": "error",
          "description": "Grok specific auth messages.",
          "tag": "grok-specific-messages",
          "error": {
            "root_cause": [
              {
                "type": "illegal_argument_exception",
                "reason": "Provided Grok expressions do not match field value: [New session 360 of user myuser.]"
              }
            ],
            "type": "illegal_argument_exception",
            "reason": "Provided Grok expressions do not match field value: [New session 360 of user myuser.]"
          }
        },

But the rename is working:

        {
          "processor_type": "rename",
          "status": "success",
          "description": "Leave the unmatched content in message.",
          "doc": {
            "_index": "_index",
            "_id": "_id",
            "_version": "-3",
            "_source": {
              [...]
              "message": "New session 360 of user myuser.",

Later there are two GROK parsing errors, but they end with a status of error_ignored:

          "processor_type": "grok",
          "status": "error_ignored",
          "description": "Grok usernames from PAM messages.",
          "tag": "grok-pam-users",
          "if": {
            "condition": "ctx.message != null && ctx.message != \"\"",
            "result": true
          },
          "ignored_error": {
            "error": {
              "root_cause": [
                {
                  "type": "illegal_argument_exception",
                  "reason": "Provided Grok expressions do not match field value: [New session 360 of user myuser.]"
                }
              ],
              "type": "illegal_argument_exception",
              "reason": "Provided Grok expressions do not match field value: [New session 360 of user myuser.]"
            }
          },

But in that case, just because I removed the [event][original] field, I don't have the two extra fields in Kibana and everything looks fine.

So I will try to use another field name to keep the original message in, but that won't be the ECS standard.

Can you test on your end to see if you get the same result?

Sorry again for that long reply!

Thanks
Yanick

Hi again @stephenb,

I just found out something else.

When adding the [event][original] field from Logstash and simulating a server logon to generate some logs, most of the events were running into a GROK match failure. But one of them was working well:

POST _ingest/pipeline/filebeat-8.6.2-system-auth-pipeline/_simulate?verbose=true
{
  "docs": [
    {
      "_source": {
        "@timestamp": "2023-09-18T20:24:30.514Z",
        "@metadata": {
          "beat": "filebeat",
          "type": "_doc",
          "version": "8.6.2",
          "pipeline": "filebeat-8.6.2-system-auth-pipeline",
          "raw_index": "filebeat-fsci-8.6.2-sys-linux"
        },
        "log": {
          "offset": 3441,
          "file": {
            "path": "/var/log/facultes/FSCI/fsci-secure.log"
          }
        },
        "tags": [
          "syslog",
          "FSCI",
          "auth",
          "preserve_original_event"
        ],
        "input": {
          "type": "filestream"
        },
        "ecs": {
          "version": "8.0.0"
        },
        "event": {
          "timezone": "-04:00"
        },
        "message": "Sep 18 08:29:17 dinf-miro sshd[289831]: Accepted password for myuser from 10.3.1.2 port 34480 ssh2",
        "fileset": {
          "name": "auth"
        },
        "agent": {
          "type": "filebeat",
          "version": "8.6.2",
          "ephemeral_id": "da03a79d-3ed9-403b-bfee-7c914baf9108",
          "id": "7f080758-7e4f-4d73-9109-2a05b9e1a0b7",
          "name": "syslogf.domain.com"
        },
        "host": {
          "name": "syslogf.domain.com"
        }
      }
    }
  ]
}

When running this through the ingest pipeline simulator, there was no error, except that my [event][original] was removed.

Looking at the Dev Tools output, I saw this:

        {
          "processor_type": "remove",
          "status": "success",
          "if": {
            "condition": "ctx?.tags == null || !(ctx.tags.contains('preserve_original_event'))",
            "result": true
          },

So I added "preserve_original_event" to the tags in filebeat.yml and it solved the issue:

        {
          "processor_type": "remove",
          "status": "skipped",
          "if": {
            "condition": "ctx?.tags == null || !(ctx.tags.contains('preserve_original_event'))",
            "result": false
          }
        }

From the pipeline code:

  - remove:
      field: event.original
      if: "ctx?.tags == null || !(ctx.tags.contains('preserve_original_event'))"
      ignore_failure: true
      ignore_missing: true

Just wondering why they are doing that! :thinking:

Thanks :grinning:

Because, at scale, storing and indexing event.original can add significant storage requirements, so we want to provide the option; for well-parsed data it may not provide much additional value.

We see this particularly with structured security events at scale.


Hi @stephenb,

Sorry for the late reply!

I just want to thank you for your help in this topic! I was able to make it work and the user is happy with the result!

Thanks again!
Yanick


This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.