Filebeat randomly missing input records

We routinely (re-)load ISE events into Elasticsearch using the following command:
cat ${FILELIST} | sort -u >>iselog.log.reprocess

We sort the input file because it consists of ISE log events that are spread over multiple lines, and in our current setup we have no guarantee that these lines are delivered in the right order.
There is a logstash filter that re-orders those lines, and loads the re-assembled events in the target index.
The issue is that we only load and process about 95% of the events in the input file. On average, 5% are not loaded (e.g. 300 out of 6000 re-assembled events). The problem is not linked to the event records themselves (format, etc.), as two consecutive runs of the command above will miss different events. It's pretty random.
There must be a race somewhere, but I can't figure out where.
The config for Filebeat is:

filebeat.inputs:
<some inputs skipped>
- type: log
  paths:
    - /opt/elk/data/iselog/iselog.log*
  input_type: log
  fields:
    type: ISELOG
    document_type: ISELOG
output:
  logstash:
    hosts: ["@@ELK_CURRENT_HOST_LN@@:@@LOGSTASH_PORT_02@@"]
    loadbalance: true

The issue description sounds similar to "Filebeat missing log lines", but I'm not sure I understood the fix (buying more CPU?).

What does your Logstash pipeline look like?

The pipeline receives individual event lines from ISE, so the first part aggregates the various lines into a single event. The aggregate filter - and the ruby code inside it - only works when the lines are delivered in the right order (that part is, by the way, essentially copied/pasted from https://stackoverflow.com/questions/34259414/syslog-message-in-multi-parts-can-logstash-reassemble-it).
We had trouble guaranteeing this, so as a workaround we now re-process in batch: we drop the index of the day, sort the ISE event lines of that day, and re-pipe the output to Filebeat.
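For clarity, the "drop the index of the day" step is nothing fancy: it is just a DELETE of the daily index before we re-pipe the sorted lines. A rough sketch, using our host placeholders and an example date (authentication options omitted):

# Sketch only: drop the daily ISE index before re-loading it (example date)
curl -X DELETE "https://@@ELK_HOST_LN_01@@:@@ELASTICSEARCH_PORT@@/logstash-iselog-2023.01.15"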
The pipeline is quite lengthy. I annotated with ### the area where code was removed for the sake of clarity.

 
filter {
if [fields][type] =~ "ISELOG" {
    # Parse an event line

    grok {
        patterns_dir => ["/opt/elk/config/logstash/patterns/grok-patterns"]
        match => [
            "message", "(?:\<%{POSINT:syslog_pri}\>)?%{SYSLOGTIMESTAMP:timestamp} %{IPORHOST:hostname} %{WORD:kind} %{NOTSPACE:taskid} %{INT:duration:int} %{INT:order:int}( %{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME} %{ISO8601_TIMEZONE})? %{GREEDYDATA:rest}"
        ]
    }

    # If this is an event with a single line -> no aggregate filter
    # Simply keep a copy of the message in a map variable

    if [order] == 0 {
        if [duration] == 1 {
            ruby {
                code => "
                    map = {}
                    map['the_message'] = {}
                    map['the_message']['0'] = event.get('message')
                    event.set('message_parts', map['the_message'])
                    event.set('tags', 'finish')
                "
            }
        }

        # If this is the first event line of an event split over multiple lines, start an aggregate filter
        # with the creation of a map variable and save copy of message to it
        #

        else {
            aggregate {
                task_id => "%{taskid}"
                code => "map['the_message'] ||= {} ; map['the_message']['0'] = event.get('message')"
                map_action => "create"
            }
        }
    }
    else {
    # Keep adding event lines to the array until the last part is received
    # This is to ensure messages are in the correct order
    # We're not interested in PRI, timestamp ... on all but the first message so keep only 'rest' matched field here

        aggregate {
            task_id => "%{taskid}"
            code => "
                map['the_message'][event.get('order').to_s] = event.get('rest')
                event.set('message_parts', map['the_message'])
                if map['the_message'].length == event.get('duration') then
                    event.set('tags', 'finish')
                end
            "
            map_action => "update"
        }
    }

    # If we got the last part (the tags field contains 'finish'), concatenate event lines
    # Else drop the event line (there is a copy in map anyway)

    if "finish" in [tags] {
        # Ruby code to convert the array to a well formatted single string
        ruby {
            code => "
                msg = ''
                a = event.get('[message_parts]')
                num = a.length

                for i in 0..num-1
                    msg +=  a[i.to_s]
                end

                # set the message to the new concatenated one
                event.set('concat_message', msg)
            "
        }
        # Remove those fields we don't need in elasticsearch
        mutate {
            remove_field => [ "syslog_pri", "timestamp", "hostname", "kind", "taskid", "order", "duration", "rest" , "timestamp", "syslog_pri", "message", "message_parts"]
        }

        # Now finally match the full event (i.e. the concatenation of all event lines)
        grok {
            match => {
                "concat_message" => ["(?:\<%{POSINT:syslog_pri}\>)?%{SYSLOGTIMESTAMP:timestamp} %{IPORHOST:hostname} %{WORD:kind} %{NOTSPACE:taskid} %{INT:duration:int} %{INT:order:int} %{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME} %{ISO8601_TIMEZONE} %{GREEDYDATA:attributes}"]
            }
        }
        date {
            match => [ "timestamp", "MMM dd HH:mm:ss", "MMM  d HH:mm:ss" ]
        }

        # Parsing / matching depends on the type of event we are processing (type of event is recorded in 'kind' field)

        # Authentication events - to be processed except when they don't have UserName
        # ISE regularly notifies a 'spurious' authentication event that does not correspond to actual users connections

        if ("CISE_Passed_Authentications" in [kind]) {
            if ([concat_message] =~ /Passed-Authentication: Authentication succeeded/) {

                # Retrieve easily identifiable fields

                grok {
                    break_on_match => false
                    match => {

                    "attributes" => [
                                "UserName=%{WORD:user_id}",
                                "(?<country_cd>(?<=countryCode=)\d+)",
                                "(?<manager_id>(?<=manager=CN=)\w{2}\d{3})",
                                "(?<cost_center>(?<=costCenter=)[^,]+)",
                                "(?<session_id>(?<=Class=)[^;]+)"
                                ]
                    }
                }

                # Split the message (attributes) in comma-separated (when not escaped) elements
                # Next split each element by equal sign (when not escaped) into key/value
                # Keep the field names we're interested in and remove escape and superfluous spaces

                ruby {
                    code => "
			# ### Removed code snippet - basically formatting some fields and adding to the event
                    "
                }
            }
	    
	    # Ignore event
            else {
                mutate {
                    add_tag => ["ignore_event"]
		}
            }
        }

	# ### Processing of other types of messages removed 

        # The aggregate filter for reconstituting events from N event lines

        aggregate {
            task_id => "%{taskid}"
            code => ""
            map_action => "update"
            end_of_task => true
        }

        # Drop fields we do not need
        mutate {
            remove_field => ["order", "duration", "attributes"]
            rename => ["concat_message", "message" ]
        }
    }
    else {
        # Ignore individual parts
        drop{}
    }
}
}

output {
     if [fields][type] =~ "ISELOG" {
         elasticsearch {
            hosts => ["https://@@ELK_HOST_LN_01@@:@@ELASTICSEARCH_PORT@@", "https://@@ELK_HOST_LN_02@@:@@ELASTICSEARCH_PORT@@", "https://@@ELK_HOST_LN_03@@:@@ELASTICSEARCH_PORT@@"]
            index => "logstash-iselog-%{+YYYY.MM.dd}"
            # ### Further param removed
         }
     }
}

I'm also including the script that we run to reload events. I'm not too sure about the behaviour of Filebeat when we delete a file and then re-create it: does it position itself correctly on the new file?

cd /opt/elk/data/iselog
\rm -f iselog.log.reprocess

# Reprocess the last 3 ISE event files

for file in `ls -lrt  iselog.log.20* | tail -3  |awk '{print $9}' `
do
  # Sort the records we want to reload and pipe to Filebeat
  cat ${file} | sort -u >>iselog.log.reprocess
done
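To make the delete/re-create question concrete: as far as I know, Filebeat keeps per-file state in its registry, and the log input has options to close harvesters and clean up state for removed files. A sketch of the kind of settings I mean (unverified against our Filebeat version, so please treat the last two options as an assumption):

- type: log
  paths:
    - /opt/elk/data/iselog/iselog.log*
  fields:
    type: ISELOG
    document_type: ISELOG
  # Assumption: close_removed / clean_removed control what happens to the
  # harvester and registry state when iselog.log.reprocess is deleted and
  # re-created; verify against the Filebeat docs for your version.
  close_removed: true
  clean_removed: true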

As you are using the aggregate filter, the processing order is not defined unless you make sure there is only one worker thread and you disable the Java execution engine. Would events being processed out of order explain the missing documents?
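If it helps, a minimal sketch of what I mean, assuming a Logstash version that still ships the old Ruby execution engine (the same thing can be passed on the command line as -w 1 and --java-execution=false):

# logstash.yml (sketch)
pipeline.workers: 1
pipeline.java_execution: false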

We finally found the problem: we didn't have any setting for pipeline.workers in logstash.yml, so it defaulted to the number of CPU cores (4 in our case), hence the erratic behaviour when processing multi-line events.
Maybe this required setting could also be mentioned on the page describing the aggregate filter (https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html), not just the -w 1 command-line parameter...
And by the way, this setting also solved the out-of-order delivery of event lines to Logstash. It turns out the event lines were actually delivered to Logstash in the right order, and it was just the incorrect Logstash setting that caused this spurious problem. So we've removed the batch reload of events, as everything is now consumed properly in straight-through mode.
Thanks a lot for your support.
