Hello, I am using the Kafka output and input plugins in Logstash. The log route is like this: logstash -> kafka -> logstash (indexing) -> elasticsearch. Logstash adds additional data (such as file path, hostname, etc.) to the logs it outputs. How do I prevent Logstash from adding this additional data, so that only the raw log is sent to Kafka? Or can I make the additional data follow my raw log separator "|"?
Remove unnecessary fields:
mutate { remove_field => [ "path", "hostname" ] }
If you would like to add data instead, use the mutate join option.
Should I add it in the filter or the output?
Only the filter block supports mutate.
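Putting those two suggestions together, here is a minimal sketch of such a filter block for the first Logstash (the field names "path" and "hostname" come from the earlier reply, and the array field "extra" is a made-up example):
filter {
  # Drop fields added by Logstash that you do not want to forward
  mutate {
    remove_field => [ "path", "hostname" ]
  }
  # Join the elements of an array field with your "|" separator
  # ("extra" is a hypothetical field name, adjust it to your data)
  mutate {
    join => { "extra" => "|" }
  }
}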
What are the logstash inputs in this first logstash, the one that is sending to kafka?
If you want to send just the source message that this Logstash is collecting, you could change the codec in the kafka output.
codec => plain { format => "%{message}" }
This will make Logstash output only the message field to Kafka. It will work assuming that you are not doing any filtering or transformation in this first Logstash, as it is probably this Logstash that is adding the fields.
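For context, here is a sketch of how that codec could sit in the kafka output of the first Logstash (the broker address and topic name are placeholders, adjust them to your setup):
output {
  kafka {
    # Placeholder broker list and topic name
    bootstrap_servers => "kafka_node_1:9092"
    topic_id => "your-topic"
    # Send only the raw message field, nothing else
    codec => plain { format => "%{message}" }
  }
}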
The data sent to the first Logstash is raw data with a "|" separator. I want the first Logstash to just send the raw data to Kafka. The one that indexes the logs into Elasticsearch will be the Logstash (indexing).
Expanding on the answer by Rios - in your logstash.conf file, you can do something like:
input {
  kafka {
    bootstrap_servers => "<your_kafka_node_1:9093>,<kafka_node_2:9093>..."
    topics => ["your-topic"]
    #security_protocol => "SSL"
    #ssl_truststore_location => "path_to_truststore.jks"
    #ssl_truststore_password => "Top@Secret"
    client_id => "kafka_logstash_raw"
    group_id => "kafka_logstash_raw"
    consumer_threads => 2
    codec => "json"
  }
}
If SSL is enabled, you can uncomment the SSL-related settings. consumer_threads depends on the number of partitions in the Kafka topic.
filter {
  ## PARSE DATA ##
  ## GROK MESSAGE FIELD DATA
  grok {
    match => [ "message", "(?m)%{TIMESTAMP_ISO8601:CreateDate} %{LOGLEVEL:Severity} \[%{NOTSPACE:Thread}\] %{JAVACLASS:ClassName} - \{%{GREEDYDATA:LogMsg}\}" ]
    tag_on_failure => [ "_failure", "_grokparsefailure" ]
  }
  ## IF NO FAILURES perform VALIDATIONS
  if "_failure" not in [tags] {
    date {
      # use timestamp from the log "ISO8601" "YYYY-MM-dd HH:mm:ss,SSS"
      match => [ "CreateDate", "ISO8601", "YYYY-MM-dd HH:mm:ss,SSS", "YYYY-MM-dd HH:mm:ss" ]
      target => "@timestamp"
      timezone => "UTC"
      tag_on_failure => [ "_failure", "_timestampfailure" ]
    }
    # Remove the fields you do not need (the field names below are only examples)
    mutate {
      remove_field => [ "path", "hostname" ]
    }
    # Split a field on your "|" delimiter (adjust the field name to your data)
    mutate {
      split => { "LogMsg" => "|" }
    }
  }
}
Hope this helps. You can modify the grok pattern to match your log line.
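Since your raw log is "|"-delimited, an alternative to the grok pattern above is the dissect filter; here is a sketch assuming three fields (the field names are made up, replace them with your own):
filter {
  # Split the raw line on "|" into named fields (names are examples)
  dissect {
    mapping => {
      "message" => "%{field1}|%{field2}|%{field3}"
    }
    tag_on_failure => [ "_dissectfailure" ]
  }
}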