Difference between sending static vs "live feed" logs to Logstash via Filebeat

We have two different ELK servers used to analyse logs from our own application: one is in-house and the other is on the customer's site. We have different pipelines for the application itself and also for the various application services. For example, we have an application.log pipeline, then application.service1.log, application.service2.log, and so on.

The in-house setup gathers the logs from a share after they are moved there by a script; these logs are static in that they are no longer being changed or updated. Filebeat is configured to push the logs into Logstash.

On the customer site we have multiple application servers with Filebeat running, pushing the logs over the network directly to Logstash on the ELK server. Some of the logs are constantly being amended and added to over time.

We have noticed that the logs are being indexed differently on the customer site.

On the customer site there appears to be some "cross contamination" between the application.log and application.server1.log logs, where we see log lines in the message field that shouldn't be there.

Our question is: should there be a difference between sending static logs vs a live feed to Logstash, and if so, what Filebeat input options (or Elasticsearch output options?) should be applied in this "live feed" scenario?

Apologies if some of the terminology is off; it's because I am new to the ELK stack.

Thanks in advance

There is basically no difference. Filebeat is line oriented; it will start reading the files and process them line by line. The only difference is that when files are constantly being written to, they may be rotated, and depending on the configuration you can end up with duplicate messages or miss some logs.
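
If the files are live and being rotated, these are the kinds of input options that usually matter. This is only an illustrative sketch, not something taken from your setup (the path is a placeholder):

filebeat.inputs:
  - type: log                         # older Filebeat versions call this input_type
    paths:
      - C:\path\to\application.log    # placeholder path
    # Pick up newly rotated files reasonably quickly.
    scan_frequency: 10s
    # How long to wait before checking a file again after reaching EOF.
    backoff: 1s
    # Keep the harvester open if the file is renamed or removed during
    # rotation, so the remaining lines are still shipped.
    close_renamed: false
    close_removed: false
    # Don't drop registry state when a file disappears; if the same path
    # re-appears, it would otherwise be re-read from the beginning and
    # produce duplicates.
    clean_removed: false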

It is not clear what you mean by that; can you provide more context? Are the logs in application.log different from the logs in application.server1.log?

You need to provide more context about this issue, like the filebeat.yml, the Logstash configuration, what your output is, and what the expected output is.

Apologies, I am a newbie so my ELK terminology might be off. Also, I made a typo: application.server1.log should be application.service1.log. I will try to add context as best I can.

The filebeat.yml is as below

filebeat.registry.path: ./filebeat

logging.to_files: true
logging.files:
  path: C:\Company\Data\Logs\Monitoring
  name: filebeat.log
  rotateeverybytes: 10485760
  keepfiles: 2
logging.level: info

filebeat.spool_size: 102400
filebeat.idle_timeout: 15s

filebeat.inputs:
  - input_type: log
    paths:
      - C:\Company\Data\Logs\Application.Server.exe.log
    fields: {log_type: Application_Server_log_files}
    fields_under_root: true
    multiline.pattern: '^(20[0-9]{2}(-[0-9]{2}){2} [0-9]{2}(:[0-9]{2}){2})|([0-9]{2} [JFMASOND][a-z]{2} 20[0-9]{2})'
    multiline.negate: true
    multiline.match: after
    close_removed: false
    close_eof: false
    clean_removed: true
    scan_frequency: 1h
    backoff: 5m
  - input_type: log
    paths:
      - C:\Company\Data\Logs\Application.Service.*.exe.log
    fields: {log_type: Application_Service_log_files}
    fields_under_root: true
    multiline.pattern: '^(20[0-9]{2}(-[0-9]{2}){2} [0-9]{2}(:[0-9]{2}){2})|([0-9]{2} [JFMASOND][a-z]{2} 20[0-9]{2})'
    multiline.negate: true
    multiline.match: after
    close_removed: false
    close_eof: false
    clean_removed: true
    scan_frequency: 1h
    backoff: 5m

output.logstash:
  hosts: ["ESServer:5044"]
  bulk_max_size: 8192
  worker: 4
  compression_level: 0
  pipelining: 5
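
As far as I understand it, the multiline settings are meant to treat any line that starts with a timestamp as the beginning of a new event and to append every other line to the previous event. A made-up illustration (these log lines are not from our real logs):

2024-05-13 09:15:42 [1.2.3][Tool1] ERROR  MyLogger Something failed    <- starts with a timestamp, new event
System.Exception: something went wrong                                 <- no timestamp, appended to the event above
   at Company.Application.Service1.Run()                               <- appended as well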

We then have separate Logstash filter configs for both Application.Server and Application.Service. The Application.Service config is as below; some additional groks are omitted for brevity.

filter {
  if [log_type] == "Application_Service_log_files" {

    fingerprint {
      source => ["Site", "message", "source", "log_date", "logger_name", "tool_name", "level"]
      concatenate_sources => true
      method => "MURMUR3"
    }

    # first filter
    grok {
      add_tag => [ "valid", "elastic" ]
      match => [ "message", "%{DATESTAMP:log_date} \[%{DATA:service_version}\]\[%{DATA:tool_name}\] %{LOGLEVEL:level}\s+%{NOTSPACE:logger_name} %{GREEDYDATA:message}",
                 "source", ".+" ]
      break_on_match => false
      overwrite => [ "message" ]
    }

    date {
      match => ["log_date", "yy-MM-dd HH:mm:ss.SSS"]
    }

    # ERROR
    grok {
      add_tag => [ "ERROR" ]
      match => [ "level", "ERROR" ]
      tag_on_failure => []
    }

    # FATAL
    grok {
      add_tag => [ "FATAL" ]
      match => [ "level", "FATAL" ]
      tag_on_failure => []
    }

    mutate {
      remove_tag => [ "valid", "beats_input_codec_plain_applied" ]
      rename => { "[agent][name]" => "[site]" }
      rename => { "[agent][hostname]" => "[host]" }
      remove_field => ["[agent][ephemeral_id]", "[agent][id]", "[agent][type]", "[agent][version]", "input_type", "offset"]
    }
  }
}
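
For reference, the Elasticsearch output isn't shown above. If the fingerprint is meant to de-duplicate events that get re-read, the usual pattern is to use it as the document id in the elasticsearch output, along these lines (the host and index name below are placeholders rather than our real values):

output {
  if [log_type] == "Application_Service_log_files" {
    elasticsearch {
      hosts => ["http://ESServer:9200"]                 # placeholder
      index => "application-service-%{+YYYY.MM.dd}"     # placeholder
      # Re-ingesting the same line then overwrites the existing document
      # instead of creating a duplicate.
      document_id => "%{[fingerprint]}"
    }
  }
}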

Hopefully this helps with understanding our setup.
