.JSON Conf File for Logstash

Hello,

I've been trying to configure this .conf file to parse .json files correctly. This pipeline is able to ingest Google Cloud Audit Logs (in .json), but fails to parse them correctly:

input {
  # stdin {}
  file {
    type => "json"
    path => "E:/Evidence-Collection/Test-Collection/*.json"
    sincedb_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/data/queue/sincedb_json_ingest_.log"
    mode => "read"
    codec => "json"
    file_completed_action => "log"
    file_completed_log_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/logs/logstash-json_logs_read.log"
  }
}

filter {

  geoip {
    source => "sourceIPAddress"
    target => "geoip"
    add_tag => ["cloudtrail-geoip"]
    ecs_compatibility => "disabled"
  }
}

output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["http://X.X.X.X:9200"]
    index => "gcp-test"
  }
}

I then dug into SOF-ELK to find the config files it uses to filter the information and rename the fields. Here is the finished .conf file that I'm trying to use:

input {
  file {
    type => "json"
    path => "E:/Evidence-Collection/Test-Collection/*.json"
    sincedb_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/data/queue/sincedb_json_ingest_.log"
    mode => "read"
    codec => "json"
    file_completed_action => "log"
    file_completed_log_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/logs/logstash-json_logs_read.log"
    }
}

filter {
  if [type] == "gcp" {
    date {
      match => [ "[raw][timestamp]", "ISO8601" ]
    }

    mutate {
      ecs_compatibility => "disabled"
      rename => {
        "[raw][logName]" => "log_name"
        "[raw][severity]" => "severity"
        "[raw][insertId]" => "gcp_log_id"
        "[raw][protoPayload][requestMetadata][callerIp]" => "source_ip"
        "[raw][protoPayload][requestMetadata][callerSuppliedUserAgent]" => "useragent"
        "[raw][protoPayload][@type]" => "event_type"
        "[raw][protoPayload][@type][status][message]" => "status_message"
        "[raw][protoPayload][authenticationInfo][principalEmail]" => "username"
        "[raw][protoPayload][serviceName]" => "service_name"
        "[raw][protoPayload][authorizationInfo]" => "authorization_info"
        "[raw][protoPayload][methodName]" => "method_name"
        "[raw][protoPayload][resourceName]" => "resource_name"
        "[raw][protoPayload][request][service_account][display_name]" => "service_account_name"
        "[raw][protoPayload][request][service_account][description]" => "service_account_description"
        "[raw][protoPayload][response][email]" => "account_email"
        "[raw][protoPayload][response][unique_id]" => "account_id"
        "[raw][jsonPayload][message]" => "system_message"
        "[raw][jsonPayload][connection][dest_ip]" => "destination_ip"
        "[raw][jsonPayload][connection][dest_port]" => "destination_port"
        "[raw][jsonPayload][connection][protocol]" => "protocol"
        "[raw][jsonPayload][connection][src_ip]" => "source_ip"
        "[raw][jsonPayload][connection][src_port]" => "source_port"
        "[raw][jsonPayload][disposition]" => "disposition"
        "[raw][jsonPayload][instance][project_id]" => "vpc_project_id"
        "[raw][jsonPayload][instance][region]" => "vpc_region"
        "[raw][jsonPayload][instance][vm_name]" => "vm_name"
        "[raw][jsonPayload][instance][zone]" => "resource_zone"
        "[raw][jsonPayload][rule_details][action]" => "firewall_action"
        "[raw][jsonPayload][rule_details][direction]" => "firewall_direction"
        "[raw][jsonPayload][rule_details][ip_port_info][ip_protocol]" => "firewall_rule_protocol"
        "[raw][jsonPayload][rule_details][ip_port_info][port_range]" => "firewall_rule_ports"
        "[raw][jsonPayload][rule_details][priority]" => "firewall_rule_priority"
        "[raw][jsonPayload][rule_details][reference]" => "firewall_rule_reference"
        "[raw][jsonPayload][rule_details][source_range]" => "firewall_rule_source_ranges"
        "[raw][jsonPayload][rule_details][target_tag]" => "firewall_rule_target_tags"
        "[raw][jsonPayload][vpc][subnetwork_name]" => "subnetwork_name"
        "[raw][jsonPayload][vpc][vpc_name]" => "vpc_name"
        "[raw][textPayload]" => "text_payload"
        "[raw][labels][compute.googleapis.com/resource_name]" => "compute_resource_name"
        "[raw][resource][type]" => "resource_type"
        "[raw][resource][labels][bucket_name]" => "bucket_name"
        "[raw][resource][labels][location]" => "resource_location"
        "[raw][resource][labels][zone]" => "resource_zone"
        "[raw][resource][labels][project_id]" => "project_id"
        "[raw][resource][labels][instance_id]" => "instance_id"
        "[raw][protoPayload][serviceData][policyDelta][bindingDeltas]" => "policy_deltas"
        "[raw][protoPayload][requestMetadata][destinationAttributes][ip]" => "destination_ip"
        "[raw][protoPayload][requestMetadata][destinationAttributes][port]" => "destination_port"
      }
    #  add_tag => [ "gcp_log" ]
    }

    # remove remaining fields
    mutate {
      remove_field => [ "raw" ]
    }

    # split authorization_info out into authorization_permissions - but keep the original intact
    if [authorization_info] {
      ruby {
        path => "C:\Users\forensic-user\Documents\Elastic-Kibana\logstash-8.9.1\split_gcp_authinfo_fields.rb"
        script_params => {
          "source_field" => "[authorization_info]"
          "destination_field" => "[authorization_permissions]"
          "key_field" => "permission"
        }
      }
    }
  }
}

output {
    stdout  {
        codec => rubydebug
    }
    elasticsearch {
        hosts => ["http://X.X.X.X:9200"]
        index => "gcp-test-2"
    }
}

I'm trying to figure out what I'm doing wrong in this config, since it keeps going into a constant loop of:

[2023-09-22T15:42:32,016][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[2023-09-22T15:42:32,185][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"G1 Young Generation"}
[2023-09-22T15:42:32,186][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"G1 Old Generation"}
[2023-09-22T15:42:35,397][DEBUG][logstash.instrument.periodicpoller.cgroup] One or more required cgroup files or directories not found: /proc/self/cgroup, /sys/fs/cgroup/cpuacct, /sys/fs/cgroup/cpu
[2023-09-22T15:42:37,015][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[2023-09-22T15:42:37,201][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"G1 Young Generation"}
[2023-09-22T15:42:37,201][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"G1 Old Generation"}
[2023-09-22T15:42:38,120][DEBUG][filewatch.sincedbcollection][main][8cdf346204be2079af0234f08541de4924cba983224b0294c7a9d5e986affa10] writing sincedb (delta since last write = 15)
[2023-09-22T15:42:40,400][DEBUG][logstash.instrument.periodicpoller.cgroup] One or more required cgroup files or directories not found: /proc/self/cgroup, /sys/fs/cgroup/cpuacct, /sys/fs/cgroup/cpu
[2023-09-22T15:42:42,015][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[2023-09-22T15:42:42,216][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"G1 Young Generation"}
[2023-09-22T15:42:42,216][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"G1 Old Generation"}

Please let me know how I can fix this script in order to successfully ingest the information.

Hello,

Do you have any ERROR or WARN logs showing where the parse is failing? None of the logs you shared are relevant to your issue.

I would suggest that you disable DEBUG logging in Logstash, as it is very noisy.

If you have a parsing error it will be logged as a WARN; there is no need to use DEBUG.
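For example, you can set the log level back to the default either in config/logstash.yml or when starting Logstash from the command line:

log.level: info

or, on Windows:

bin\logstash.bat --log.level=info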

It will not let me post my screenshots; however, the logs just show "_jsonparsefailure" and not much else (besides the timestamp, etc.).

The tag _jsonparsefailure means that the json filter or codec was not able to parse the message because it is not valid JSON.

You need to share the JSON file you are trying to parse and the logs you are getting in Logstash, in plain text please; avoid sharing screenshots.

Also, is your JSON file one JSON document per line, or is the JSON pretty-printed? It needs to be one JSON document per line.
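For example, with a json codec each line of the file should be a complete document, something like this (the values here are only placeholders):

{"insertId": "abc123", "severity": "NOTICE", "timestamp": "2023-08-11T18:29:50.004372Z"}
{"insertId": "def456", "severity": "NOTICE", "timestamp": "2023-08-11T18:30:01.123456Z"}

A pretty-printed export, where a single document spans many lines, will produce a _jsonparsefailure for each line.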

[
  {
    "protoPayload": {
      "@type": "type.googleapis.com/google.cloud.audit.AuditLog",
      "status": {},
      "authenticationInfo": {
        "principalEmail": "example@gmail.com"
      },
      "requestMetadata": {
        "callerIp": "1.1.1.1",
        "callerSuppliedUserAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36,gzip(gfe),gzip(gfe)",
        "requestAttributes": {},
        "destinationAttributes": {}
      },
      "serviceName": "serviceusage.googleapis.com",
      "methodName": "google.longrunning.Operations.GetOperation",
      "authorizationInfo": [
        {
          "resource": "projectnumbers/0123456789",
          "permission": "serviceusage.services.enable",
          "granted": true,
          "resourceAttributes": {}
        },
        {
          "resource": "projectnumbers/0123456789",
          "permission": "serviceusage.services.enable",
          "granted": true,
          "resourceAttributes": {}
        },
        {
          "resource": "projectnumbers/0123456789/operations/-",
          "permission": "serviceusage.operations.get",
          "granted": true,
          "resourceAttributes": {}
        },
        {
          "resource": "projectnumbers/0123456789/services/-",
          "permission": "serviceusage.services.get",
          "granted": true,
          "resourceAttributes": {}
        }
      ],
      "resourceName": "projects/0123456789/operations/acf.p2-0123456789-xxxxxx-xxxxxx-xxxx-xxxx-xxxxxxxxxxxxx"
    },
    "insertId": "fjp5t1d3yvz",
    "resource": {
      "type": "audited_resource",
      "labels": {
        "service": "serviceusage.googleapis.com",
        "method": "google.longrunning.Operations.GetOperation",
        "project_id": "marine-storm-123456789"
      }
    },
    "timestamp": "2023-08-11T18:29:50.004372Z",
    "severity": "NOTICE",
    "logName": "projects/marine-storm-337223/logs/cloudaudit.googleapis.com%2Factivity",
    "receiveTimestamp": "2023-08-11T18:29:50.745614622Z"
  },

Here is the data that Logstash sent into Elasticsearch:

{
  "@timestamp": [
    "2023-09-13T14:19:06.518Z"
  ],
  "@version": [
    "1"
  ],
  "@version.keyword": [
    "1"
  ],
  "host.name": [
    "windows20server"
  ],
  "host.name.keyword": [
    "windows20server"
  ],
  "log.file.path": [
    "E:/Evidence-Collection/Test-Collection/downloaded-logs-20230811-130500.json"
  ],
  "log.file.path.keyword": [
    "E:/Evidence-Collection/Test-Collection/downloaded-logs-20230811-130500.json"
  ],
  "message": [
    "    \"receiveTimestamp\": \"2023-08-11T18:56:49.425765464Z\""
  ],
  "message.keyword": [
    "    \"receiveTimestamp\": \"2023-08-11T18:56:49.425765464Z\""
  ],
  "tags": [
    "_jsonparsefailure",
    "_geoip_lookup_failure"
  ],
  "tags.keyword": [
    "_jsonparsefailure",
    "_geoip_lookup_failure"
  ],
  "type": [
    "json"
  ],
  "type.keyword": [
    "json"
  ],
  "_id": "corojooBw9XJIP4k9eHb",
  "_index": "gcp-test",
  "_score": null
}

That is not valid JSON. It suggests your JSON is pretty-printed, in which case you will need to replace the json codec with a multiline codec and use a json filter to parse it.

So would it be:

    type => "json"
    path => "E:/Evidence-Collection/Test-Collection/*.json"
    sincedb_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/data/queue/sincedb_json_ingest_.log"
    mode => "read"
    codec => "multiline"

There is an example of using a multiline codec to consume an entire file here. If your file contains multiple objects you will need to find a regexp that can match the end of a JSON object.

Does this look right?

input {
  file {
    type => "json"
    path => "E:/Evidence-Collection/Test-Collection/*.json"
    sincedb_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/data/queue/sincedb_json_ingest_.log"
    mode => "read"
    codec => multiline {
      pattern => "^Spalanzani" 
      negate => true 
      what => previous 
      auto_flush_interval => 1 
      multiline_tag => ""
    }
    file_completed_action => "log"
    file_completed_log_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/logs/logstash-json_logs_read.log"
    }
}

If you want to consume the entire file as a single event then that looks right to me.

Does this look good for a split array?

input {
  file {
    type => "json"
    path => "E:/Evidence-Collection/Test-Collection/*.json"
    sincedb_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/data/queue/sincedb_json_ingest_.log"
    mode => "read"
    codec => multiline {
      pattern => "^Spalanzani" 
      negate => true 
      what => previous 
      auto_flush_interval => 1 
      multiline_tag => ""
    }
    file_completed_action => "log"
    file_completed_log_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/logs/logstash-json_logs_read.log"
    }
}

filter {
  if [type] == "gcp" {
    date {
      match => [ "[raw][timestamp]", "ISO8601" ]
    }
    split {
      field => "someField"
    }
    mutate {
      ecs_compatibility => "disabled"
      rename => {
        "[raw][logName]" => "log_name"
        "[raw][severity]" => "severity"
        "[raw][insertId]" => "gcp_log_id"
        "[raw][protoPayload][requestMetadata][callerIp]" => "source_ip"
        "[raw][protoPayload][requestMetadata][callerSuppliedUserAgent]" => "useragent"
        "[raw][protoPayload][@type]" => "event_type"

Hard to tell without knowing what the message format is like.

After ingestion, or are you talking about the format of the JSON?

I am talking about the JSON. BTW, you should probably start the filter section with a json filter since you no longer have a json codec.
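As a minimal sketch, something along these lines (the target name someField is only a placeholder, and the split filter is only needed if the parsed result is an array):

filter {
  json {
    source => "message"            # the whole multiline event
    target => "someField"          # parse into one field so it does not clash with existing fields
    remove_field => [ "message" ]
  }
  split {
    field => "someField"           # one event per array element
  }
}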

I was able to put the json filter in there; however, I'm getting that same "loop" that I had at the beginning of the discussion. Any thoughts on where I may need to edit the script?

input {
  file {
    type => "json"
    path => "E:/Evidence-Collection/Test-Collection/*.json"
    sincedb_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/data/queue/sincedb_json_ingest_.log"
    mode => "read"
    codec => multiline {
      pattern => "^Spalanzani" 
      negate => true 
      what => previous 
      auto_flush_interval => 1 
      multiline_tag => ""
    }
    file_completed_action => "log"
    file_completed_log_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/logs/logstash-json_logs_read.log"
    }
}

filter {
  json {
    source => "message"
    target => "someField"
    remove_field => [ "message" ]
  }
    if [type] == "gcp" {
      date {
        match => [ "[raw][timestamp]", "ISO8601" ]
    }
      split {
        field => "someField"
      }
      mutate {
        ecs_compatibility => "disabled"
        rename => {
          "[raw][logName]" => "log_name"
          "[raw][severity]" => "severity"
          "[raw][insertId]" => "gcp_log_id"
          "[raw][protoPayload][requestMetadata][callerIp]" => "source_ip"
          "[raw][protoPayload][requestMetadata][callerSuppliedUserAgent]" => "useragent"
          "[raw][protoPayload][@type]" => "event_type"
          "[raw][protoPayload][@type][status][message]" => "status_message"
          "[raw][protoPayload][authenticationInfo][principalEmail]" => "username"
          "[raw][protoPayload][serviceName]" => "service_name"
          "[raw][protoPayload][authorizationInfo]" => "authorization_info"
          "[raw][protoPayload][methodName]" => "method_name"
          "[raw][protoPayload][resourceName]" => "resource_name"
          "[raw][protoPayload][request][service_account][display_name]" => "service_account_name"
          "[raw][protoPayload][request][service_account][description]" => "service_account_description"
          "[raw][protoPayload][response][email]" => "account_email"
          "[raw][protoPayload][response][unique_id]" => "account_id"
          "[raw][jsonPayload][message]" => "system_message"
          "[raw][jsonPayload][connection][dest_ip]" => "destination_ip"
          "[raw][jsonPayload][connection][dest_port]" => "destination_port"
          "[raw][jsonPayload][connection][protocol]" => "protocol"
          "[raw][jsonPayload][connection][src_ip]" => "source_ip"
          "[raw][jsonPayload][connection][src_port]" => "source_port"
          "[raw][jsonPayload][disposition]" => "disposition"
          "[raw][jsonPayload][instance][project_id]" => "vpc_project_id"
          "[raw][jsonPayload][instance][region]" => "vpc_region"
          "[raw][jsonPayload][instance][vm_name]" => "vm_name"
          "[raw][jsonPayload][instance][zone]" => "resource_zone"
          "[raw][jsonPayload][rule_details][action]" => "firewall_action"
          "[raw][jsonPayload][rule_details][direction]" => "firewall_direction"
          "[raw][jsonPayload][rule_details][ip_port_info][ip_protocol]" => "firewall_rule_protocol"
          "[raw][jsonPayload][rule_details][ip_port_info][port_range]" => "firewall_rule_ports"
          "[raw][jsonPayload][rule_details][priority]" => "firewall_rule_priority"
          "[raw][jsonPayload][rule_details][reference]" => "firewall_rule_reference"
          "[raw][jsonPayload][rule_details][source_range]" => "firewall_rule_source_ranges"
          "[raw][jsonPayload][rule_details][target_tag]" => "firewall_rule_target_tags"
          "[raw][jsonPayload][vpc][subnetwork_name]" => "subnetwork_name"
          "[raw][jsonPayload][vpc][vpc_name]" => "vpc_name"
          "[raw][textPayload]" => "text_payload"
          "[raw][labels][compute.googleapis.com/resource_name]" => "compute_resource_name"
          "[raw][resource][type]" => "resource_type"
          "[raw][resource][labels][bucket_name]" => "bucket_name"
          "[raw][resource][labels][location]" => "resource_location"
          "[raw][resource][labels][zone]" => "resource_zone"
          "[raw][resource][labels][project_id]" => "project_id"
          "[raw][resource][labels][instance_id]" => "instance_id"
          "[raw][protoPayload][serviceData][policyDelta][bindingDeltas]" => "policy_deltas"
          "[raw][protoPayload][requestMetadata][destinationAttributes][ip]" => "destination_ip"
          "[raw][protoPayload][requestMetadata][destinationAttributes][port]" => "destination_port"
        }
    #  add_tag => [ "gcp_log" ]
    }
    # remove remaining fields
    mutate {
      remove_field => [ "raw" ]
    }
    # split authorization_info out into authorization_permissions - but keep the original intact
    if [authorization_info] {
      ruby {
        path => "C:\Users\forensic-user\Documents\Elastic-Kibana\logstash-8.9.1\split_gcp_authinfo_fields.rb"
        script_params => {
          "source_field" => "[authorization_info]"
          "destination_field" => "[authorization_permissions]"
          "key_field" => "permission"
        
        }
      }
    }
  }
}

output {
    stdout  {
        codec => rubydebug
    }
    elasticsearch {
        hosts => ["http://1.1.1.1:9200"]
        index => "gcp-test-2"
    }
}

That is normal for DEBUG level logging. It does not suggest there is a problem.

Does this DEBUG message mean anything important? I saw your previous post about it with the auto_flush_interval. What should its value be? Would it be: auto_flush_interval => 0

[2023-09-25T15:57:00,904][DEBUG][logstash.instrument.periodicpoller.cgroup] One or more required cgroup files or directories not found: /proc/self/cgroup, /sys/fs/cgroup/cpuacct, /sys/fs/cgroup/cpu

No, it is just noise.

I'm not too sure what to do now... this script has been running for quite some time, and yet nothing is showing up within Index Management.