Hi
I have a Windows FTP server that produces three different logs, all written to the same folder on the C: drive. I am trying to separate the logs so that I can create custom filters. I have set up a test with two of the logs, and I am seeing duplicate entries in Kibana. Everything else seems to be working OK, so I am not sure why the entries are being doubled up.
Audit log (sample line)
25 Aug 2017 00:00:08 Default Site 192.xxx.xxx.xxx user LogIn ****** SSH 22
Diagnostic log (sample line)
2017-09-15 06:31:15,790 INFO SFTPConnection [Session.4502958:Default Site:user?] Client sent SSH_MSG_KEXINIT
Filebeat.yml on the Windows FTP server
filebeat.prospectors:
- input_type: log
  paths:
    - C:\Test\audit.log
  document_type: audit
- input_type: log
  paths:
    - C:\Test\diagnostic.log
  document_type: diagnostic

output.logstash:
  # The Logstash hosts
  hosts: ["10.xxx.xxx.xxx:5043", "10.xxx.xxx.xxx:5044"]
Two Logstash .conf files in /etc/logstash/conf.d/
audit.conf
input {
  beats {
    port => "5043"
  }
}

filter {
  if [type] == "audit" {
    grok {
      patterns_dir => ["/etc/logstash/patterns"]
      match => { "message" => "%{FTP_STAMP:ftpstamp}%{DATA:LOGLEVEL}%{IPV4:client}%{SPACE}%{USERNAME:userid}%{SPACE}%{GREEDYDATA:message}" }
    }
    date {
      match => ["ftpstamp", "dd MMM yyyy HH:mm:ss"]
      target => ["@timestamp"]
    }
  }
}

output {
  elasticsearch {
    hosts => ["10.xxx.xxx.xxx:9200"]
    manage_template => false
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
}
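The FTP_STAMP pattern used above comes from a custom pattern file under /etc/logstash/patterns. I have not pasted that file, but it is roughly along these lines, matching timestamps like "25 Aug 2017 00:00:08":

# /etc/logstash/patterns/ftp -- custom pattern (shown roughly, not the exact file)
FTP_STAMP %{MONTHDAY} %{MONTH} %{YEAR} %{TIME}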
diagnostic.conf
input {
  beats {
    port => "5044"
  }
}

filter {
  if [type] == "diagnostic" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp}%{GREEDYDATA:message}" }
    }
    date {
      match => ["timestamp", "yyyy-MM-dd HH:mm:ss,SSS"]
      target => ["@timestamp"]
    }
  }
}

output {
  elasticsearch {
    hosts => ["10.xxx.xxx.xxx:9200"]
    manage_template => false
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
}
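In case it helps, the grok and date filters can be checked in isolation with a throwaway stdin/stdout config like the one below (just a sketch for feeding in a sample diagnostic line, not part of the actual deployment):

input { stdin { } }
filter {
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:timestamp}%{GREEDYDATA:message}" }
  }
  date {
    match => ["timestamp", "yyyy-MM-dd HH:mm:ss,SSS"]
    target => "@timestamp"
  }
}
output { stdout { codec => rubydebug } }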
Tail of the Logstash log during startup
[2017-09-15T14:06:04,423][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://10.xxx.xxx.xxx:9200/]}}
[2017-09-15T14:06:04,432][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://10.xxx.xxx.xxx:9200/, :path=>"/"}
[2017-09-15T14:06:04,597][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://10.xxx.xxx.xxx:9200/"}
[2017-09-15T14:06:04,601][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//10.xxx.xxx.xxx:9200"]}
[2017-09-15T14:06:04,609][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://10.xxx.xxx.xxx:9200/]}}
[2017-09-15T14:06:04,610][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://10.xxx.xxx.xxx:9200/, :path=>"/"}
[2017-09-15T14:06:04,616][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://10.xxx.xxx.xxx:9200/"}
[2017-09-15T14:06:04,618][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//10.xxx.xxx.xxx:9200"]}
[2017-09-15T14:06:04,859][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2017-09-15T14:06:05,857][INFO ][logstash.inputs.beats ] Beats inputs: Starting input listener {:address=>"0.0.0.0:5043"}
[2017-09-15T14:06:05,954][INFO ][logstash.inputs.beats ] Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}
[2017-09-15T14:06:05,957][INFO ][logstash.pipeline ] Pipeline main started
[2017-09-15T14:06:06,173][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
JSON for one of the duplicate documents
{
  "_index": "filebeat-2017.08.24",
  "_type": "audit",
  "_id": "AV6DaUIRTNMqag3Z2IkO",
  "_version": 1,
  "_score": null,
  "_source": {
    "offset": 153,
    "input_type": "log",
    "source": "C:\\Test\\audit.log",
    "message": [
      "25 Aug 2017 00:00:05\tDefault Site\t10.xxx.xxx.xxx\tuser\tLogIn\t******\tSSH\t22\t",
      "LogIn\t******\tSSH\t22\t"
    ],
    "type": "audit",
    "userid": "user",
    "tags": [
      "Aust_Melb",
      "beats_input_codec_plain_applied"
    ],
    "@timestamp": "2017-08-24T14:00:05.000Z",
    "ftpstamp": "25 Aug 2017 00:00:05",
    "@version": "1",
    "beat": {
      "hostname": "FTP1",
      "name": "FTP1",
      "version": "5.5.2"
    },
    "host": "FTP1",
    "client": "10.xxx.xxx.xxx",
    "fields": {
      "test": "test",
      "hosts": [
        "localhost:9200"
      ]
    },
    "LOGLEVEL": "\tDefault Site\t"
  },
  "fields": {
    "@timestamp": [
      1503583205000
    ]
  },
  "sort": [
    1503583205000
  ]
}
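One thing I notice in the document above is that message has come back as an array of two values. I assume that is because the grok pattern captures GREEDYDATA into the existing message field, so the capture gets appended rather than replacing the original. If the intention were to replace it, my understanding is grok's overwrite option would do that (a sketch only, I have not applied this):

grok {
  patterns_dir => ["/etc/logstash/patterns"]
  match => { "message" => "%{FTP_STAMP:ftpstamp}%{DATA:LOGLEVEL}%{IPV4:client}%{SPACE}%{USERNAME:userid}%{SPACE}%{GREEDYDATA:message}" }
  # overwrite makes grok replace the existing message field instead of
  # appending and producing an array like [original, captured]
  overwrite => ["message"]
}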
Any help with this one would be much appreciated
Regards
SC