현재 filebeat으로 log file을 읽어서 logstash로 보내고 logstash에서 s3와 kafka로 전송하는 간단한 스트림을 구성했습니다.
file beat 설정은 아래와 같고
# List of prospectors to fetch data.
prospectors:
-
paths:
- D:\*.log
input_type: log
document_type: rhyme
ignore_older: 2h
output:
logstash:
# The Logstash hosts
hosts: ["internal.logstash:5100"]
logging:
level: debug
# enable file rotation with default configuration
to_files: true
# do not log to syslog
to_syslog: false
files:
path: C:\Program Files\Filebeat\log
rotateeverybytes: 10485760 # = 10MB
name: beat.log
keepfiles: 7
logstash의 경우에도 beat에서 받고 grok처리 후 output으로 가는 단순한 구조입니다.
input {
beats {
port => 5100
type => "rhyme"
congestion_threshold => 60
}
}
filter {
if [type] == "rhyme" {
grok {
match => ["message", "%{DATA:topic_id}\|%{DATA:message_id}\|%{GREEDYDATA:body}"]
}
}
}
output {
if [type] == "rhyme" {
s3 {
access_key_id => "???"
secret_access_key => "???"
region => "???"
bucket => "???"
time_file => 60
prefix => "rhyme-"
}
kafka {
topic_id => "%{topic_id}"
message_key => "%{message_id}"
codec => plain {
format => "%{body}"
}
bootstrap_servers => "internal.kafka:6667"
}
}
}
그런데 순서보장이 되지 않습니다. log파일은 분명히 순서대로 되어있습니다만...
전부 다 출력을 찍어보기에는 간헐적이라 잡아낼 끈기도 없구요.
혹시 체크해야할 포인트들만 알려주시면 감사하겠습니다.