Hi all,
I have two Filebeat instances on different servers, each with a different purpose. One has been shipping Docker logs, using the Logstash configuration given below. The other is new and ships Hazelcast logs with the Filebeat configuration given below. My ELK stack version is 5.5.1.
So my question is: can I configure Logstash to listen on the same port for multiple Beats?
And which filter would be a good choice for parsing the log line below? I couldn't quite wrap my head around that.
An example Hazelcast log line:
2019-03-26 09:43:37.617 INFO 31828 --- [d.HealthMonitor] c.h.internal.diagnostics.HealthMonitor : [ipv4]:5701 [servr-prod] [3.8.3] processors=8, physical.memory.total=31.2G, physical.memory.free=1001.0M, swap.space.total=16.0G, swap.space.free=14.9G, heap.memory.used=6.2G, heap.memory.free=5.2G, heap.memory.total=11.4G, heap.memory.max=11.4G, heap.memory.used/total=54.22%, heap.memory.used/max=54.22%, minor.gc.count=329, minor.gc.time=65348ms, major.gc.count=2, major.gc.time=109ms, load.process=0.00%, load.system=0.50%, load.systemAverage=54.00%, thread.count=74, thread.peakCount=79, cluster.timeDiff=-16228, event.q.size=0, executor.q.async.size=0, executor.q.client.size=0, executor.q.query.size=0, executor.q.scheduled.size=0, executor.q.io.size=0, executor.q.system.size=0, executor.q.operations.size=1, executor.q.priorityOperation.size=0, operations.completed.count=390597068, executor.q.mapLoad.size=0, executor.q.mapLoadAllKeys.size=0, executor.q.cluster.size=0, executor.q.response.size=0, operations.running.count=0, operations.pending.invocations.percentage=0.00%, operations.pending.invocations.count=1, proxy.count=0, clientEndpoint.count=19, connection.active.count=21, client.connection.count=19, connection.count=2
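To make the shape of that line explicit: it has a fixed prefix (timestamp, level, PID, thread, logger, member address, cluster name, version) followed by comma-separated key=value pairs. Below is a minimal, untested sketch of a filter block for it, assuming the stock grok and kv filter plugins and assuming the event still carries Filebeat's "source" field (the file path) when it reaches the filter. Every hz_* field name is a made-up placeholder. As far as I know, a single beats input accepts connections from any number of Beats, so the conditional is only there to tell the two streams apart:

```
filter {
  # Assumption: Filebeat's "source" field holds the path of the shipped file.
  if [source] =~ /hzcluster\.log$/ {
    grok {
      # Split the fixed prefix from the key=value tail; hz_* names are placeholders.
      match => { "message" => "^%{TIMESTAMP_ISO8601:hz_time}\s+%{LOGLEVEL:hz_level}\s+%{NUMBER:hz_pid}\s+---\s+\[%{DATA:hz_thread}\]\s+%{NOTSPACE:hz_logger}\s+:\s+\[%{DATA:hz_member}\]:%{NUMBER:hz_port}\s+\[%{DATA:hz_cluster}\]\s+\[%{DATA:hz_version}\]\s+%{GREEDYDATA:hz_metrics}" }
    }
    kv {
      # field_split is a set of characters: pairs are split on commas or spaces.
      source      => "hz_metrics"
      field_split => ", "
      value_split => "="
      target      => "hz"
    }
  }
}
```

Values such as 31.2G, 65348ms or 54.22% would stay strings with this sketch; turning them into numbers would need an extra step.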
The configuration of the Filebeat that ships the Hazelcast logs is below:
filebeat.prospectors:
- input_type: log
  paths:
    - /products/logs/hzcluster.log
  multiline.pattern: '^['
  multiline.negate: true
  multiline.match: after
output.logstash:
  hosts: ["ip:5044"]
logging.level: debug
The configuration of the Filebeat that ships the Docker logs is below:
filebeat:
  prospectors:
    - type: log
      paths:
        - /var/lib/docker/containers//.log
      json:
        message_key: log
        keys_under_root: true
      multiline:
        pattern: '^APP||^APM||^20\d\d-'
        negate: true
        match: after
processors:
  - add_docker_metadata: ~
output:
  logstash:
    hosts: ["ip:5044"]
    compression_level: 0
logging:
  metrics:
    enabled: false
My Logstash config is below:
input {
  beats {
    port => 5044
    type => "docker_logs"
  }
}
filter {
  mutate { copy => { "log" => "[@metadata][log]" } }
  if [log] =~ /^APM/ {
    mutate { add_field => { "[@metadata][logType]" => "APM" } }
    grok {
      match => { "log" => "APM|(?[^|])|(?.)" }
    }
    mutate {
      # The new line here is the only way to put a new line ...
      gsub => ["apmJson", "", ""]
    }
    json {
      source => "apmJson"
    }
  } else if [log] =~ /^APP/ {
    mutate { add_field => { "[@metadata][logType]" => "APP" } }
    grok {
      match => { "log" => "APP|(?[^|])|(?[^|])|(?[^|])|(?[^|])|(?[^|])|(?[^|])|(?[^|])|(?[.a-zA-Z0-9]) --- (?.*)" }
    }
  }
  date {
    match => [ "logTime", "yyyy-MM-dd HH:mm:ss.SSS" ]
    timezone => "UTC"
  }
  mutate {
    remove_field => ["apmJson","beat","stream","docker","log","source","type","prospector","host","logTime","logLevel","logger" ]
  }
}
output {
  if [@metadata][logType] == "APM" {
    elasticsearch {
      hosts => ["ip:9200"]
      index => "monitoring"
      document_type => "metric"
    }
  } else {
    if [@metadata][logType] == "APP" {
      elasticsearch {
        hosts => ["ip:9200"]
        index => "logstash-%{+YYYY.MM.dd}"
        document_type => "app-log"
      }
    }
    file {
      path => "/products/logstash/logs/vf-docker-%{+YYYY.MM.dd}.log"
      codec => line { format => "%{[@metadata][log]}" }
    }
  }
}
Any help is highly appreciated. Thanks