How to configure different indexes in Logstash

Hi Team,

Can anyone help me in configuring multiple indexes so that logs are shipped to different indices based on the environment type (PROD, SIT & DEV)? Currently my setup is working with the default filebeat-* index.

Logstash configuration:

input {
  beats {
    port => "5044"
    ssl => true
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
  }
}
filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    user => "xxx"
    password => "xxx"
    ssl => true
    ssl_certificate_verification => true
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
}

This should help:

@magnusbaeck Thanks for your response.

I am using Filebeat, and I have defined the path of the log file in paths:

paths:
  - /opt/example/*.log
input_type: log

I want to look at all the error messages and error message details, so I tried to index both message types:

2016-Aug-21 08:11:41 646;ERROR ;Thread-21;YFS10003 ;[1471939901646] YFS:Invalid Order ; [system]; IntegrationAdapter

2016-Aug-21 08:11:41 647;ERRORDTL;Thread-21;YFS10003 ;[1471939901646]<?xml version="1.0" encoding="UTF-8"?>

I followed your steps and configured Logstash:

input {
  beats {
    port => "5044"
    ssl => true
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
    type => "ERROR"
  }
  beats {
    port => "5044"
    ssl => true
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
    type => "ERRORDTL"
  }
  path => ["/opt/example/*.log"]
  type => "syslog"
}
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:time} %{LOGLEVEL:ERR?} \[%{NUMBER:thread}\] %{JAVACLASS:class} - %{GREEDYDATA:msg}" }
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}
output {
  if [type] == " ERROR " {
    elasticsearch {
      hosts => ["localhost:9200"]
      user => "xxx"
      password => "xxx"
      ssl => true
      ssl_certificate_verification => true
      index => " ERROR"
    }
  } else {
    elasticsearch {
      hosts => ["localhost:9200"]
      user => "xx"
      password => "xxx"
      ssl => true
      ssl_certificate_verification => true
      index => " ERRORDTL"
    }
  }
}

Currently I am not getting any logs into Logstash. Can you please let me know where the error could be?

I am new to grok filters and multiple indexing.

As I mentioned in my first post, everything works fine with the default filebeat-* index.

input {
  beats {
    port => "5044"
    ssl => true
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
    type => "ERROR"
  }
  beats {
    port => "5044"
    ssl => true
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
    type => "ERRORDTL"
  }
  path => ["/opt/example/*.log"]
  type => "syslog"
}
}

Two problems make this configuration invalid:

  • You can't have two beats inputs that listen on the same port.
  • The path and type options belong inside a file block. Is this what your config looks like, or is it a copy/paste mistake? (A sketch of the corrected layout follows below.)
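
For reference, a minimal sketch of how those options would normally be arranged, assuming you really do want to read /opt/example/*.log with a file input alongside a single beats input (port, paths, and certificate paths taken from your post):

input {
  beats {
    # Only one beats input can listen on a given port; distinguish
    # ERROR vs. ERRORDTL events in a filter instead.
    port => "5044"
    ssl => true
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
  }
  file {
    # path and type are options of an input plugin such as file,
    # not of the enclosing input block.
    path => ["/opt/example/*.log"]
    type => "syslog"
  }
}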

Currently I am not getting any logs to logstash.

Not any logs to Logstash or not any logs to Elasticsearch?

if [type] == " ERROR " {

Why do you have spaces on both sides of "ERROR"?

index => " ERROR"
index => " ERRORDTL"

Why do you have leading spaces in the index names?
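
For comparison, a minimal corrected sketch of that output section; note that Elasticsearch index names must be lowercase, so the lowercase names here are assumptions:

output {
  if [type] == "ERROR" {
    elasticsearch {
      hosts => ["localhost:9200"]
      # No stray spaces, and a lowercase index name.
      index => "error-%{+YYYY.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "errordtl-%{+YYYY.MM.dd}"
    }
  }
}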


@magnusbaeck Thanks for your response.

Actually, I am looking to have the logs from the dev servers go to a filebeat-dev index and the SIT server logs go to a filebeat-sit index.

How can I achieve this with my existing setup? I have shared the field details and the current Logstash configuration below; logs from all environments currently show up under the filebeat-* index in Kibana.


*************** fields of filebeat documents ***************

curl -XGET "https://localhost:9200/filebeat-*/_search?pretty" -u logstash
Enter host password for user 'logstash':
{
  "took" : 36,
  "timed_out" : false,
  "_shards" : {
    "total" : 100,
    "successful" : 100,
    "failed" : 0
  },
  "hits" : {
    "total" : 25882438,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "filebeat-2016.08.06",
      "_type" : "log",
      "_id" : "AVZjRgYRFjxIs5i8r3d4",
      "_score" : 1.0,
      "_source" : {
        "message" : "16:07:20,279 DEBUG : # multipath.conf written by anaconda",
        "@version" : "1",
        "@timestamp" : "2016-08-06T12:41:15.576Z",
        "count" : 1,
        "offset" : 4222,
        "type" : "log",
        "input_type" : "log",
        "source" : "/var/log/anaconda.storage.log",
        "fields" : null,
        "beat" : {
          "hostname" : "dev.example.com",
          "name" : "dev.example.com"
        },
        "host" : "dev.example.com",
        "tags" : [ "beats_input_codec_plain_applied" ]
      }
    },

*************** filebeat.json template which I have installed ***************

{
  "mappings": {
    "_default_": {
      "_all": {
        "enabled": true,
        "norms": {
          "enabled": false
        }
      },
      "dynamic_templates": [
        {
          "template1": {
            "mapping": {
              "doc_values": true,
              "ignore_above": 1024,
              "index": "not_analyzed",
              "type": "{dynamic_type}"
            },
            "match": "*"
          }
        }
      ],
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "message": {
          "type": "string",
          "index": "analyzed"
        },
        "offset": {
          "type": "long",
          "doc_values": "true"
        }
      }
    }
  },
  "settings": {
    "index.refresh_interval": "5s"
  },
  "template": "filebeat-*"
}

*************** current Logstash configuration ***************

input {
  beats {
    port => "5044"
    ssl => true
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
  }
}
filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    user => "xxx"
    password => "xxx"
    ssl => true
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
}

Actually, I am looking to have the logs from the dev servers go to a filebeat-dev index and the SIT server logs go to a filebeat-sit index.

I believe that's covered by the StackOverflow post I referred you to earlier. If not, please ask a specific question.

@magnusbaeck How can I get the "type" field here as per your post? I am using Filebeat on the client servers, and I installed the JSON template on the master. I already shared the fields in an earlier post.

Should I specify the log file path again if I am using Beats?

I would appreciate your help with this.

*************** POST ***************
input {
  udp {
    ...
    type => "foo"
  }
  file {
    ...
    type => "bar"
  }
}

output {
  if [type] == "foo" {
    elasticsearch {
      ...
      index => "foo-index"
    }
  } else {
    elasticsearch {
      ...
      index => "bar-index"
    }
  }
}

You don't have to use the type field; you can use any field you like. Maybe the hostname in the host field can be used? That's up to you. See the sketch below.
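
A minimal sketch that routes on the host field (the .sit.com suffix and the index names are assumptions based on the hostnames you posted):

output {
  if [host] =~ /\.sit\.com$/ {
    # Events from SIT hosts get their own daily index.
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "filebeat-sit-%{+YYYY.MM.dd}"
    }
  } else {
    # Everything else is treated as dev.
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "filebeat-dev-%{+YYYY.MM.dd}"
    }
  }
}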

@magnusbaeck Thanks for your response.

I have configured my Logstash output based on the hostname, but it does not reflect in Kibana.

I tried to search for an index like filebeat-sit or filebeat-dev, but nothing like that is shown.

I am not able to see any error messages in Logstash; the final message says:
{:timestamp=>"2016-08-25T19:29:35.213000-0500", :message=>"Pipeline main started"}

*************** configuration (multiple indexing) ***************

input {
  beats {
    port => "5044"
    ssl => true
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
  }
}
filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

output {
  if [host] == ["example1.sit.com","example2.sit.com"] {
    elasticsearch {
      hosts => ["localhost:9200"]
      user => "xxx"
      password => "xxx"
      ssl => true
      ssl_certificate_verification => true
      truststore => "/xxxx"
      truststore_password => "xxxx"
      index => "%{[@metadata][beatsit]}"
      document_type => "%{[@metadata][type]}"
    }
  } else {
    elasticsearch {
      hosts => ["localhost:9200"]
      user => "xxx"
      password => "xxx"
      ssl => true
      ssl_certificate_verification => true
      truststore => "/xxxx"
      truststore_password => "xxxx"
      index => "%{[@metadata][beatdev]}"
      document_type => "%{[@metadata][type]}"
    }
  }
}

I now see the error below in Logstash:

:message=>"Failed action. ", :status=>400, :action=>["index", {:_id=>nil, :_index=>"%{[@metadata][beatdev]}--2016.08.26", :_type=>"filesystem", :_routing=>nil}, #<LogStash::Event:0x2274705f @metadata_accessors=#<LogStash::Util::Accessors:0x1fb6eb66 @store={"type"=>"filesystem", "beat"=>"topbeat"}, @lut={"[type]"=>[{"type"=>"filesystem", "beat"=>"topbeat"}, "type"], "[beatdev]"=>[{"type"=>"filesystem", "beat"=>"topbeat"}, "beatdev"]}>, @cancelled=false, @data={"@timestamp"=>"2016-08-26T04:49:59.610Z", "type"=>"filesystem", "fs"=>{"device_name"=>"none", "total"=>0, "used"=>0, "used_p"=>0, "free"=>0, "avail"=>0, "files"=>0, "free_files"=>0, "mount_point"=>"/proc/sys/fs/binfmt_misc"}, "count"=>1, "beat"=>{"hostname"=>"examplebuild.build.com", "name"=>"examplebuild.build.com"}, "@version"=>"1", "host"=>"examplebuild.build.com", "tags"=>["beats_input_raw_event"]}, @metadata={"type"=>"filesystem", "beat"=>"topbeat"}, @accessors=#<LogStash::Util::Accessors:0x16b84b51 @store={"@timestamp"=>"2016-08-26T04:49:59.610Z", "type"=>"filesystem", "fs"=>{"device_name"=>"none", "total"=>0, "used"=>0, "used_p"=>0, "free"=>0, "avail"=>0, "files"=>0, "free_files"=>0, "mount_point"=>"/proc/sys/fs/binfmt_misc"}, "count"=>1, "beat"=>{"hostname"=>"examplebuild.build.com", "name"=>"examplebuild.build.com"}, "@version"=>"1", "host"=>"examplebuild.build.com", "tags"=>["beats_input_raw_event"]}, @lut={"[beat][hostname]"=>[{"hostname"=>"examplebuild.build.com", "name"=>"examplebuild.build.com"}, "hostname"], "host"=>[{"@timestamp"=>"2016-08-26T04:49:59.610Z", "type"=>"filesystem", "fs"=>{"device_name"=>"none", "total"=>0, "used"=>0, "used_p"=>0, "free"=>0, "avail"=>0, "files"=>0, "free_files"=>0, "mount_point"=>"/proc/sys/fs/binfmt_misc"}, "count"=>1, "beat"=>{"hostname"=>"examplebuild.build.com", "name"=>"examplebuild.build.com"}, "@version"=>"1", "host"=>"examplebuild.build.com", "tags"=>["beats_input_raw_event"]}, "host"], "tags"=>[{"@timestamp"=>"2016-08-26T04:49:59.610Z", "type"=>"filesystem", "fs"=>{"device_name"=>"none", "total"=>0, "used"=>0, "used_p"=>0, "free"=>0, "avail"=>0, "files"=>0, "free_files"=>0, "mount_point"=>"/proc/sys/fs/binfmt_misc"}, "count"=>1, "beat"=>{"hostname"=>"examplebuild.build.com", "name"=>"examplebuild.build.com"}, "@version"=>"1", "host"=>"examplebuild.build.com", "tags"=>["beats_input_raw_event"]}, "tags"], "[type]"=>[{"@timestamp"=>"2016-08-26T04:49:59.610Z", "type"=>"filesystem", "fs"=>{"device_name"=>"none", "total"=>0, "used"=>0, "used_p"=>0, "free"=>0, "avail"=>0, "files"=>0, "free_files"=>0, "mount_point"=>"/proc/sys/fs/binfmt_misc"}, "count"=>1, "beat"=>{"hostname"=>"examplebuild.build.com", "name"=>"examplebuild.build.com"}, "@version"=>"1", "host"=>"examplebuild.build.com", "tags"=>["beats_input_raw_event"]}, "type"], "[host]"=>[{"@timestamp"=>"2016-08-26T04:49:59.610Z", "type"=>"filesystem", "fs"=>{"device_name"=>"none", "total"=>0, "used"=>0, "used_p"=>0, "free"=>0, "avail"=>0, "files"=>0, "free_files"=>0, "mount_point"=>"/proc/sys/fs/binfmt_misc"}, "count"=>1, "beat"=>{"hostname"=>"examplebuild.build.com", "name"=>"examplebuild.build.com"}, "@version"=>"1", "host"=>"examplebuild.build.com", "tags"=>["beats_input_raw_event"]}, "host"]}>>], :response=>{"create"=>{"_index"=>"%{[@metadata][beatdev]}--2016.08.26", "_type"=>"filesystem", "_id"=>"AVbFLu-gVOSyDPZ_PtOh", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"mapper [fs.used_p] of different type, current_type [double], merged_type [long]"}}}, :level=>:warn}

index => "%{[@metadata][beatsit]}"

What is this line supposed to mean? Do you have a [@metadata][beatsit] field in your events? What name do you want to use?
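
For what it's worth, when a %{...} field reference doesn't resolve, Logstash keeps it as a literal string, which is why the failed action above shows an index name like %{[@metadata][beatdev]}--2016.08.26. If you just want a fixed name per environment, a literal value works (the name here is illustrative):

index => "beats_dev-%{+YYYY.MM.dd}"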

@magnusbaeck
The log file is as below:

2016-Aug-24 22:14:31 400;WARN ;RMI TCP Connection(37948)-10.136.232.50; ;Clearing cache. Number cached=0 ; [system]; YFS_Additional_AttributeDBCacheHome @version:1 source:/opt/apps/Sterling94/Foundation/logs/sci_.log type:log input_type:log beat.hostname:examplesit.sit.com beat.name:examplesit.sit.com host:examplesit.sit.com tags:beats_input_codec_plain_applied @timestamp:August 25th 2016, 11:13:32.846 offset:2,171,741 count:1 fields: - _id:AVbAjssd_SBZg0ZCoQ0R _type:log _index:filebeat-2016.08.25 _score: -

Previously, with the single index setup, it would produce filebeat-* as the index.

The configuration is as below:
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    user => "xxx"
    password => "xxx"
    ssl => true
    ssl_certificate_verification => true
    truststore => "xxx"
    truststore_password => "xxx"
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
}

Can you please help me figure out what I am doing wrong? I feel that I am close to cracking multiple indexing, but something is missing.

Should I use a different grok filter?

Please answer my questions. What index name do you want to use for the events from example?.sit.com?

Also,

if [host] == ["example1.sit.com","example2.sit.com"] {

doesn't work the way you think: == compares the host field against the whole array rather than testing membership. This is what you want:

if [host] in ["example1.sit.com", "example2.sit.com"] {

I want to use "beats_sit" as the index name for the SIT environment (example1 and example2), and "beats_dev" as the index name for the other servers.

if [host] in ["example1.sit.com", "example2.sit.com"] {
  elasticsearch {
    ...
    index => "beats_sit-%{+YYYY.MM.dd}"
  }
} else {
  elasticsearch {
    ...
    index => "beats_dev-%{+YYYY.MM.dd}"
  }
}

@magnusbaeck You are brilliant, and multiple indexes are now working. Thanks a million for your timely support. This is the best support I have ever received from the Elasticsearch team!!

One last thing: if we have different types of Beats, like Filebeat, Topbeat, and Packetbeat, how can we handle this? Right now all Beats go to one index per environment.

But that's what you did previously:

index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
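
A minimal sketch combining that with the host-based conditional from before (the -sit/-dev suffixes are illustrative):

output {
  if [host] in ["example1.sit.com", "example2.sit.com"] {
    elasticsearch {
      ...
      # %{[@metadata][beat]} expands to filebeat, topbeat, packetbeat, etc.
      index => "%{[@metadata][beat]}-sit-%{+YYYY.MM.dd}"
    }
  } else {
    elasticsearch {
      ...
      index => "%{[@metadata][beat]}-dev-%{+YYYY.MM.dd}"
    }
  }
}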

Hi, can you please share your Logstash conf and Filebeat conf files? How is the beats_sit-YYYY.MM.dd index working in Kibana? Have you created a template named beats_sit*?