Pushing a partly-JSON log to ES with Filebeat

Hi!

Could someone give me some guidance on how to push an audit.log that looks like the one below to ES?
In Elasticsearch I would like to have the timestamp (the first one on each line) and the information from the JSON part: event, user and data. It would be awesome if the data JSON could also be split into separate fields, but I don't know what all the possible fields are, so I can't build an index template for them.

2020-02-19T08:53:29+00:00 ip-10-10-10-5 INFO  [Car Rental System] 2020-02-19 08:53:29+0000 - {"event":"Add client","user":"bob.johnson","data":{"clientName":"Coca-Foola","clientType":"COM","clientCode":"0170743762120"}}
2020-02-19T09:24:03+00:00 ip-10-10-10-5 INFO  [Car Rental System] 2020-02-19 09:24:03+0000 - {"event":"Log out user","user":"bob.johnson"}
2020-02-19T13:51:44+00:00 ip-10-10-10-5 INFO  [Car Rental System] 2020-02-19 13:51:44+0000 - {"event":"Log in user failed"}
2020-02-19T13:51:52+00:00 ip-10-10-10-5 INFO  [Car Rental System] 2020-02-19 13:51:52+0000 - {"event":"Log in user","user":"bob.johnson"}
2020-02-19T14:23:15+00:00 ip-10-10-10-5 INFO  [Car Rental System] 2020-02-19 14:23:15+0000 - {"event":"Log out user","user":"bob.johnson"}
2020-02-19T14:33:15+00:00 ip-10-10-10-5 INFO  [Car Rental System] 2020-02-19 14:33:15+0000 - {"event":"Log in user","user":"bob.johnson"}
2020-02-19T14:33:52+00:00 ip-10-10-10-5 INFO  [Car Rental System] 2020-02-19 14:33:52+0000 - {"event":"Register client rental","user":"bob.johnson","data":{"carCode":"fseff232fs","carClass":"SUV","licensePlate":"0170368015672","rentalInformation":{"Make":"Toyota","clientType":"COM","clientCode":"0170743762120","promotionCode":"ride-free"}}}
2020-02-19T14:33:57+00:00 ip-10-10-10-5 INFO  [Car Rental System] 2020-02-19 14:33:57+0000 - {"event":"Approve rental","user":"bob.johnson","data":{"carCode":"fseff232fs"}}

Hi @tuudik,

if your audit log is a file on disk, I would suggest ingesting it with Filebeat's log input. Assuming Filebeat then indexes your logs as documents whose message field contains strings like the ones in your examples, you could use an ingest pipeline like the following to parse them:

{
    "description": "rental events",
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "%{TIMESTAMP_ISO8601:@timestamp}%{SPACE}%{HOSTNAME:host.hostname}%{SPACE}%{LOGLEVEL:log.level}%{SPACE}\\[%{DATA:service.name}\\] %{TIMESTAMP_ISO8601} - %{GREEDYDATA:payload}"
          ]
        }
      },
      {
        "json": {
          "field": "payload"
        }
      },
      {
        "rename": {
          "field": "payload.event",
          "target_field": "event.action"
        }
      },
      {
        "rename": {
          "field": "payload.user",
          "target_field": "user.name",
          "ignore_missing": true
        }
      },
      {
        "rename": {
          "field": "payload.data",
          "target_field": "data",
          "ignore_missing": true
        }
      },
      {
        "set": {
          "field": "event.module",
          "value": "car-rental"
        }
      },
      {
        "set": {
          "field": "event.type",
          "value": "change"
        }
      },
      {
        "set": {
          "field": "event.dataset",
          "value": "{{event.module}}.{{event.type}}"
        }
      },
      {
        "remove": {
          "field": [
            "payload"
          ]
        }
      }
    ]
  }
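
For this to work, the pipeline has to be registered under a name of your choosing; rental-events below is just an example. Also note the ignore_missing flags on the user and data renames: some of your events (e.g. "Log in user failed") carry neither key, and without the flag the rename processor would fail the whole document.

PUT _ingest/pipeline/rental-events
{
  /* pipeline definition from above */
}

Filebeat can then be pointed at the log file and told to route everything through that pipeline. A minimal filebeat.yml sketch, assuming the audit log lives at /var/log/car-rental/audit.log and Elasticsearch is reachable on localhost (adjust both to your setup); the index setting refers to the logs-rental write alias defined by the index template further down:

filebeat.inputs:
- type: log
  paths:
    - /var/log/car-rental/audit.log

output.elasticsearch:
  hosts: ["localhost:9200"]
  pipeline: "rental-events"
  index: "logs-rental"

# Filebeat requires these when the output index is overridden; we also
# disable its own template and ILM handling since the index template
# below is managed manually.
setup.template.enabled: false
setup.template.name: "logs-rental"
setup.template.pattern: "logs-rental-*"
setup.ilm.enabled: false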

I would suggest using ECS (the Elastic Common Schema) as a guide when choosing field names and types, as I have done in the pipeline above. This makes integration with other tools in the Elastic Stack easier and eases correlation with other data sources. To that end I would also propose an index template like the following:

{
  "index_patterns": [
    "logs-rental-*"
  ],
  "aliases": {
    "logs-rental": {
      "is_write_index": true
    }
  },
  "mappings": {
    "_source": {},
    "_meta": {},
    "dynamic_templates": [
      {
        "strings_as_keywords": {
          "match_mapping_type": "string",
          "mapping": {
            "type": "keyword"
          }
        }
      },
      {
        "data_as_keywords": {
          "path_match": "data.*",
          "mapping": {
            "type": "keyword"
          }
        }
      }
    ],
    "properties": {
      "@timestamp": {
        "type": "date"
      }
    }
  }
}
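
Installing the template and bootstrapping the first index then looks like this; once the first index exists, the template attaches the logs-rental write alias that the Filebeat config above writes to (again, the names are just examples):

PUT _template/logs-rental
{
  /* template body from above */
}

PUT logs-rental-00001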

If you like things particularly clean, you could then wrap all of this up in your own Filebeat module.

Hope this can get you started. Let us know if you get stuck on anything specific.

By the way, I used the Grok Debugger in Kibana and the pipeline _simulate API to develop the examples above. They're pretty useful during development:

POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "simulated pipeline",
    "processors": [
      /* ... */
    ]
  },
  "docs": [
    {
      "_index": "logs-rental-00001",
      "_id": "id1",
      "_source": {
        "message": """2020-02-19T14:33:52+00:00 ip-10-10-10-5 INFO  [Car Rental System] 2020-02-19 14:33:52+0000 - {"event":"Register client rental","user":"bob.johnson","data":{"carCode":"fseff232fs","carClass":"SUV","licensePlate":"0170368015672","rentalInformation":{"Make":"Toyota","clientType":"COM","clientCode":"0170743762120","promotionCode":"ride-free"}}}"""
      }
    },
    {
      "_index": "logs-rental-00001",
      "_id": "id2",
      "_source": {
        "message": """2020-02-19T08:53:29+00:00 ip-10-10-10-5 INFO  [Car Rental System] 2020-02-19 08:53:29+0000 - {"event":"Add client","user":"bob.johnson","data":{"clientName":"Coca-Foola","clientType":"COM","clientCode":"0170743762120"}}"""
      }
    }
  ]
}
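
If the grok pattern and the processors do their job, the second doc should come back from _simulate with a _source roughly like this (the unchanged message field is omitted for brevity):

{
  "@timestamp": "2020-02-19T08:53:29+00:00",
  "host": { "hostname": "ip-10-10-10-5" },
  "log": { "level": "INFO" },
  "service": { "name": "Car Rental System" },
  "event": {
    "action": "Add client",
    "module": "car-rental",
    "type": "change",
    "dataset": "car-rental.change"
  },
  "user": { "name": "bob.johnson" },
  "data": {
    "clientName": "Coca-Foola",
    "clientType": "COM",
    "clientCode": "0170743762120"
  }
}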
