Multiple Filebeats with different log types on the same Logstash port

Hi all,

I have two Filebeats on different servers, serving different purposes. One of them ships Docker logs, using the Logstash configuration given below. The other is a new Filebeat for shipping Hazelcast logs, whose configuration is also given below. My ELK stack version is 5.5.1.

So my question is: can I configure Logstash to listen on the same port for multiple Beats?

And what filter would be a good fit for ingesting the log below? I couldn't quite wrap my head around that.

An example Hazelcast log line is below:

2019-03-26 09:43:37.617 INFO 31828 --- [d.HealthMonitor] c.h.internal.diagnostics.HealthMonitor : [ipv4]:5701 [servr-prod] [3.8.3] processors=8, physical.memory.total=31.2G, physical.memory.free=1001.0M, swap.space.total=16.0G, swap.space.free=14.9G, heap.memory.used=6.2G, heap.memory.free=5.2G, heap.memory.total=11.4G, heap.memory.max=11.4G, heap.memory.used/total=54.22%, heap.memory.used/max=54.22%, minor.gc.count=329, minor.gc.time=65348ms, major.gc.count=2, major.gc.time=109ms, load.process=0.00%, load.system=0.50%, load.systemAverage=54.00%, thread.count=74, thread.peakCount=79, cluster.timeDiff=-16228, event.q.size=0, executor.q.async.size=0, executor.q.client.size=0, executor.q.query.size=0, executor.q.scheduled.size=0, executor.q.io.size=0, executor.q.system.size=0, executor.q.operations.size=1, executor.q.priorityOperation.size=0, operations.completed.count=390597068, executor.q.mapLoad.size=0, executor.q.mapLoadAllKeys.size=0, executor.q.cluster.size=0, executor.q.response.size=0, operations.running.count=0, operations.pending.invocations.percentage=0.00%, operations.pending.invocations.count=1, proxy.count=0, clientEndpoint.count=19, connection.active.count=21, client.connection.count=19, connection.count=2

The configuration of the Filebeat that delivers the Hazelcast logs is below:

    filebeat.prospectors:
    - input_type: log
      paths:
        - /products/logs/hzcluster.log
      multiline.pattern: '^['
      multiline.negate: true
      multiline.match: after

    output.logstash:
      hosts: ["ip:5044"]

    logging.level: debug

The one responsible for delivering the Docker logs is below:

    filebeat:
      prospectors:
      - type: log
        paths:
          - /var/lib/docker/containers/*/*.log
        json:
          message_key: log
          keys_under_root: true
        multiline:
          pattern: '^APP\||^APM\||^20\d\d-'
          negate: true
          match: after
    processors:
    - add_docker_metadata: ~
    output:
      logstash:
        hosts: ["ip:5044"]
        compression_level: 0
    logging:
      metrics:
        enabled: false

My Logstash config is below:

    input {
      beats {
        port => 5044
        type => "docker_logs"
      }
    }
    filter {
      mutate { copy => { "log" => "[@metadata][log]" } }
      if [log] =~ /^APM/ {
        mutate { add_field => { "[@metadata][logType]" => "APM" } }
        grok {
          match => { "log" => "APM\|(?<logTime>[^|]*)\|(?<apmJson>.*)" }
        }
        mutate {
          # The literal new line here is the only way to put a new line ...
          gsub => ["apmJson", "
    ", ""]
        }
        json {
          source => "apmJson"
        }
      } else if [log] =~ /^APP/ {
        mutate { add_field => { "[@metadata][logType]" => "APP" } }
        grok {
          match => { "log" => "APP|(?[^|])|(?[^|])|(?[^|])|(?[^|])|(?[^|])|(?[^|])|(?[^|])|(?[.a-zA-Z0-9]) --- (?.*)" }
        }
      }
      date {
        match => [ "logTime", "yyyy-MM-dd HH:mm:ss.SSS" ]
        timezone => "UTC"
      }
      mutate {
        remove_field => ["apmJson","beat","stream","docker","log","source","type","prospector","host","logTime","logLevel","logger"]
      }
    }
    output {
      if [@metadata][logType] == "APM" {
        elasticsearch {
          hosts => ["ip:9200"]
          index => "monitoring"
          document_type => "metric"
        }
      } else {
        if [@metadata][logType] == "APP" {
          elasticsearch {
            hosts => ["ip:9200"]
            index => "logstash-%{+YYYY.MM.dd}"
            document_type => "app-log"
          }
        }
        file {
          path => "/products/logstash/logs/vf-docker-%{+YYYY.MM.dd}.log"
          codec => line { format => "%{[@metadata][log]}" }
        }
      }
    }

Any help is highly appreciated. Thanks

Yes. If you do that, you may want to add a field in Filebeat to identify the type of each log.
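A minimal sketch of that, assuming Filebeat 5.x prospector syntax (the field name `log_type` and its value are illustrative, not something from your configs):

    filebeat.prospectors:
    - input_type: log
      paths:
        - /products/logs/hzcluster.log
      # illustrative field name/value -- pick whatever identifies this beat
      fields:
        log_type: hazelcast
      fields_under_root: true

Both Filebeats can then point at the same `ip:5044`, and in the Logstash filter you can branch on that field, e.g. `if [log_type] == "hazelcast" { ... }`.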

For that hazelcast log I would use

    mutate { add_field => { "[@metadata][copy]" => "%{[message]}" } }
    mutate { gsub => [ "[@metadata][copy]", ".*\]", "" ] }
    kv { source => "[@metadata][copy]" field_split => ", " }
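If it helps to see what that gsub + kv combination produces, here is a rough Python simulation on a shortened version of your sample line (just a sanity check of the parsing logic, not how Logstash itself runs):

```python
import re

# Shortened version of the Hazelcast sample line from the post
line = ("2019-03-26 09:43:37.617 INFO 31828 --- [d.HealthMonitor] "
        "c.h.internal.diagnostics.HealthMonitor : [ipv4]:5701 [servr-prod] [3.8.3] "
        "processors=8, physical.memory.total=31.2G, heap.memory.used/total=54.22%")

# mutate { gsub => [ "[@metadata][copy]", ".*\]", "" ] }
# ".*" is greedy, so everything up to the LAST "]" is stripped,
# leaving only the key=value section of the line.
tail = re.sub(r".*\]", "", line).strip()

# kv { field_split => ", " } : split pairs on ", ", then each pair on "="
kv = dict(pair.split("=", 1) for pair in tail.split(", "))
print(kv)
```

The result is a flat dict of the metrics, which is what the kv filter would add as event fields.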

Hey Badger,

Thanks for the reply. I'll give it a go.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.