Elasticsearch not parsing message string to JSON


(Chris) #1

Hi,

I am sending log messages to ES from Logstash, which are parsed by the ingest pipeline in ES. These are the syslog messages collected by Filebeat.

The 'message' value is a string:

"message": [
      "Feb 21 09:33:55 ip-172-31-29-94 sudo:     root : TTY=pts/1 ; PWD=/var/log ; USER=root ; COMMAND=/bin/@kibana-highlighted-field@echo@/kibana-highlighted-field@ Hay version 8.0!"
    ]

This string is supposed to be parsed and converted to JSON by the ingest pipeline, like this:

"system": {
      "auth": {
        "hostname": "ip-xxxxxxxxxxxx",
        "sudo": {
          "tty": "pts/1",
          "pwd": "/home/ubuntu",
          "user": "root",
          "command": "/bin/echo Hay! 8.0"
        },
        "user": "ubuntu",
        "timestamp": "Feb 19 06:22:28"
      }
    }

However, this parsing and conversion to JSON is not happening, and the message value stays as a string. Any light on this issue would be extremely helpful, as I am out of options.

Below are the grok patterns in the ingest pipeline (relevant output of GET _ingest/pipeline):

{
  "filebeat-6.6.0-system-auth-pipeline" : {
    "description" : "Pipeline for parsing system authorisation/secure logs",
    "processors" : [
      {
        "grok" : {
          "field" : "message",
          "ignore_missing" : true,
          "pattern_definitions" : {
            "GREEDYMULTILINE" : "(.|\n)*"
          },
          "patterns" : [
            """%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} sshd(?:\[%{POSINT:system.auth.pid}\])?: %{DATA:system.auth.ssh.event} %{DATA:system.auth.ssh.method} for (invalid user )?%{DATA:system.auth.user} from %{IPORHOST:system.auth.ssh.ip} port %{NUMBER:system.auth.ssh.port} ssh2(: %{GREEDYDATA:system.auth.ssh.signature})?""",
            """%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} sshd(?:\[%{POSINT:system.auth.pid}\])?: %{DATA:system.auth.ssh.event} user %{DATA:system.auth.user} from %{IPORHOST:system.auth.ssh.ip}""",
            """%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} sshd(?:\[%{POSINT:system.auth.pid}\])?: Did not receive identification string from %{IPORHOST:system.auth.ssh.dropped_ip}""",
            """%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} sudo(?:\[%{POSINT:system.auth.pid}\])?: \s*%{DATA:system.auth.user} :( %{DATA:system.auth.sudo.error} ;)? TTY=%{DATA:system.auth.sudo.tty} ; PWD=%{DATA:system.auth.sudo.pwd} ; USER=%{DATA:system.auth.sudo.user} ; COMMAND=%{GREEDYDATA:system.auth.sudo.command}""",
            """%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} groupadd(?:\[%{POSINT:system.auth.pid}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}""",
            """%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} useradd(?:\[%{POSINT:system.auth.pid}\])?: new user: name=%{DATA:system.auth.useradd.name}, UID=%{NUMBER:system.auth.useradd.uid}, GID=%{NUMBER:system.auth.useradd.gid}, home=%{DATA:system.auth.useradd.home}, shell=%{DATA:system.auth.useradd.shell}$""",
            """%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname}? %{DATA:system.auth.program}(?:\[%{POSINT:system.auth.pid}\])?: %{GREEDYMULTILINE:system.auth.message}"""
          ]
        }
      },
      {
        "remove" : {
          "field" : "message"
        }
      },
      {
        "date" : {
          "field" : "system.auth.timestamp",
          "target_field" : "@timestamp",
          "formats" : [
            "MMM  d HH:mm:ss",
            "MMM dd HH:mm:ss"
          ],
          "ignore_failure" : true
        }
      },
      {
        "geoip" : {
          "field" : "system.auth.ssh.ip",
          "target_field" : "system.auth.ssh.geoip",
          "ignore_failure" : true
        }
      }
    ],
    "on_failure" : [
      {
        "set" : {
          "field" : "error.message",
          "value" : "{{ _ingest.on_failure_message }}"
        }
      }
    ]
  }
}

Thanks,
Chris.


(Jake Landis) #2

I tried to reproduce this with the _simulate API, but the pipeline seems to work (in Kibana Console format):

POST _ingest/pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "Feb 21 09:33:55 ip-172-31-29-94 sudo:     root : TTY=pts/1 ; PWD=/var/log ; USER=root ; COMMAND=/bin/@kibana-highlighted-field@echo@/kibana-highlighted-field@ Hay version 8.0!"
      }
    }
  ],
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "ignore_missing": true,
          "pattern_definitions": {
            "GREEDYMULTILINE": "(.|\n)*"
          },
          "patterns": [
            """%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} sshd(?:\[%{POSINT:system.auth.pid}\])?: %{DATA:system.auth.ssh.event} %{DATA:system.auth.ssh.method} for (invalid user )?%{DATA:system.auth.user} from %{IPORHOST:system.auth.ssh.ip} port %{NUMBER:system.auth.ssh.port} ssh2(: %{GREEDYDATA:system.auth.ssh.signature})?""",
            """%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} sshd(?:\[%{POSINT:system.auth.pid}\])?: %{DATA:system.auth.ssh.event} user %{DATA:system.auth.user} from %{IPORHOST:system.auth.ssh.ip}""",
            """%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} sshd(?:\[%{POSINT:system.auth.pid}\])?: Did not receive identification string from %{IPORHOST:system.auth.ssh.dropped_ip}""",
            """%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} sudo(?:\[%{POSINT:system.auth.pid}\])?: \s*%{DATA:system.auth.user} :( %{DATA:system.auth.sudo.error} ;)? TTY=%{DATA:system.auth.sudo.tty} ; PWD=%{DATA:system.auth.sudo.pwd} ; USER=%{DATA:system.auth.sudo.user} ; COMMAND=%{GREEDYDATA:system.auth.sudo.command}""",
            """%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} groupadd(?:\[%{POSINT:system.auth.pid}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}""",
            """%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} useradd(?:\[%{POSINT:system.auth.pid}\])?: new user: name=%{DATA:system.auth.useradd.name}, UID=%{NUMBER:system.auth.useradd.uid}, GID=%{NUMBER:system.auth.useradd.gid}, home=%{DATA:system.auth.useradd.home}, shell=%{DATA:system.auth.useradd.shell}$""",
            """%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname}? %{DATA:system.auth.program}(?:\[%{POSINT:system.auth.pid}\])?: %{GREEDYMULTILINE:system.auth.message}"""
          ]
        }
      },
      {
        "remove": {
          "field": "message"
        }
      },
      {
        "date": {
          "field": "system.auth.timestamp",
          "target_field": "@timestamp",
          "formats": [
            "MMM  d HH:mm:ss",
            "MMM dd HH:mm:ss"
          ],
          "ignore_failure": true
        }
      },
      {
        "geoip": {
          "field": "system.auth.ssh.ip",
          "target_field": "system.auth.ssh.geoip",
          "ignore_failure": true
        }
      }
    ],
    "on_failure": [
      {
        "set": {
          "field": "error.message",
          "value": "{{ _ingest.on_failure_message }}"
        }
      }
    ]
  }
}

This results in:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "system" : {
            "auth" : {
              "hostname" : "ip-172-31-29-94",
              "sudo" : {
                "tty" : "pts/1",
                "pwd" : "/var/log",
                "user" : "root",
                "command" : "/bin/@kibana-highlighted-field@echo@/kibana-highlighted-field@ Hay version 8.0!"
              },
              "user" : "root",
              "timestamp" : "Feb 21 09:33:55"
            }
          },
          "@timestamp" : "2019-02-21T09:33:55.000Z"
        },
        "_ingest" : {
          "timestamp" : "2019-03-15T15:38:01.123254Z"
        }
      }
    }
  ]
}

Can you try that on your system? I wonder if there is something else going on here.
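
As a rough cross-check outside Elasticsearch, the sudo grok pattern from the pipeline can be approximated with a plain Python regular expression. This is only a sketch: the sub-expressions standing in for SYSLOGTIMESTAMP, SYSLOGHOST, POSINT, DATA and GREEDYDATA are simplified assumptions, not the real grok definitions, and the group names (timestamp, hostname, sudo_user, ...) are made up for illustration. It shows that the sample message has the shape the sudo pattern expects, which suggests the pattern itself is not the problem.

```python
import re

# Simplified stand-ins for the grok building blocks (assumptions, not the
# official definitions): DATA -> ".*?", GREEDYDATA -> ".*", SYSLOGHOST -> "\S+".
SUDO_RE = re.compile(
    r"(?P<timestamp>[A-Z][a-z]{2} +\d{1,2} \d{2}:\d{2}:\d{2}) "
    r"(?P<hostname>\S+) "
    r"sudo(?:\[(?P<pid>[1-9][0-9]*)\])?: "
    r"\s*(?P<user>.*?) :"
    r"(?: (?P<error>.*?) ;)?"
    r" TTY=(?P<tty>.*?) ; PWD=(?P<pwd>.*?) ; USER=(?P<sudo_user>.*?) ; "
    r"COMMAND=(?P<command>.*)"
)

# The sample message from this thread, copied verbatim (including the
# Kibana highlight markers around "echo").
message = (
    "Feb 21 09:33:55 ip-172-31-29-94 sudo:     root : TTY=pts/1 ; "
    "PWD=/var/log ; USER=root ; "
    "COMMAND=/bin/@kibana-highlighted-field@echo@/kibana-highlighted-field@ "
    "Hay version 8.0!"
)

match = SUDO_RE.match(message)
# Keep only the groups that actually took part in the match.
fields = {k: v for k, v in match.groupdict().items() if v is not None} if match else {}

for name, value in sorted(fields.items()):
    print(f"{name}: {value}")
```

If the message matches here and in _simulate but the indexed document still has an unparsed message field, the pattern is unlikely to be the cause, and it is worth checking whether the pipeline is applied at all when the documents are indexed.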


(Chris) #3

@jakelandis I tried the simulate API and, as you say, the pipeline works when I run the request you gave. Is there anything else that I can check?

Thanks,
Chris.