Hello,
I've been trying to configure this .conf file to parse .json files correctly. This script is able to ingest Google Cloud Audit Logs (in .json), but it fails to parse them correctly:
input {
  # stdin {}
  file {
    type => "json"
    path => "E:/Evidence-Collection/Test-Collection/*.json"
    sincedb_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/data/queue/sincedb_json_ingest_.log"
    mode => "read"
    codec => "json"
    file_completed_action => "log"
    file_completed_log_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/logs/logstash-json_logs_read.log"
  }
}
filter {
  geoip {
    source => "sourceIPAddress"
    target => "geoip"
    add_tag => ["cloudtrail-geoip"]
    ecs_compatibility => "disabled"
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["http://X.X.X.X:9200"]
    index => "gcp-test"
  }
}
I then looked into SOF-ELK to find the config files it uses to filter the information and rename the fields. Here is the finished .conf file that I'm trying to use:
input {
  file {
    type => "json"
    path => "E:/Evidence-Collection/Test-Collection/*.json"
    sincedb_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/data/queue/sincedb_json_ingest_.log"
    mode => "read"
    codec => "json"
    file_completed_action => "log"
    file_completed_log_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/logs/logstash-json_logs_read.log"
  }
}
filter {
  if [type] == "gcp" {
    date {
      match => [ "[raw][timestamp]", "ISO8601" ]
    }
    mutate {
      ecs_compatibility => "disabled"
      rename => {
        "[raw][logName]" => "log_name"
        "[raw][severity]" => "severity"
        "[raw][insertId]" => "gcp_log_id"
        "[raw][protoPayload][requestMetadata][callerIp]" => "source_ip"
        "[raw][protoPayload][requestMetadata][callerSuppliedUserAgent]" => "useragent"
        "[raw][protoPayload][@type]" => "event_type"
        "[raw][protoPayload][@type][status][message]" => "status_message"
        "[raw][protoPayload][authenticationInfo][principalEmail]" => "username"
        "[raw][protoPayload][serviceName]" => "service_name"
        "[raw][protoPayload][authorizationInfo]" => "authorization_info"
        "[raw][protoPayload][methodName]" => "method_name"
        "[raw][protoPayload][resourceName]" => "resource_name"
        "[raw][protoPayload][request][service_account][display_name]" => "service_account_name"
        "[raw][protoPayload][request][service_account][description]" => "service_account_description"
        "[raw][protoPayload][response][email]" => "account_email"
        "[raw][protoPayload][response][unique_id]" => "account_id"
        "[raw][jsonPayload][message]" => "system_message"
        "[raw][jsonPayload][connection][dest_ip]" => "destination_ip"
        "[raw][jsonPayload][connection][dest_port]" => "destination_port"
        "[raw][jsonPayload][connection][protocol]" => "protocol"
        "[raw][jsonPayload][connection][src_ip]" => "source_ip"
        "[raw][jsonPayload][connection][src_port]" => "source_port"
        "[raw][jsonPayload][disposition]" => "disposition"
        "[raw][jsonPayload][instance][project_id]" => "vpc_project_id"
        "[raw][jsonPayload][instance][region]" => "vpc_region"
        "[raw][jsonPayload][instance][vm_name]" => "vm_name"
        "[raw][jsonPayload][instance][zone]" => "resource_zone"
        "[raw][jsonPayload][rule_details][action]" => "firewall_action"
        "[raw][jsonPayload][rule_details][direction]" => "firewall_direction"
        "[raw][jsonPayload][rule_details][ip_port_info][ip_protocol]" => "firewall_rule_protocol"
        "[raw][jsonPayload][rule_details][ip_port_info][port_range]" => "firewall_rule_ports"
        "[raw][jsonPayload][rule_details][priority]" => "firewall_rule_priority"
        "[raw][jsonPayload][rule_details][reference]" => "firewall_rule_reference"
        "[raw][jsonPayload][rule_details][source_range]" => "firewall_rule_source_ranges"
        "[raw][jsonPayload][rule_details][target_tag]" => "firewall_rule_target_tags"
        "[raw][jsonPayload][vpc][subnetwork_name]" => "subnetwork_name"
        "[raw][jsonPayload][vpc][vpc_name]" => "vpc_name"
        "[raw][textPayload]" => "text_payload"
        "[raw][labels][compute.googleapis.com/resource_name]" => "compute_resource_name"
        "[raw][resource][type]" => "resource_type"
        "[raw][resource][labels][bucket_name]" => "bucket_name"
        "[raw][resource][labels][location]" => "resource_location"
        "[raw][resource][labels][zone]" => "resource_zone"
        "[raw][resource][labels][project_id]" => "project_id"
        "[raw][resource][labels][instance_id]" => "instance_id"
        "[raw][protoPayload][serviceData][policyDelta][bindingDeltas]" => "policy_deltas"
        "[raw][protoPayload][requestMetadata][destinationAttributes][ip]" => "destination_ip"
        "[raw][protoPayload][requestMetadata][destinationAttributes][port]" => "destination_port"
      }
      # add_tag => [ "gcp_log" ]
    }
    # remove remaining fields
    mutate {
      remove_field => [ "raw" ]
    }
    # split authorization_info out into authorization_permissions - but keep the original intact
    if [authorization_info] {
      ruby {
        path => "C:\Users\forensic-user\Documents\Elastic-Kibana\logstash-8.9.1\split_gcp_authinfo_fields.rb"
        script_params => {
          "source_field" => "[authorization_info]"
          "destination_field" => "[authorization_permissions]"
          "key_field" => "permission"
        }
      }
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["http://X.X.X.X:9200"]
    index => "gcp-test-2"
  }
}
I'm trying to figure out what I'm doing wrong in this config, since Logstash keeps going into a constant loop of:
[2023-09-22T15:42:32,016][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[2023-09-22T15:42:32,185][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"G1 Young Generation"}
[2023-09-22T15:42:32,186][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"G1 Old Generation"}
[2023-09-22T15:42:35,397][DEBUG][logstash.instrument.periodicpoller.cgroup] One or more required cgroup files or directories not found: /proc/self/cgroup, /sys/fs/cgroup/cpuacct, /sys/fs/cgroup/cpu
[2023-09-22T15:42:37,015][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[2023-09-22T15:42:37,201][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"G1 Young Generation"}
[2023-09-22T15:42:37,201][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"G1 Old Generation"}
[2023-09-22T15:42:38,120][DEBUG][filewatch.sincedbcollection][main][8cdf346204be2079af0234f08541de4924cba983224b0294c7a9d5e986affa10] writing sincedb (delta since last write = 15)
[2023-09-22T15:42:40,400][DEBUG][logstash.instrument.periodicpoller.cgroup] One or more required cgroup files or directories not found: /proc/self/cgroup, /sys/fs/cgroup/cpuacct, /sys/fs/cgroup/cpu
[2023-09-22T15:42:42,015][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[2023-09-22T15:42:42,216][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"G1 Young Generation"}
[2023-09-22T15:42:42,216][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"G1 Old Generation"}
Please let me know how I can fix this script in order to successfully ingest the information.
leandrojmp (Leandro Pereira), September 22, 2023, 4:00pm (#2)
Hello,
Do you have any ERROR or WARN logs showing where the parse is failing? None of the logs you shared are relevant to your issue.
I would suggest disabling DEBUG logging in Logstash, as it will be very noisy.
If you have a parsing error it will be logged as a WARN; there is no need to use DEBUG.
It will not let me post my screenshots; however, the logs show "_jsonparsefailure" and that is it (besides the timestamp, etc.).
leandrojmp (Leandro Pereira), September 22, 2023, 4:42pm (#4)
The tag _jsonparsefailure means that the json filter or codec was not able to parse the message because it is not valid JSON.
You need to share the JSON file you are trying to parse and the logs you are getting from Logstash, in plain text please; avoid sharing screenshots.
Also, is your JSON file one JSON document per line, or is the JSON pretty-printed? It needs to be one JSON document per line.
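For reference, a minimal sketch of the one-document-per-line (NDJSON) layout that the json codec expects; the values below are made up:
{ "insertId": "abc123", "severity": "NOTICE", "timestamp": "2023-08-11T18:29:50.004372Z", "protoPayload": { "serviceName": "serviceusage.googleapis.com" } }
{ "insertId": "def456", "severity": "NOTICE", "timestamp": "2023-08-11T18:30:01.000000Z", "protoPayload": { "serviceName": "iam.googleapis.com" } }
Each line is one complete JSON object, with no enclosing [ ] array and no commas between documents.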
Here is the JSON file I'm trying to parse:
[
{
"protoPayload": {
"@type": "type.googleapis.com/google.cloud.audit.AuditLog",
"status": {},
"authenticationInfo": {
"principalEmail": "example@gmail.com"
},
"requestMetadata": {
"callerIp": "1.1.1.1",
"callerSuppliedUserAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36,gzip(gfe),gzip(gfe)",
"requestAttributes": {},
"destinationAttributes": {}
},
"serviceName": "serviceusage.googleapis.com",
"methodName": "google.longrunning.Operations.GetOperation",
"authorizationInfo": [
{
"resource": "projectnumbers/0123456789",
"permission": "serviceusage.services.enable",
"granted": true,
"resourceAttributes": {}
},
{
"resource": "projectnumbers/0123456789",
"permission": "serviceusage.services.enable",
"granted": true,
"resourceAttributes": {}
},
{
"resource": "projectnumbers/0123456789/operations/-",
"permission": "serviceusage.operations.get",
"granted": true,
"resourceAttributes": {}
},
{
"resource": "projectnumbers/0123456789/services/-",
"permission": "serviceusage.services.get",
"granted": true,
"resourceAttributes": {}
}
],
"resourceName": "projects/0123456789/operations/acf.p2-0123456789-xxxxxx-xxxxxx-xxxx-xxxx-xxxxxxxxxxxxx"
},
"insertId": "fjp5t1d3yvz",
"resource": {
"type": "audited_resource",
"labels": {
"service": "serviceusage.googleapis.com",
"method": "google.longrunning.Operations.GetOperation",
"project_id": "marine-storm-123456789"
}
},
"timestamp": "2023-08-11T18:29:50.004372Z",
"severity": "NOTICE",
"logName": "projects/marine-storm-337223/logs/cloudaudit.googleapis.com%2Factivity",
"receiveTimestamp": "2023-08-11T18:29:50.745614622Z"
},
Here is the data that Logstash sent into Elasticsearch:
{
"@timestamp": [
"2023-09-13T14:19:06.518Z"
],
"@version": [
"1"
],
"@version.keyword": [
"1"
],
"host.name": [
"windows20server"
],
"host.name.keyword": [
"windows20server"
],
"log.file.path": [
"E:/Evidence-Collection/Test-Collection/downloaded-logs-20230811-130500.json"
],
"log.file.path.keyword": [
"E:/Evidence-Collection/Test-Collection/downloaded-logs-20230811-130500.json"
],
"message": [
" \"receiveTimestamp\": \"2023-08-11T18:56:49.425765464Z\""
],
"message.keyword": [
" \"receiveTimestamp\": \"2023-08-11T18:56:49.425765464Z\""
],
"tags": [
"_jsonparsefailure",
"_geoip_lookup_failure"
],
"tags.keyword": [
"_jsonparsefailure",
"_geoip_lookup_failure"
],
"type": [
"json"
],
"type.keyword": [
"json"
],
"_id": "corojooBw9XJIP4k9eHb",
"_index": "gcp-test",
"_score": null
}
Badger, September 22, 2023, 5:51pm (#7)
That is not valid JSON. It suggests your JSON is pretty-printed, in which case you will need to replace the json codec with a multiline codec and use a json filter to parse it.
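To sketch what that looks like (only an outline, assuming the multiline codec reassembles each pretty-printed document into the message field), the filter side would start with roughly:
filter {
  json {
    source => "message"
  }
}
with the json codec removed from the file input and a multiline codec in its place.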
So would it be:
type => "json"
path => "E:/Evidence-Collection/Test-Collection/*.json"
sincedb_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/data/queue/sincedb_json_ingest_.log"
mode => "read"
codec => "multiline"
Badger, September 22, 2023, 6:00pm (#9)
There is an example of using a multiline codec to consume an entire file here. If your file contains multiple objects you will need to find a regexp that can match the end of a JSON object.
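For example, if the objects in the array are indented by two spaces, so each one closes with a line that is exactly "  }," or "  }" (nested objects close at deeper indentation and would not match), a sketch could be:
codec => multiline {
  pattern => "^  \},?$"      # a line that closes one object in the top-level array (two-space indent assumed)
  negate => true
  what => "next"             # any line that is not such a closing line belongs with the lines that follow it
  auto_flush_interval => 2
}
You would still need to strip the enclosing [ and ] and any trailing commas before a json filter could parse each event, which is why consuming the whole file as a single event and splitting it afterwards is often simpler.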
Does this look right?
input {
  file {
    type => "json"
    path => "E:/Evidence-Collection/Test-Collection/*.json"
    sincedb_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/data/queue/sincedb_json_ingest_.log"
    mode => "read"
    codec => multiline {
      pattern => "^Spalanzani"
      negate => true
      what => previous
      auto_flush_interval => 1
      multiline_tag => ""
    }
    file_completed_action => "log"
    file_completed_log_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/logs/logstash-json_logs_read.log"
  }
}
Badger, September 22, 2023, 6:47pm (#11)
If you want to consume the entire file as a single event then that looks right to me. ("^Spalanzani" is just a pattern that will never match a real line, so with negate => true every line is appended to the previous one, and auto_flush_interval eventually flushes the whole accumulated file as one event.)
Does this look good for a split array?
input {
  file {
    type => "json"
    path => "E:/Evidence-Collection/Test-Collection/*.json"
    sincedb_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/data/queue/sincedb_json_ingest_.log"
    mode => "read"
    codec => multiline {
      pattern => "^Spalanzani"
      negate => true
      what => previous
      auto_flush_interval => 1
      multiline_tag => ""
    }
    file_completed_action => "log"
    file_completed_log_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/logs/logstash-json_logs_read.log"
  }
}
filter {
  if [type] == "gcp" {
    date {
      match => [ "[raw][timestamp]", "ISO8601" ]
    }
    split {
      field => "someField"
    }
    mutate {
      ecs_compatibility => "disabled"
      rename => {
        "[raw][logName]" => "log_name"
        "[raw][severity]" => "severity"
        "[raw][insertId]" => "gcp_log_id"
        "[raw][protoPayload][requestMetadata][callerIp]" => "source_ip"
        "[raw][protoPayload][requestMetadata][callerSuppliedUserAgent]" => "useragent"
        "[raw][protoPayload][@type]" => "event_type"
Badger, September 22, 2023, 7:58pm (#13)
Hard to tell without knowing what the message format is like.
After ingestion, or are you talking about the format of the json?
Badger, September 22, 2023, 10:27pm (#15)
I am talking about the JSON. BTW, you should probably start the filter section with a json filter since you no longer have a json codec.
I was able to put the json filter in there; however, I'm getting that same "loop" that I had at the beginning of the discussion. Any thoughts on where I may need to edit the script?
input {
  file {
    type => "json"
    path => "E:/Evidence-Collection/Test-Collection/*.json"
    sincedb_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/data/queue/sincedb_json_ingest_.log"
    mode => "read"
    codec => multiline {
      pattern => "^Spalanzani"
      negate => true
      what => previous
      auto_flush_interval => 1
      multiline_tag => ""
    }
    file_completed_action => "log"
    file_completed_log_path => "C:/Users/forensic-user/Documents/Elastic-Kibana/logstash-8.9.1/logs/logstash-json_logs_read.log"
  }
}
filter {
  json {
    source => "message"
    target => "someField"
    remove_field => [ "message" ]
  }
  if [type] == "gcp" {
    date {
      match => [ "[raw][timestamp]", "ISO8601" ]
    }
    split {
      field => "someField"
    }
    mutate {
      ecs_compatibility => "disabled"
      rename => {
        "[raw][logName]" => "log_name"
        "[raw][severity]" => "severity"
        "[raw][insertId]" => "gcp_log_id"
        "[raw][protoPayload][requestMetadata][callerIp]" => "source_ip"
        "[raw][protoPayload][requestMetadata][callerSuppliedUserAgent]" => "useragent"
        "[raw][protoPayload][@type]" => "event_type"
        "[raw][protoPayload][@type][status][message]" => "status_message"
        "[raw][protoPayload][authenticationInfo][principalEmail]" => "username"
        "[raw][protoPayload][serviceName]" => "service_name"
        "[raw][protoPayload][authorizationInfo]" => "authorization_info"
        "[raw][protoPayload][methodName]" => "method_name"
        "[raw][protoPayload][resourceName]" => "resource_name"
        "[raw][protoPayload][request][service_account][display_name]" => "service_account_name"
        "[raw][protoPayload][request][service_account][description]" => "service_account_description"
        "[raw][protoPayload][response][email]" => "account_email"
        "[raw][protoPayload][response][unique_id]" => "account_id"
        "[raw][jsonPayload][message]" => "system_message"
        "[raw][jsonPayload][connection][dest_ip]" => "destination_ip"
        "[raw][jsonPayload][connection][dest_port]" => "destination_port"
        "[raw][jsonPayload][connection][protocol]" => "protocol"
        "[raw][jsonPayload][connection][src_ip]" => "source_ip"
        "[raw][jsonPayload][connection][src_port]" => "source_port"
        "[raw][jsonPayload][disposition]" => "disposition"
        "[raw][jsonPayload][instance][project_id]" => "vpc_project_id"
        "[raw][jsonPayload][instance][region]" => "vpc_region"
        "[raw][jsonPayload][instance][vm_name]" => "vm_name"
        "[raw][jsonPayload][instance][zone]" => "resource_zone"
        "[raw][jsonPayload][rule_details][action]" => "firewall_action"
        "[raw][jsonPayload][rule_details][direction]" => "firewall_direction"
        "[raw][jsonPayload][rule_details][ip_port_info][ip_protocol]" => "firewall_rule_protocol"
        "[raw][jsonPayload][rule_details][ip_port_info][port_range]" => "firewall_rule_ports"
        "[raw][jsonPayload][rule_details][priority]" => "firewall_rule_priority"
        "[raw][jsonPayload][rule_details][reference]" => "firewall_rule_reference"
        "[raw][jsonPayload][rule_details][source_range]" => "firewall_rule_source_ranges"
        "[raw][jsonPayload][rule_details][target_tag]" => "firewall_rule_target_tags"
        "[raw][jsonPayload][vpc][subnetwork_name]" => "subnetwork_name"
        "[raw][jsonPayload][vpc][vpc_name]" => "vpc_name"
        "[raw][textPayload]" => "text_payload"
        "[raw][labels][compute.googleapis.com/resource_name]" => "compute_resource_name"
        "[raw][resource][type]" => "resource_type"
        "[raw][resource][labels][bucket_name]" => "bucket_name"
        "[raw][resource][labels][location]" => "resource_location"
        "[raw][resource][labels][zone]" => "resource_zone"
        "[raw][resource][labels][project_id]" => "project_id"
        "[raw][resource][labels][instance_id]" => "instance_id"
        "[raw][protoPayload][serviceData][policyDelta][bindingDeltas]" => "policy_deltas"
        "[raw][protoPayload][requestMetadata][destinationAttributes][ip]" => "destination_ip"
        "[raw][protoPayload][requestMetadata][destinationAttributes][port]" => "destination_port"
      }
      # add_tag => [ "gcp_log" ]
    }
    # remove remaining fields
    mutate {
      remove_field => [ "raw" ]
    }
    # split authorization_info out into authorization_permissions - but keep the original intact
    if [authorization_info] {
      ruby {
        path => "C:\Users\forensic-user\Documents\Elastic-Kibana\logstash-8.9.1\split_gcp_authinfo_fields.rb"
        script_params => {
          "source_field" => "[authorization_info]"
          "destination_field" => "[authorization_permissions]"
          "key_field" => "permission"
        }
      }
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["http://1.1.1.1:9200"]
    index => "gcp-test-2"
  }
}
Badger, September 25, 2023, 3:47pm (#17)
That is normal for DEBUG level logging. It does not suggest there is a problem.
Does this debug message mean anything important? I saw your previous post about it with the auto_flush_interval. What value should it have? Would it be auto_flush_interval => 0?
[2023-09-25T15:57:00,904][DEBUG][logstash.instrument.periodicpoller.cgroup] One or more required cgroup files or directories not found: /proc/self/cgroup, /sys/fs/cgroup/cpuacct, /sys/fs/cgroup/cpu
I'm not too sure what to do now... this script has been running for quite some time, and yet nothing is showing up within Index Management.