Applying Fields Parsed From a Few Logs to Many Logs

Hello! I am working with Nessus scan results, and have been using Logstash to parse out additional fields based on specific logs that are ingested.

For example, a "mac_address" field is parsed out IF the "name" field is "Ethernet MAC Addresses":

The same is done with other fields, such as hostname, device_type, etc., as well as tagging the machine as a "server" or a "workstation". This is all done with Logstash.

Here is my Logstash config:

input {
    beats {
        id => "nessusbeat"
        port => 5057
        codec => json {
            charset => "ISO-8859-1"
        }
        client_inactivity_timeout => 3600
    }
}

filter {
    if [fields][index] == "nessusbeat" {
        mutate {
            remove_field => [ "" ]
        }
        mutate {
            rename => { "host" => "source_ip" }
        }
        if "Ethernet MAC Addresses" in [name] {
            grok {
                match => { "plugin_output" => "[A-Za-z ]*:\n[ ]*-[ ]*(?<mac_address>.*)" }
            }
        }
        if "Additional DNS Hostnames" in [name] {
            grok {
                match => { "plugin_output" => "[A-Za-z ]*:\n[ ]*-[ ]*(?<hostname>.*)" }
            }
        }
        if "Common Platform Enumeration (CPE)" in [name] {
            grok {
                match => { "plugin_output" => "\n[A-Za-z ]*:[ ]*\n\n[ ]*(?<cpe>[^\n]*).*" }
            }
        }
        if "OS Identification" in [name] {
            grok {
                match => { "plugin_output" => "\n[A-Za-z ]*:[ ]*(?<operating_system>[^\n]*).*" }
            }
        }
        if "Device Type" in [name] {
            grok {
                match => { "plugin_output" => "[A-Za-z ]*:[ ]*(?<device_type>[^\n]*).*" }
            }
        }
        if "H**_SERVERS" in [plugin_output] {
            mutate {
                add_tag => [ "server" ]
            }
        }
        if "H**_WORKSTATIONS" in [plugin_output] {
            mutate {
                add_tag => [ "workstation" ]
            }
        }
    }
}
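After this filter runs, a scan produces events along these lines (hypothetical values), where each parsed field only lands on the single event it was extracted from:

{ "name": "Ethernet MAC Addresses", "source_ip": "10.0.0.5", "mac_address": "00:0C:29:AB:CD:EF" }
{ "name": "OS Identification", "source_ip": "10.0.0.5", "operating_system": "Windows Server 2016" }
{ "name": "Service Detection", "source_ip": "10.0.0.5" }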

My problem, however, is that the parsed fields only show up on the FEW logs that actually contain that information, rather than being applied to the entire scan. It makes sense to me why that happens, but I am wondering if there is a way to achieve what I want to do.

I was thinking maybe changing the mapping and implementing some sort of parent/child relationship with these fields, but I really don't know where to start. This is also why I am posting here rather than in the Logstash category: I believe I may be able to index/structure the data in a way that will work for me.

I would like to accomplish this so I can build server/workstation filters and a data table of host information in my Nessus dashboard. Currently, those filters only match the few logs the fields were parsed from, because the fields are not applied to the entire group of scan logs.

Any advice?

Thanks,
Joe

You'll need to enrich the record with a Lookup filter.

Docs & Blog Post

Before I dive into this, my understanding is that the Elasticsearch filter takes fields from logs already present in an index and copies them onto new logs.

My Nessus scan results enter Elasticsearch as individual results, not as an entire scan. I'm worried that the data enrichment will miss any log that comes before the value is parsed out.

If my thought process is correct, I believe I would need to be able to copy old fields to new logs AND new fields to old logs, if that makes sense. Or can I do that with this as well?

Logstash runs stateless. So if you have partial records coming in that you want enriched, it has to pull that enrichment data from somewhere (Elasticsearch, Memcached, etc.). If you want to occasionally enrich records that are already indexed, set up another pipeline which pulls from ES on a schedule. Docs are here.
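As a rough, untested sketch using your field names (host and index are placeholders), the lookup side would be an elasticsearch filter like this:

filter {
    elasticsearch {
        hosts => "localhost:9200"
        index => "nessusbeat-*"
        query => 'name:"Ethernet MAC Addresses" AND source_ip:%{[source_ip]}'
        fields => { "mac_address" => "mac_address" }
    }
}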

Thank you for your help, I got it working. In case someone else needs to do this, I'll explain.

I changed the Logstash pipeline (the one above) to output to a temporary index, and added a tag to each parsing block so the enrichment step can find the event carrying each field (nessus_mac, nessus_hostname, nessus_cpe, nessus_os, nessus_device, nessus_type; you can see the tag-to-field mapping in the queries below). For example, the MAC address block became something like this:
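if "Ethernet MAC Addresses" in [name] {
    grok {
        match => { "plugin_output" => "[A-Za-z ]*:\n[ ]*-[ ]*(?<mac_address>.*)" }
        add_tag => [ "nessus_mac" ]
    }
}

From there, I have the following, which queries the temporary index, performs the data enrichment, and outputs to the final index: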

input {
	elasticsearch {
		hosts => "foo:9200"
		index => "temp_nessusbeat-*"
		query => '{"query":{"bool":{"must_not":[{"exists":{"field":"tags"}}],"filter":{"range":{"@timestamp":{"gte":"now-5m","lte":"now"}}}}}}'
		scroll => "5m"
		schedule => "*/5 * * * *"
		add_field => {
			"index" => "nessusbeat"
		}
	}
}

filter {
	if "nessusbeat" in [index] {
		mutate {
			remove_field => [ "[fields][index]" ]
		}
		elasticsearch {
			hosts => "foo:9200"
			index => "temp_nessusbeat-*"
			query => 'tags:nessus_mac AND source_ip:%{[source_ip]} AND @timestamp:[now-5m TO now]'
			fields => { "mac_address" => "mac_address" }
		}
		elasticsearch {
			hosts => "foo:9200"
			index => "temp_nessusbeat-*"
			query => 'tags:nessus_device AND source_ip:%{[source_ip]} AND @timestamp:[now-5m TO now]'
			fields => { "device_type" => "device_type" }
		}
		elasticsearch {
			hosts => "foo:9200"
			index => "temp_nessusbeat-*"
			query => 'tags:nessus_hostname AND source_ip:%{[source_ip]} AND @timestamp:[now-5m TO now]'
			fields => { "hostname" => "hostname" }
		}
		elasticsearch {
			hosts => "foo:9200"
			index => "temp_nessusbeat-*"
			query => 'tags:nessus_cpe AND source_ip:%{[source_ip]} AND @timestamp:[now-5m TO now]'
			fields => { "cpe" => "cpe" }
		}
		elasticsearch {
			hosts => "foo:9200"
			index => "temp_nessusbeat-*"
			query => 'tags:nessus_type AND source_ip:%{[source_ip]} AND @timestamp:[now-5m TO now]'
			fields => { "device_tag" => "device_tag" }
		}
		elasticsearch {
			hosts => "foo:9200"
			index => "temp_nessusbeat-*"
			query => 'tags:nessus_os AND source_ip:%{[source_ip]} AND @timestamp:[now-5m TO now]'
			fields => { "operating_system" => "operating_system" }
		}
	}
}
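The output section of this second pipeline is just a plain elasticsearch output pointing at the final index, along these lines (host and index name are placeholders):

output {
	elasticsearch {
		hosts => "foo:9200"
		index => "nessusbeat-%{+YYYY.MM.dd}"
	}
}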
