Please help me resolve this issue.
Issue description: I want to create a Logstash configuration that writes logs to two different indices.
I tried the following configuration, but I am not able to create the indices.
Here is the Logstash configuration:
input {
  kafka {
    codec => 'json'
    bootstrap_servers => ['']
    topics => 'application-logs'
    auto_offset_reset => 'earliest'
    group_id => 'logstash-test'
  }
}
filter {
  date {
    match => ["[Kafka][EventTimestamp]", "yyyy-MM-dd-HH:mm:ss.SSSSSS"]
    timezone => "Asia/Kolkata"
  }
  if [org_name] == "***gg-apic-uat" {
    if [status_code] == "200 OK" {
      mutate {
        add_field => {"[Kafka][EventType]" => "LOG"}
      }
    } else {
      mutate {
        add_field => {"[Kafka][EventType]" => "ERROR"}
      }
    }
    ruby {
      code => "event.set('datetime2', DateTime.parse(event.get('[datetime]')).strftime('%Y-%m-%d-%H:%M:%S.%L'))"
    }
    mutate {
      add_field => {"[Kafka][Channel]" => "APIConnect"}
      add_field => {"[Kafka][TransactionTimeStamp]" => "%{[datetime2]}"}
      add_field => {"[Kafka][TransactionID]" => "%{[transaction_id]}"}
      add_field => {"[Kafka][EventTimestamp]" => "%{[datetime2]}"}
      add_field => {"[Kafka][Code]" => "%{[status_code]}"}
      add_field => {"[Kafka][Comoponent]" => "%{[api_name]}"}
      add_field => {"[Kafka][Application]" => "%{[org_name]}"}
      add_field => {"[Kafka][Node]" => "%{[uri_path]}"}
      add_field => {"[Kafka][SourceIP]" => "%{[client_ip]}"}
      add_field => {"[Kafka][Server]" => "%{[host]}"}
      add_field => {"[Kafka][Description]" => "%{status_code}"}
    }
    # prune {
    #   whitelist_names => ["Kafka","@version","@timestamp"]
    # }
  }
  if ![Kafka] {
    drop {}
  }
  if ![Kafka][EventType] {
    drop {}
  }
  if [Kafka][Channel] == "" {
    drop {}
  }
  mutate {
    add_field => ["[@metadata][DateSuffix]", ""]
  }
  ruby {
    code => "event.set('[@metadata][DateSuffix]', DateTime.now.strftime('%Y-%m-%d'))"
  }
  ruby {
    #code => "event.set('message_size', event.get('[Kafka][OriginalPayload]').bytesize)"
    code => "event.set('message_size', event.get('[Kafka]').to_s.bytesize)"
  }
}
output {
  if ([Kafka][Channel] == "bulk-emal" or [Kafka][Channel] == "bulk-push") {
    elasticsearch {
      codec => json
      hosts => "http://"
      index => "test-bulk-logs-%{[@metadata][DateSuffix]}"
      document_type => "logs"
    }
  } else {
    elasticsearch {
      codec => json
      hosts => "http:"
      index => "test-logs-%{[@metadata][DateSuffix]}"
      document_type => "logs"
    }
  }
}
staodd
(Staale)
April 16, 2019, 6:41am
2
Did you try replacing both outputs with just a file output to verify that your documents look the way you expect? It is always good to confirm that the documents are correct before you implement the if/else clauses.
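A minimal sketch of that check, assuming a hypothetical debug path (/tmp/logstash-debug.log is only an illustration, not something from the original post). It temporarily replaces the conditional elasticsearch outputs so every event can be inspected as-is:

```
output {
  # Temporary debug output: write every event to a local file so the
  # actual field names and values can be inspected before any
  # if/else routing is added.
  file {
    path => "/tmp/logstash-debug.log"   # hypothetical path, adjust as needed
    codec => json_lines
  }
  # Also print each event to the console in a readable form.
  stdout {
    codec => rubydebug
  }
}
```

Once the events show the fields the conditions rely on, the elasticsearch outputs can be put back.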
Hi sir,
Thanks for the quick reply.
The else branch in the output is executed and the test-logs-* index is created,
but the if branch is simply ignored by Logstash, without any error or exception.
Your guidance would be a great help, as I have been stuck on this for 4 days.
Thanks and regards,
Nandita Rasam.
staodd
(Staale)
April 16, 2019, 10:56am
4
I don't think the if branch is ignored; more likely its conditions are not met. Could you paste an anonymized version of a document so we can see a real example?
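One way to see which branch an event takes is to tag it in the filter stage and print it. This is only a sketch: the tag names bulk_route and default_route are hypothetical, and the condition is copied from the output section of the original configuration. A field that is missing or holds a different value does not raise an error; the comparison is simply false and the event falls through to the else branch.

```
filter {
  # Same condition as in the output section, used here only to tag events.
  if [Kafka][Channel] == "bulk-emal" or [Kafka][Channel] == "bulk-push" {
    mutate { add_tag => ["bulk_route"] }      # hypothetical tag name
  } else {
    mutate { add_tag => ["default_route"] }   # hypothetical tag name
  }
}
output {
  # rubydebug prints the full event, including the tags added above.
  stdout { codec => rubydebug }
}
```

Note that the filter in the original configuration sets [Kafka][Channel] to "APIConnect" for matching events, so for those events neither comparison can be true.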
Hello Team,
Thanks for your help.
The issue is resolved.
Solution:
I checked the document JSON in Kibana and used the correct JSON field names in the if condition.
Here is the working configuration for future reference.
input {
  kafka {
    codec => 'json'
    bootstrap_servers => ['xx.xxx.xxx.xx:9092']
    topics => 'test-logs'
    auto_offset_reset => 'earliest'
    group_id => 'logstash-XXXX'
  }
}
filter {
  date {
    match => ["[Kafka][EventTimestamp]", "yyyy-MM-dd-HH:mm:ss.SSSSSS"]
    timezone => "Asia/Kolkata"
  }
  if [org_name] == "XXX-XXXXX-XX" {
    if [status_code] == "200 OK" {
      mutate {
        add_field => {"[Kafka][EventType]" => "LOG"}
      }
    } else {
      mutate {
        add_field => {"[Kafka][EventType]" => "ERROR"}
      }
    }
    ruby {
      code => "event.set('datetime2', DateTime.parse(event.get('[datetime]')).strftime('%Y-%m-%d-%H:%M:%S.%L'))"
    }
    mutate {
      add_field => {"[Kafka][Channel]" => "APIConnect"}
      add_field => {"[Kafka][TransactionTimeStamp]" => "%{[datetime2]}"}
      add_field => {"[Kafka][TransactionID]" => "%{[transaction_id]}"}
      add_field => {"[Kafka][EventTimestamp]" => "%{[datetime2]}"}
      add_field => {"[Kafka][Code]" => "%{[status_code]}"}
      add_field => {"[Kafka][Comoponent]" => "%{[api_name]}"}
      add_field => {"[Kafka][Application]" => "%{[org_name]}"}
      add_field => {"[Kafka][Node]" => "%{[uri_path]}"}
      add_field => {"[Kafka][SourceIP]" => "%{[client_ip]}"}
      add_field => {"[Kafka][Server]" => "%{[host]}"}
      add_field => {"[Kafka][Description]" => "%{status_code}"}
    }
  }
  if ![Kafka] {
    drop {}
  }
  if ![Kafka][EventType] {
    drop {}
  }
  if [Kafka][Channel] == "" {
    drop {}
  }
  mutate {
    add_field => ["[@metadata][DateSuffix]", ""]
  }
  ruby {
    code => "event.set('[@metadata][DateSuffix]', DateTime.now.strftime('%Y-%m-%d'))"
  }
  ruby {
    code => "event.set('message_size', event.get('[Kafka]').to_s.bytesize)"
  }
}
output {
  if ([Kafka][SubComponent] == "PPPPP" or [Kafka][SubComponent] == "QQQQ" or [Kafka][SubComponent] == "RRRR") {
    elasticsearch {
      codec => json
      hosts => "http://xx.xxx.xxx.xx:9200"
      index => "test-logs-%{[@metadata][DateSuffix]}"
      document_type => "logs"
    }
  } else {
    elasticsearch {
      codec => json
      hosts => "http://xx.xx.xxx.xx:9200"
      index => "rest-logs-%{[@metadata][DateSuffix]}"
      document_type => "logs"
    }
  }
  stdout {
    codec => rubydebug
  }
}
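As a possible variation (a sketch only, not what was deployed above), the routing decision can be made once in the filter stage and stored in @metadata, so that a single elasticsearch output serves both indices. This assumes both indices live on the same cluster; the field name [@metadata][index_prefix] is hypothetical, and it relies on [@metadata][DateSuffix] being set as in the filter above.

```
filter {
  # Decide the index prefix once; @metadata fields are not sent to Elasticsearch.
  if [Kafka][SubComponent] in ["PPPPP", "QQQQ", "RRRR"] {
    mutate { add_field => { "[@metadata][index_prefix]" => "test-logs" } }
  } else {
    mutate { add_field => { "[@metadata][index_prefix]" => "rest-logs" } }
  }
}
output {
  elasticsearch {
    hosts => "http://xx.xxx.xxx.xx:9200"   # assumes one cluster for both indices
    index => "%{[@metadata][index_prefix]}-%{[@metadata][DateSuffix]}"
  }
}
```

Keeping the condition in the filter stage also makes it easy to inspect the chosen prefix with stdout and rubydebug, provided the codec is configured with metadata => true.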
Thanks and Regards,
Nandita Rasam.
system
(system)
Closed
May 15, 2019, 5:59am
6
This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.