Dissect Logs

I simply want to turn key:value data from a message string into fields in Elastic, so I can analyze the logs with Kibana.

I'd like to use Dissect to do the following:

Dissecting this log:

conn=10 op=221996 RESULT err=0 tag=101 nentries=1 wtime=0.000062880 optime=0.000082472 etime=0.000143846

With this Dissect Filter:

conn=%{conn} op=%{op} RESULT err=%{} tag=%{} nentries=%{} wtime=%{wtime} optime=%{optime} etime=%{etime}

Hopefully gives this result:

  • conn=10
  • op=221996
  • wtime=0.000062880
  • optime=0.000082472
  • etime=0.000143846

But how would I do this? Is there an example of this somewhere?
How do I make sure it only targets logs from specific hosts and services, and doesn't try to dissect ALL logs going into Elastic?

There is almost too much documentation. I got confused reading through the articles; they were too advanced and covered far more than I need for what I am trying to do. I just need a very simple example to start with, so I can understand the basics.

A simple example like this would be nice:
"If you place this code, in this file, and this log is ingested, then the following fields would be the result in Elastic."
I need to understand a basic use-case before I can make sense of the advanced documentation.

I'd use Grok for this in an Ingest Pipeline. You can run this in Dev Tools in Kibana to see what it does.

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "conn=%{INT:conn} op=%{INT:op} RESULT err=%{INT:err} tag=%{INT:tag} nentries=%{INT:nentries} wtime=%{NUMBER:wtime} optime=%{NUMBER:optime} etime=%{NUMBER:etime}"
          ]
        }
      },
      {
        "convert": {
          "field": "op",
          "type": "integer"
        }
      },
      {
        "convert": {
          "field": "nentries",
          "type": "integer"
        }
      },
      {
        "convert": {
          "field": "conn",
          "type": "integer"
        }
      },
      {
        "convert": {
          "field": "err",
          "type": "integer"
        }
      },
      {
        "convert": {
          "field": "tag",
          "type": "integer"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "conn=10 op=221996 RESULT err=0 tag=101 nentries=1 wtime=0.000062880 optime=0.000082472 etime=0.000143846"
      }
    }
  ]
}
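
If you would rather stick with dissect, the simulate API accepts a dissect processor the same way. This is just a rough sketch reusing your pattern and sample log, so treat it as untested:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "dissect": {
          "description": "Sketch only - same pattern as in the question",
          "field": "message",
          "pattern": "conn=%{conn} op=%{op} RESULT err=%{} tag=%{} nentries=%{} wtime=%{wtime} optime=%{optime} etime=%{etime}"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "conn=10 op=221996 RESULT err=0 tag=101 nentries=1 wtime=0.000062880 optime=0.000082472 etime=0.000143846"
      }
    }
  ]
}

Dissect only produces string fields, so you would still add convert processors for the numeric values, just like in the grok example above.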

I would actually recommend the kv filter over both dissect or grok for this use-case. It is purpose built for key-value data.

I tried KV, but the RESULT string in the middle of the message throws it off. Maybe you could remove it first and then apply KV.
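
Something like this, with a gsub processor stripping the literal RESULT before the kv processor runs, would probably do it (untested sketch):

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "gsub": {
          "description": "Drop the literal RESULT token so every remaining token is key=value",
          "field": "message",
          "pattern": " RESULT ",
          "replacement": " "
        }
      },
      {
        "kv": {
          "field": "message",
          "field_split": " ",
          "value_split": "="
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "conn=10 op=221996 RESULT err=0 tag=101 nentries=1 wtime=0.000062880 optime=0.000082472 etime=0.000143846"
      }
    }
  ]
}

If you don't want to overwrite the original message, gsub can write to a target_field and kv can read from that instead.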

How do I know if I am reading documentation about a Logstash pipeline or an Elastic pipeline?
I wasn't even aware there was any difference until just now.
I want to make a Logstash pipeline. How do I test Logstash pipelines? I think the Dev Tools console is only for Elastic pipelines.

You posted in the Elasticsearch category rather than Logstash, so I was assuming you were talking about ingest pipelines.

In Logstash you will use the Grok Filter with the same pattern I posted above.

filter {
  grok {
    match => { "message" => "conn=%{INT:conn} op=%{INT:op} RESULT err=%{INT:err} tag=%{INT:tag} nentries=%{INT:nentries} wtime=%{NUMBER:wtime} optime=%{NUMBER:optime} etime=%{NUMBER:etime}" }
  }
}
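
To come back to the earlier question about only targeting logs from certain hosts and services: in Logstash you can wrap the filter in a conditional. A rough sketch, where the host and service values are just examples you would replace with whatever fields your events actually carry:

filter {
  # Example condition only - adjust the field names and values to your data
  if [host][name] == "idm01.example.com" and [service][name] == "dirsrv" {
    grok {
      match => { "message" => "conn=%{INT:conn} op=%{INT:op} RESULT err=%{INT:err} tag=%{INT:tag} nentries=%{INT:nentries} wtime=%{NUMBER:wtime} optime=%{NUMBER:optime} etime=%{NUMBER:etime}" }
    }
  }
}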

Thanks a lot Aaron. You're a god. :heart:
I've never received quality help this fast before; that only happens when I post here.

I managed to make this ingest pipeline in Elastic Dev Tools.
But apparently my team wants me to build it as a Logstash pipeline instead of an Elastic ingest pipeline.
It kind of makes me wonder why the syntax isn't identical for Logstash and Elastic pipelines. It seems weird that they're part of the same stack and, to me at least, serve an identical purpose, yet have different syntax. That's just some user feedback, I guess.

Is there a Dev Tool like this for debugging and developing Logstash pipelines, where I can get syntax help and see the results of the filters/mutations against a few test logs?

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "IDM Server Access Logs",
    "processors": [
      {
        "grok": {
          "description": "Match Access Logs from IDM",
          "field": "message",
          "patterns": [
            "[%{SYSLOGBASE:timestamptmp}]%{SPACE}conn=%{NUMBER:conn}%{SPACE}fd=%{NUMBER:fd}%{SPACE}slot=%{NUMBER:slot}%{SPACE}connection%{SPACE}from%{SPACE}%{IP:source.ip}%{SPACE}to%{SPACE}%{IP:destination.ip}",
            "[%{SYSLOGBASE:timestamptmp}]%{SPACE}conn=%{NUMBER:conn}%{SPACE}op=%{NUMBER:op}%{SPACE}%{WORD:response}%{SPACE}%{GREEDYDATA:message}"
          ],
          "pattern_definitions": {
            "DIR_389_DATE": "%{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME}%{SPACE}%{INT}"
          }
        }
      },
      {
        "convert": {
          "field": "conn",
          "type": "integer"
        }
      },
      {
        "convert": {
          "if": "ctx?.response != null",
          "field": "op",
          "type": "integer"
        }
      },
      {
        "kv": {
          "description": "Key=Value Filter",
          "if": "ctx?.response == 'RESULT' || ctx?.response == 'BIND'",
          "field": "message",
          "field_split": " ",
          "value_split": "="
        }
      },
      {
        "grok": {
          "description": "Parsing SRCH",
          "if": "ctx?.response == 'SRCH'",
          "field": "message",
          "patterns": [
            "base=\"%{GREEDYDATA:search_base}\"%{SPACE}scope=%{NUMBER:scope}%{SPACE}filter=\"%{GREEDYDATA:filter}\"%{SPACE}attrs=\"%{GREEDYDATA:attrs}\""
          ]
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "[07/Oct/2021:11:42:16.992012759 +0000] conn=17 op=23 VLV 0:2147483647:A 0:0 (0)"
      }
    },
    {
      "_source": {
        "message": "[07/Oct/2021:12:20:22.195029736 +0000] conn=65 op=0 BIND dn=\"\" method=sasl version=3 mech=GSS-SPNEGO"
      }
    },
    {
      "_source": {
        "message": "[07/Oct/2021:12:20:21.389966613 +0000] conn=63 op=4 fd=100 closed - U1"
      }
    },
    {
      "_source": {
        "message": "[07/Oct/2021:12:20:21.782680560 +0000] conn=64 fd=100 slot=100 connection from 192.168.32.31 to 192.168.32.31"
      }
    },
    {
      "_source": {
        "message": "[07/Oct/2021:12:20:21.792684713 +0000] conn=64 op=4 UNBIND"
      }
    },
    {
      "_source": {
        "message": "[07/Oct/2021:12:18:17.317416222 +0000] conn=62 op=8 RESULT err=0 tag=101 nentries=1 wtime=0.000091377 optime=0.000094949 etime=0.000185034"
      }
    },
    {
      "_source": {
        "message": "[07/Oct/2021:12:20:21.378172479 +0000] conn=9 op=168 SRCH base=\"cn=ipaConfig,cn=etc,dc=vagrant,dc=dap,dc=cfcs,dc=dk\" scope=0 filter=\"(objectClass=*)\" attrs=\"ipaConfigString ipaKrbAuthzData ipaUserAuthType\""
      }
    },
    {
      "_source": {
        "message": "[07/Oct/2021:12:20:21.383878855 +0000] conn=63 op=0 RESULT err=0 tag=97 nentries=0 wtime=0.000095614 optime=0.002627546 etime=0.002722173 dn=\"uid=admin,cn=users,cn=accounts,dc=vagrant,dc=dap,dc=cfcs,dc=dk\""
      }
    },
    {
      "_source": {
        "message": "[07/Oct/2021:12:20:21.828424794 +0000] conn=8 op=218 SRCH base=\"dc=vagrant,dc=dap,dc=cfcs,dc=dk\" scope=2 filter=\"(&(|(objectClass=krbprincipalaux)(objectClass=krbprincipal)(objectClass=ipakrbprincipal))(|(ipaKrbPrincipalAlias=admin@VAGRANT.DAP.CFCS.DK)(krbPrincipalName:caseIgnoreIA5Match:=admin@VAGRANT.DAP.CFCS.DK)))\" attrs=\"krbPrincipalName krbCanonicalName krbUPEnabled krbPrincipalKey krbTicketPolicyReference krbPrincipalExpiration krbPasswordExpiration krbPwdPolicyReference krbPrincipalType krbPwdHistory krbLastPwdChange krbPrincipalAliases krbLastSuccessfulAuth krbLastFailedAuth krbLoginFailedCount krbPrincipalAuthInd krbExtraData krbLastAdminUnlock krbObjectReferences krbTicketFlags krbMaxTicketLife krbMaxRenewableAge nsAccountLock passwordHistory ipaKrbAuthzData ipaUserAuthType ipatokenRadiusConfigLink objectClass\""
      }
    }
  ]
}

Logstash is a standalone program separate from Elasticsearch.

I would first find out how these logs are currently being ingested. If they come in through Logstash, then you need to find the .conf files so you can make edits.
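
As far as I know there is no Dev Tools-style console for Logstash, but you can test a pipeline locally by running Logstash against a small .conf that reads from stdin and prints to stdout. A minimal sketch, assuming Logstash is installed on the machine (the file name is just an example):

# test.conf - paste sample log lines on stdin and inspect the parsed result
input {
  stdin { }
}

filter {
  grok {
    match => { "message" => "conn=%{INT:conn} op=%{INT:op} RESULT err=%{INT:err} tag=%{INT:tag} nentries=%{INT:nentries} wtime=%{NUMBER:wtime} optime=%{NUMBER:optime} etime=%{NUMBER:etime}" }
  }
}

output {
  stdout { codec => rubydebug }
}

Run it with bin/logstash -f test.conf, paste one of your log lines, and the rubydebug output shows exactly which fields were created. You can also check a config for syntax errors with bin/logstash -f test.conf --config.test_and_exit.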

Once you locate it, if you need assistance applying the filter I posted above, it would be best to create a new topic under the Logstash category so someone can continue to assist.
