Logstash error while importing data from Kafka

Hi friends, I use Logstash 2.4, Kafka 0.10.1, and an Elasticsearch 5.1.1 cluster.

I import three logs from Active Directory: Application logs, System logs, and Security logs.
I created three topics in ZooKeeper for those logs:
- topic_id => "ActiveDirectory-Application-Logs"
- topic_id => "ActiveDirectory-System-Logs"
- topic_id => "ActiveDirectory-Security-Logs"

To store the data, I use three indices in Elasticsearch:
- index => "logstash-application"
- index => "logstash-system"
- index => "logstash-security"

In /etc/logstash/conf.d/ I have three conf files:

logstash-application.conf
input {
  kafka {
    zk_connect => ["192.xxx.xxx.xxx:2181"]
    group_id => "logstash-application"
    topic_id => "ActiveDirectory-Application-Logs"
    reset_beginning => "false"
    consumer_threads => 1
    codec => json {}
  }
}
output {
  elasticsearch {
    user => logstash_internal
    password => xxxxxxxxx
    hosts => ["192.xxx.xxx.xxx:9200"]
    index => "logstash-application"
  }
}

logstash-system.conf
input {
  kafka {
    zk_connect => ["192.xxx.xxx.xxx:2181"]
    group_id => "logstash-system"
    topic_id => "ActiveDirectory-System-Logs"
    reset_beginning => "false"
    consumer_threads => 1
    codec => json {}
  }
}
output {
  elasticsearch {
    user => logstash_internal
    password => xxxxxxxxxx
    hosts => ["192.xxx.xxx.xxx:9200"]
    index => "logstash-system"
  }
}

logstash-security.conf
input {
  kafka {
    zk_connect => ["192.xxx.xxx.xxx:2181"]
    group_id => "logstash-security"
    topic_id => "ActiveDirectory-Security-Logs"
    reset_beginning => "false"
    consumer_threads => 1
    codec => json {}
  }
}
output {
  elasticsearch {
    user => logstash_internal
    password => xxxxxxxx
    hosts => ["192.xxx.xxx.xxx:9200"]
    index => "logstash-security"
  }
}

When I start the pipeline, data is stored in Elasticsearch. But there are mistakes: some System or Application logs end up in my logstash-security index, and some Security logs end up in logstash-application.

I don't know how to force each log to go to the right index.

Thanks for your help, and sorry for my English.

Fayce

Logstash has a single event pipeline: events from all inputs go to all outputs. Separating inputs and outputs into different files does not change this. If you want an event to reach only some outputs, you need to wrap the outputs in conditionals that select how events are routed.
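For example, a minimal sketch (the [log_name] field and its values are placeholders here; use whatever field in your events identifies the source log):

```
output {
  # Only events whose log_name field is "Security" reach this output.
  if [log_name] == "Security" {
    elasticsearch {
      hosts => ["192.xxx.xxx.xxx:9200"]
      index => "logstash-security"
    }
  }
  # ... similar conditional blocks for the other indices
}
```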

This exact question comes up here quite often. I'm sure you'll find useful information in the archives.

Ok thanks, I will search again :slight_smile:

Do you think a single file like this will work?

input {
  kafka {
    zk_connect => ["192.xxx.xxx.xxx:2181"]
    topic_id => "ActiveDirectory-System-Logs"
    topic_id => "ActiveDirectory-Security-Logs"
    topic_id => "ActiveDirectory-Application-Logs"
    reset_beginning => "false"
    consumer_threads => 1
    codec => json {}
  }
}

output {
  if [log_name] == "Application" {
    elasticsearch {
      user => logstash_internal
      password => xxxxxxxxxx
      embedded => true
      index => "logstash-application"
    }
  } else if [log_name] == "System" {
    elasticsearch {
      user => logstash_internal
      password => xxxxxxxxx
      embedded => true
      index => "logstash-system"
    }
  } else if [log_name] == "Security" {
    user => logstash_internal
    password => xxxxxxxxx
    embedded => true
    index => "logstash-security"
  }
}

Yeah, follow that pattern (but you're missing an elasticsearch { line in the last else if block).

Here is my final conf file:

input {
  kafka {
    zk_connect => ["192.xxx.xxx.xxx:2181"]
    group_id => "logstash-application"
    topic_id => "ActiveDirectory-Application-Logs"
    reset_beginning => "false"
    consumer_threads => 1
    codec => json {}
  }
  kafka {
    zk_connect => ["192.xxx.xxx.xxx:2181"]
    group_id => "logstash-system"
    topic_id => "ActiveDirectory-System-Logs"
    reset_beginning => "false"
    consumer_threads => 1
    codec => json {}
  }
  kafka {
    zk_connect => ["192.xxx.xxx.xxx:2181"]
    group_id => "logstash-security"
    topic_id => "ActiveDirectory-Security-Logs"
    reset_beginning => "false"
    consumer_threads => 1
    codec => json {}
  }
}

output {
  if [log_name] == "Application" {
    elasticsearch {
      user => logstash_internal
      password => xxxxxx
      embedded => true
      index => "logstash-application"
    }
  } else if [log_name] == "System" {
    elasticsearch {
      user => logstash_internal
      password => xxxxxx
      embedded => true
      index => "logstash-system"
    }
  } else if [log_name] == "Security" {
    elasticsearch {
      user => logstash_internal
      password => xxxxxx
      embedded => true
      index => "logstash-security"
    }
  }
}

But when I start the instance I get this error:

{:timestamp=>"2017-01-17T14:21:05.034000+0100", :message=>"fetched an invalid config", :config=>"input {\r\n kafka {\r\n zk_connect => ["192.xxx.xxx.xxx:2181"]\r\n group_id => "logstash-application"\r\n topic_id => "ActiveDirectory-Application-Logs"\r\n reset_beginning => "false"\r\n consumer_threads => 1\r\n codec => json {}\r\n}\r\n kafka {\r\n zk_connect => ["192.xxx.xxx.xxx:2181"]\r\n group_id => "logstash-system"\r\n topic_id => "ActiveDirectory-System-Logs"\r\n reset_beginning => "false"\r\n consumer_threads => 1\r\n codec => json {}\r\n}\r\n kafka {\r\n zk_connect => ["192.xxx.xxx.xxx:2181"]\r\n group_id => "logstash-security"\r\n topic_id => "ActiveDirectory-Security-Logs"\r\n reset_beginning => "false"\r\n consumer_threads => 1\r\n codec => json {}\r\n}\r\n}\r\n\r\noutput {\r\nif [log_name] == "Application" {\r\n elasticsearch {\r\n user => logstash_internal\r\n password => xxxxxx\r\n embedded => true\r\n index => "logstash-application"\r\n }\r\n } else if [log_name] == "System"{\r\n elasticsearch {\r\n user => logstash_internal\r\n password => xxxxxx\r\n embedded => true\r\n index => "logstash-system"\r\n }\r\n } else if [log_name] == "Security"{\r\n elasticsearch {\r\n user => logstash_internal\r\n password => xxxxxx\r\n embedded => true\r\n index => "logstash-security"\r\n}\r\n}\r\n}\n", :reason=>"Expected one of #, input, filter, output at line 1, column 1 (byte 1) after ", :level=>:error}

We are close :slight_smile: thanks for the help.

I don't believe the embedded option is supported in Logstash 2.0 and later, but the error complains about the very beginning of the file. Check that you don't have a garbage character there (for example a UTF-8 byte order mark). Otherwise, reduce the configuration until it works to narrow it down.
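A BOM at the start of a file is invisible in most editors but shows up with hexdump. A quick demo (the /tmp/demo.conf path is just an example, not your real config):

```shell
# Write a demo config file that starts with a UTF-8 BOM (bytes ef bb bf),
# then dump the first line of its hex representation.
printf '\357\273\277input { }\n' > /tmp/demo.conf
hexdump -C /tmp/demo.conf | head -n 1
# The BOM appears as "ef bb bf" at offset 00000000.
```

Running `hexdump -C` against the real file in /etc/logstash/conf.d/ and inspecting the first bytes would reveal the same thing.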

It works, you're right Magnus. I deleted the embedded option and used hexdump to look for garbage characters.

Thanks a lot for help :slight_smile: :slight_smile: :slight_smile:

Fayce

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.