Syslog Ingest Pipeline not targeting data

We're trying to use ingest pipelines for some of our Filebeat data, but the pipeline doesn't seem to be processing any events. We've run the pattern through the grok debugger and it produces the correct output, so I'm not sure what else to try. At this time we're trying to extract the eps values from the message field of the raw syslog events. If you need any more information, let me know and I'll post it. Any help would be appreciated.

Filebeat Config:

filebeat.inputs:
- type: syslog
  enabled: true
  format: auto
  protocol.tcp:
    host: "0.0.0.0:5000"

setup.template.name: "syslog-testing"
setup.template.pattern: "syslog-testing*"

output.elasticsearch:
  hosts: ["https://abc.gov:9243"]
  index: "syslog-testing"
  username: "<redacted>"
  password: "<redacted>"
  pipeline: "qradar-eps"

Pipeline:

{
  "qradar-eps": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            """(%{NUMBER:time1}s: %{NUMBER:eps1} eps), (%{NUMBER:time2}s: %{NUMBER:eps2} eps), (%{NUMBER:time3}s: %{NUMBER:eps3} eps), (%{NUMBER:time4}s: %{NUMBER:eps4} eps), (%{NUMBER:time5}s: %{NUMBER:eps5} eps), (%{NUMBER:time6}s: %{NUMBER:eps6} eps), (%{NUMBER:time7}s: %{NUMBER:eps7} eps)"""
          ]
        }
      }
    ]
  }
}
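
To sanity-check the pipeline outside of Filebeat, it can be run against a sample document with the Simulate API. A minimal sketch, using the eps portion of the message from the example log below:

POST _ingest/pipeline/qradar-eps/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "Incoming raw event rate (5s: 0.40 eps), (10s: 0.40 eps), (15s: 0.40 eps), (30s: 0.40 eps), (60s: 0.47 eps), (300s: 0.48 eps), (900s: 0.48 eps)."
      }
    }
  ]
}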

Example log:

{
  "_index": ".ds-syslog-testing-2023.05.10-000001",
  "_id": "9lgdB4gBYplAgGeV2-L4",
  "_version": 1,
  "_score": 0,
  "_source": {
    "@timestamp": "2023-05-10T19:22:56.000Z",
    "input": {
      "type": "syslog"
    },
    "ecs": {
      "version": "8.0.0"
    },
    "agent": {
      "ephemeral_id": "f6f23923-e832-459e-8824-20a0e0b79751",
      "id": "e7faf270-e51f-4fae-9a04-720d54908df8",
      "name": "filebeat.log",
      "type": "filebeat",
      "version": "8.7.1"
    },
    "hostname": "dlc-1056",
    "syslog": {
      "facility": 16,
      "facility_label": "local0",
      "priority": 134,
      "severity_label": "Informational"
    },
    "event": {
      "severity": 6
    },
    "log": {
      "source": {
        "address": "10.1.1.1:1000"
      }
    },
    "message": "abc123-log 2023-05-10 15:22:49,873 [3a01ef7b-7961-4bc9-a73a-3ab9fec2b36b/SequentialEventDispatcher] com.q1labs.sem.monitors.SourceMonitor: [INFO] [NOT:0000006000][10.1.1.1/- -] [-/- -]Incoming raw event rate (5s: 0.40 eps), (10s: 0.40 eps), (15s: 0.40 eps), (30s: 0.40 eps), (60s: 0.47 eps), (300s: 0.48 eps), (900s: 0.48 eps). Peak in the last 60s: 1.00 eps. Max Seen 18.40 eps. DLC Throttles/5s (60s: 0.00). Total DLC Throttles in the last 60s: 0. Total DLC Throttles: 0. DLC Threshold: 1.0E8. ",
    "tags": [
      "abc456_testing"
    ],
    "host": {
      "os": {
        "version": "7.9 (Maipo)",
        "family": "redhat",
        "name": "Red Hat Enterprise Linux Server",
        "kernel": "3.10.0-1160.83.1.el7.x86_64",
        "codename": "Maipo",
        "type": "linux",
        "platform": "rhel"
      },
      "id": "4ad693cbe0e9404bad767e038a80d05f",
      "containerized": false,
      "ip": [
        "10.115.59.173"
      ],
      "mac": [
        "00-50-56-A6-53-15"
      ],
      "name": "abc456",
      "hostname": "abc456",
      "architecture": "x86_64"
    }
  },
  "fields": {
    "@timestamp": [
      "2023-05-10T19:22:56.000Z"
    ]
  }
}

Here is the request and part of the response:

GET _nodes/stats/ingest?filter_path=nodes.*.ingest
          "qradar-eps": {
            "count": 0,
            "time_in_millis": 0,
            "current": 0,
            "failed": 0,
            "processors": [
              {
                "grok": {
                  "type": "grok",
                  "stats": {
                    "count": 0,
                    "time_in_millis": 0,
                    "current": 0,
                    "failed": 0
                  }
                }
              }
            ]
          }

What happens when you add an on_failure section to your ingest pipeline?

"on_failure": [
        {
          "append": {
            "field": "error.message",
            "value": "{{{_ingest.on_failure_message}}}"
          }
        }
      ]
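
For reference, that handler can live inside the grok processor itself or at the top level of the pipeline, where it catches failures from any processor. A sketch of the top-level variant, with your pattern abbreviated to a single time/eps pair for brevity:

PUT _ingest/pipeline/qradar-eps
{
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          """\(%{NUMBER:time1}s: %{NUMBER:eps1} eps\)"""
        ]
      }
    }
  ],
  "on_failure": [
    {
      "append": {
        "field": "error.message",
        "value": "{{{_ingest.on_failure_message}}}"
      }
    }
  ]
}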

No luck so far.

          "qradar-eps": {
            "count": 0,
            "time_in_millis": 0,
            "current": 0,
            "failed": 0,
            "processors": [
              {
                "grok": {
                  "type": "grok",
                  "stats": {
                    "count": 0,
                    "time_in_millis": 0,
                    "current": 0,
                    "failed": 0
                  }
                }
              }
            ]
          }
GET _ingest/pipeline/qradar-eps
{
  "qradar-eps": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            """\(%{NUMBER:time1}s: %{NUMBER:eps1} eps\), \(%{NUMBER:time2}s: %{NUMBER:eps2} eps\), \(%{NUMBER:time3}s: %{NUMBER:eps3} eps\), \(%{NUMBER:time4}s: %{NUMBER:eps4} eps\), \(%{NUMBER:time5}s: %{NUMBER:eps5} eps\), \(%{NUMBER:time6}s: %{NUMBER:eps6} eps\), \(%{NUMBER:time7}s: %{NUMBER:eps7} eps\)"""
          ],
          "on_failure": [
            {
              "append": {
                "field": "error.message",
                "value": "{{{_ingest.on_failure_message}}}"
              }
            }
          ]
        }
      }
    ]
  }
}

@Ryan_Downey I see that your grok pattern matches your example, but only the part with the EPS values, so there is no match for the entire message field.
Could you try the following grok pattern and then just ignore/drop the drop_start and drop_end fields (see the remove sketch after the pattern)?

(%{GREEDYDATA:drop_start} )\(%{NUMBER:time1}s: %{NUMBER:eps1} eps\), \(%{NUMBER:time2}s: %{NUMBER:eps2} eps\), \(%{NUMBER:time3}s: %{NUMBER:eps3} eps\), \(%{NUMBER:time4}s: %{NUMBER:eps4} eps\), \(%{NUMBER:time5}s: %{NUMBER:eps5} eps\), \(%{NUMBER:time6}s: %{NUMBER:eps6} eps\), \(%{NUMBER:time7}s: %{NUMBER:eps7} eps\)(.%{GREEDYDATA:drop_end})
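
For the drop step, a remove processor appended after the grok should do it; a minimal sketch:

{
  "remove": {
    "field": ["drop_start", "drop_end"],
    "ignore_missing": true
  }
}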

Something else: have you tried linking the ingest pipeline to the index template, so events get processed by that pipeline without you having to name it in your Filebeat output?
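
That would look something like this; a minimal sketch assuming the template name and pattern from your Filebeat config (merge it with whatever settings and mappings the existing template already carries):

PUT _index_template/syslog-testing
{
  "index_patterns": ["syslog-testing*"],
  "template": {
    "settings": {
      "index.default_pipeline": "qradar-eps"
    }
  }
}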


Anton,

Sorry for the delay, I forgot to hit the reply button. I've updated the template and the grok pattern, and the new fields are now populating with data. I appreciate the help getting this sorted out.

Ryan
