Create multiple indexes with single logstash configuration file depending upon the kafka channel


(Nandita Rane) #1

Please help me to resolve this issue.

Issue Description: I want to create a Logstash configuration which will put logs into two different indexes.

I tried with this configuration but not able to create index in logstash.

please find the logstash configuration here,

# Kafka input: consume JSON-encoded messages from the 'application-logs' topic,
# starting from the earliest offset for a new consumer group.
input {
kafka {
codec => 'json'
# NOTE(review): empty broker list — presumably anonymized for the post; a real
# deployment needs one or more "host:port" entries here.
bootstrap_servers =>['']
topics => 'application-logs'
auto_offset_reset => 'earliest'
group_id => 'logstash-test'
}
}
filter {
# Parse the canonical event timestamp (populated by the mutate block further
# down) into @timestamp, interpreting it as Asia/Kolkata local time.
date {
match => ["[Kafka][EventTimestamp]", "yyyy-MM-dd-HH:mm:ss.SSSSSS"]
timezone => "Asia/Kolkata"
}
# Enrich only events from this one organisation. Events from any other org
# never receive a [Kafka] field and are dropped by the guards below.
if [org_name] == "***gg-apic-uat" {
# Classify by HTTP status: "200 OK" -> LOG, anything else -> ERROR.
if [status_code] == "200 OK" {
mutate {
add_field => {"[Kafka][EventType]" => "LOG"}
}
} else {
mutate {
add_field => {"[Kafka][EventType]" => "ERROR"}
}
}
# Reformat the raw [datetime] field into the layout the date filter above
# expects. NOTE(review): if [datetime] is absent, DateTime.parse raises and
# the event is tagged _rubyexception — confirm every event carries the field.
ruby {
code => "event.set('datetime2', DateTime.parse(event.get('[datetime]')).strftime('%Y-%m-%d-%H:%M:%S.%L'))"
}
# Copy the source fields into the canonical [Kafka] envelope.
# BUG(review): [Kafka][Channel] is hard-coded to "APIConnect", but the output
# section below routes on Channel == "bulk-emal"/"bulk-push". That condition
# can therefore never be true, which is exactly why only the else branch runs
# and only the test-logs-* index is created.
mutate {
add_field => {"[Kafka][Channel]" => "APIConnect"}
add_field => {"[Kafka][TransactionTimeStamp]" => "%{[datetime2]}"}
add_field => {"[Kafka][TransactionID]" => "%{[transaction_id]}"}
add_field => {"[Kafka][EventTimestamp]" => "%{[datetime2]}"}
add_field => {"[Kafka][Code]" => "%{[status_code]}"}
# NOTE(review): "Comoponent" looks like a typo for "Component" — confirm what
# downstream consumers/index mappings expect before renaming it.
add_field => {"[Kafka][Comoponent]" => "%{[api_name]}"}
add_field => {"[Kafka][Application]" => "%{[org_name]}"}
add_field => {"[Kafka][Node]" => "%{[uri_path]}"}
add_field => {"[Kafka][SourceIP]" => "%{[client_ip]}"}
add_field => {"[Kafka][Server]" => "%{[host]}"}
add_field => {"[Kafka][Description]" => "%{status_code}"}
}
# prune {
# whitelist_names => ["Kafka","@version","@timestamp"]
# }
}
# Guards: discard anything that was not enriched above.
if ![Kafka] {
drop {}
}
if ![Kafka][EventType] {
drop{}
}
if [Kafka][Channel] == "" {
drop {}
}
# Daily index suffix for the output section, e.g. "2020-01-31".
mutate {
add_field => ["[@metadata][DateSuffix]", ""]
}
ruby {
code => "event.set('[@metadata][DateSuffix]', DateTime.now.strftime('%Y-%m-%d')) "
}
# Approximate payload size: byte length of the [Kafka] sub-document's
# Ruby string representation (not the original wire payload).
ruby {
#code => "event.set('message_size', event.get('[Kafka][OriginalPayload]').bytesize)"
code => "event.set('message_size', event.get('[Kafka]').to_s.bytesize)"
}
}
output {
# BUG(review): dead branch — [Kafka][Channel] is only ever "APIConnect"
# (set unconditionally in the filter above), so neither "bulk-emal" nor
# "bulk-push" can match and this elasticsearch output never fires.
if ([Kafka][Channel] == "bulk-emal" or [Kafka][Channel] == "bulk-push") {
elasticsearch {
codec => json
# NOTE(review): host anonymized/incomplete — must be a full URL in practice.
hosts=> "http://"
index=>"test-bulk-logs-%{[@metadata][DateSuffix]}"
# NOTE(review): document_type is deprecated in ES 6.x and removed in 7.x —
# confirm the target cluster version.
document_type => "logs"
}
}
else
{
elasticsearch {
codec => json
hosts=> "http:"
index=>"test-logs-%{[@metadata][DateSuffix]}"
document_type => "logs"

}
}
}


(Staale) #2

Did you try to replace both outputs with just a file to verify that your documents look like you expect? Always good to confirm that the docs are correct before you implement the if-then clauses.


(Nandita Rane) #3

Hi sir,
Thanks for quick reply,

the else branch in the output is getting executed and the test-logs-* index is getting created,
but the if branch is simply skipped by Logstash, without throwing any error or exception.

Your guidance will be of great help as I am stuck here since 4 days.

Thanks and regards,
Nandita Rasam.


(Staale) #4

I don't think the if branch is ignored — more likely its conditions are never met. Could you paste an anonymized version of a document so we get to see a real example of one?


(Nandita Rane) #5

Hello Team,

Thanks for your help.

Issue is resolved.

Solution:
I inspected the document JSON in Kibana and used the field names that actually exist on the documents in the if condition.

Please find the solution here for future reference.


# Working pipeline (posted solution): route events to different Elasticsearch
# indexes based on [Kafka][SubComponent], a field verified to exist on the
# documents via Kibana.
#
# FIX(review): the original paste accidentally contained this entire
# configuration TWICE, fused together at "}input {". Logstash merges repeated
# input/filter/output sections, so the duplicate would have created a second
# identical Kafka consumer, run every filter twice per event, and double-written
# each event to both elasticsearch outputs. The configuration now appears
# exactly once.
input {
  kafka {
    codec => 'json'
    bootstrap_servers =>['xx.xxx.xxx.xx:9092']
    topics => 'test-logs'
    auto_offset_reset => 'earliest'
    group_id => 'logstash-XXXX'
  }
}

filter {
  # Parse the canonical event timestamp (populated by the mutate block below)
  # into @timestamp, interpreting it as Asia/Kolkata local time.
  date {
    match => ["[Kafka][EventTimestamp]", "yyyy-MM-dd-HH:mm:ss.SSSSSS"]
    timezone => "Asia/Kolkata"
  }

  # Enrich only events from this organisation; all other events never receive
  # a [Kafka] field and are dropped by the guards further down.
  if [org_name] == "XXX-XXXXX-XX" {
    # Classify by HTTP status: "200 OK" -> LOG, anything else -> ERROR.
    if [status_code] == "200 OK" {
      mutate {
        add_field => {"[Kafka][EventType]" => "LOG"}
      }
    } else {
      mutate {
        add_field => {"[Kafka][EventType]" => "ERROR"}
      }
    }
    # Reformat the raw [datetime] field into the layout the date filter expects.
    # NOTE(review): DateTime.parse raises (event tagged _rubyexception) if
    # [datetime] is missing — confirm every event carries the field.
    ruby {
      code => "event.set('datetime2', DateTime.parse(event.get('[datetime]')).strftime('%Y-%m-%d-%H:%M:%S.%L'))"
    }
    # Copy the source fields into the canonical [Kafka] envelope.
    mutate {
      add_field => {"[Kafka][Channel]" => "APIConnect"}
      add_field => {"[Kafka][TransactionTimeStamp]" => "%{[datetime2]}"}
      add_field => {"[Kafka][TransactionID]" => "%{[transaction_id]}"}
      add_field => {"[Kafka][EventTimestamp]" => "%{[datetime2]}"}
      add_field => {"[Kafka][Code]" => "%{[status_code]}"}
      # NOTE(review): "Comoponent" looks like a typo for "Component" — kept
      # as-is because index mappings/consumers may already rely on it.
      add_field => {"[Kafka][Comoponent]" => "%{[api_name]}"}
      add_field => {"[Kafka][Application]" => "%{[org_name]}"}
      add_field => {"[Kafka][Node]" => "%{[uri_path]}"}
      add_field => {"[Kafka][SourceIP]" => "%{[client_ip]}"}
      add_field => {"[Kafka][Server]" => "%{[host]}"}
      add_field => {"[Kafka][Description]" => "%{status_code}"}
    }
  }

  # Guards: discard anything that was not enriched above.
  if ![Kafka]  {
    drop {}
  }
  if ![Kafka][EventType] {
    drop{}
  }
  if [Kafka][Channel] == "" {
    drop {}
  }

  # Daily index suffix for the output section, e.g. "2020-01-31".
  mutate {
    add_field => ["[@metadata][DateSuffix]", ""]
  }
  ruby {
    code => "event.set('[@metadata][DateSuffix]', DateTime.now.strftime('%Y-%m-%d')) "
  }
  # Approximate payload size: byte length of the [Kafka] sub-document's
  # Ruby string representation (not the original wire payload).
  ruby {
    code => "event.set('message_size', event.get('[Kafka]').to_s.bytesize)"
  }
}

output {
  # Route on [Kafka][SubComponent] — a field that actually exists on the
  # documents (this was the fix: the earlier attempt tested a field/value
  # combination that was never present, so its branch never matched).
  if ([Kafka][SubComponent]=="PPPPP" or [Kafka][SubComponent]=="QQQQ" or [Kafka][SubComponent]=="RRRR") {
    elasticsearch {
      codec => json
      hosts=> "http://xx.xxx.xxx.xx:9200"
      index=>"test-logs-%{[@metadata][DateSuffix]}"
      # NOTE(review): document_type is deprecated in ES 6.x and removed in
      # 7.x — confirm the target cluster version.
      document_type => "logs"
    }
  }
  else {
    elasticsearch {
      codec => json
      hosts=> "http://xx.xx.xxx.xx:9200"
      index=>"rest-logs-%{[@metadata][DateSuffix]}"
      document_type => "logs"
    }
  }
  # Debug output; consider removing in production to reduce log noise.
  stdout {
    codec => rubydebug
  }
}


Thanks and Regards,

Nandita Rasam.