Create timestamp from nested JSON elements

Hi all,
I am indexing nested JSON files using Logstash and want to extract the timestamp from them. Here are a couple of examples.
Example 1

{
  "ip": "127.0.0.1",
  "data": {
    "postgres": {
      "status": "success",
      "protocol": "postgres",
      "result": {
        "supported_versions": "FATAL:  unsupported frontend protocol 0.0: server supports 1.0 to 3.0",
        "protocol_error": {
          "code": "0A000",
          "file": "postmaster.c",
          "line": "2071",
          "message": "unsupported frontend protocol 255.255: server supports 1.0 to 3.0",
          "routine": "ProcessStartupPacket",
          "severity": "FATAL",
          "severity_v": "FATAL"
        },
        "is_ssl": true
      },
      "timestamp": "2021-10-14T06:42:08Z"
    }
  }
}

Example 2

{
  "ip": "127.0.0.1",
  "data": {
    "ssh": {
      "status": "success",
      "protocol": "ssh",
      "result": {
        "server_id": {
          "raw": "SSH-2.0-OpenSSH_X.X",
          "version": "2.0",
          "software": "OpenSSH_X.X"
        },
        "algorithm_selection": {
          "dh_kex_algorithm": "curve25519-sha256@libssh.org",
          "host_key_algorithm": "ecdsa-sha2-nistp256",
          "client_to_server_alg_group": {
            "cipher": "aes128-ctr",
            "mac": "hmac-sha2-256",
            "compression": "none"
          },
          "server_to_client_alg_group": {
            "cipher": "aes128-ctr",
            "mac": "hmac-sha2-256",
            "compression": "none"
          }
        },
        "key_exchange": {
          "curve25519_sha256_params": {
            "server_public": "fnBsVPDLuMTsaQBUTii6/cBuG3+AUeDIFj4QlcdYqEM="
          }
        }
      },
      "timestamp": "2021-10-14T06:42:07Z"
    }
  }
}

As you can see, in both examples the timestamp sits inside the nested JSON object named after the protocol.
I was able to extract the timestamp in the first example with the following configuration:

filter {
	json {
		source => "message"
	}
	date {
		match => [ "[data][postgres][timestamp]", "ISO8601" ]
	}

	mutate {
		remove_field => ["message"]
	}
}

However, I am looking for a more general solution. Is it possible to access the timestamp field by index instead of by name? The timestamp would then be [data][0][timestamp] (or something like that), and I wouldn't have to write an if statement for each protocol.

For posterity, OP was discussing this issue with me in a Gitter chat. Here is a summary:

Due to the unpredictable key names in the incoming data, using a pre-made filter such as date is difficult without one condition per expected "protocol". Positional access does not help here either: Logstash field references can index into arrays, but [data] is an object, so there is no [data][0] that reaches its first key.
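For illustration, the one-condition-per-protocol approach would look something like this (only the postgres and ssh branches come from the examples above; every additional protocol would need its own branch, maintained by hand):

filter {
    json {
        source => "message"
    }

    # One branch per expected protocol.
    if [data][postgres] {
        date {
            match => [ "[data][postgres][timestamp]", "ISO8601" ]
        }
    } else if [data][ssh] {
        date {
            match => [ "[data][ssh][timestamp]", "ISO8601" ]
        }
    }
}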

The solution that came up was to use a ruby filter to traverse the data safely:

data = event.get('[data]')
if data.is_a?(Hash)
    # Each key under [data] is a protocol name (postgres, ssh, ...),
    # so iterate over them instead of hard-coding the names.
    data.each do |key, value|
        if value.is_a?(Hash)
            # Pull the protocol's timestamp, if it has one.
            ts = value.fetch('timestamp', nil)
            event.set('@timestamp', LogStash::Timestamp.parse_iso8601(ts)) if ts
        end
    end
end
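If an event ever contained more than one protocol block with a timestamp, the last one iterated would win; each of the examples above carries a single protocol, so in practice @timestamp is set exactly once.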

This snippet can either be inlined inside the code option of the filter, or saved to a file and referenced via the path option (the two options are mutually exclusive, so pick one):

filter {

    ruby {
        # Either inline ...
        code => "
        data = event.get('[data]')
        if data.is_a?(Hash)
            ...
        end
        "

        # ... or from a file. Use path instead of code; a single
        # ruby filter cannot set both.
        path => "my_scripts/protocol_timestamp.rb"
    }

}
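
One caveat about the file-based route: the ruby filter expects the script file to define a filter(event) method that returns an array of events (a register(params) method can also be defined for setup), so the raw snippet needs a small wrapper. A minimal sketch of what my_scripts/protocol_timestamp.rb could look like:

def register(params)
    # No setup needed for this script.
end

def filter(event)
    data = event.get('[data]')
    if data.is_a?(Hash)
        data.each do |key, value|
            if value.is_a?(Hash)
                ts = value.fetch('timestamp', nil)
                event.set('@timestamp', LogStash::Timestamp.parse_iso8601(ts)) if ts
            end
        end
    end
    # Return the (possibly modified) event; returning an empty array would drop it.
    [event]
end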
