I'm using Logstash to read data from an Oracle DB and ingest it into a Kafka topic, then read the data back from that topic and ingest it into ES (basically Oracle -> Kafka, Kafka -> ES). The query captures data for a specific timestamp and fetches 52 rows. Everything works as expected up to that point, but after inserting all the rows Logstash starts inserting the garbage values below:
2020-08-27T13:40:49.122Z %{host} 2020-08-27T13:40:48.799Z %{host} 2020-08-27T13:40:48.612Z %{host} 2020-08-27T13:40:48.323Z %{host} 2020-08-27T13:40:48.123Z %{host} 2020-08-27T13:40:47.723Z %{host} 2020-08-27T13:40:47.220Z %{host} %{message}
Configs that I'm using:
Oracle -> Kafka:
input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/ojdbc8.jar"
    jdbc_driver_class => "Java::oracle.jdbc.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@redacted"
    jdbc_user => "redacted"
    jdbc_password => "redacted"
    tracking_column => "timestamp"
    use_column_value => true
    tracking_column_type => "timestamp"
    jdbc_default_timezone => "Asia/Kolkata"
    schedule => "*/10 * * * *"
    statement_filepath => "redacted"
  }
}
output {
  kafka {
    topic_id => "audit_data"
    bootstrap_servers => "redacted"
    acks => "0"
    jaas_path => "/usr/share/logstash/jaas.conf"
    sasl_kerberos_service_name => "kafka"
    kerberos_config => "redacted"
    codec => plain
    security_protocol => "SASL_PLAINTEXT"
  }
}
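The SQL itself lives in the redacted statement_filepath, so it isn't shown here. For context, with use_column_value enabled the jdbc input exposes the last tracked value to the query as :sql_last_value, so the statement would typically look something like this (table and column names below are placeholders, not the real ones):

# Illustrative only -- the real query is in the redacted statement_filepath;
# placeholder table/column names
statement => "SELECT * FROM audit_table WHERE audit_timestamp > :sql_last_value"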
Kafka -> ES:
input {
  kafka {
    # group_id => "logstash"
    jaas_path => "/usr/share/logstash/jaas.conf"
    sasl_kerberos_service_name => "kafka"
    kerberos_config => "redacted"
    auto_offset_reset => "latest"
    topics => ["audit_data"]
    codec => plain
    bootstrap_servers => "redacted"
    security_protocol => "SASL_PLAINTEXT"
    # type => "syslog"
    decorate_events => true
  }
}
output {
  #stdout { codec => "json"}
  elasticsearch {
    hosts => ["redacted"]
    user => "redacted"
    password => "redacted"
    cacert => ["redacted"]
    action => "index"
    index => "kafka_logstash"
  }
}
I checked the Kafka topic data from the consumer console and could see the garbage values continuously flowing in, so I removed the old data from the topic (by setting its retention to 1000 ms), then used the same query and config parameters, this time going directly from Oracle to ES, and it worked fine with no garbage values. Below is the config I used:
input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/ojdbc8.jar"
    jdbc_driver_class => "Java::oracle.jdbc.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@redacted"
    jdbc_user => "redacted"
    jdbc_password => "redacted"
    tracking_column => "timestamp"
    use_column_value => true
    tracking_column_type => "timestamp"
    jdbc_default_timezone => "Asia/Kolkata"
    schedule => "*/10 * * * *"
    statement_filepath => "/usr/share/logstash/oracle.sql"
  }
}
output {
  #stdout { codec => "json"}
  elasticsearch {
    hosts => ["redacted"]
    user => "redacted"
    password => "redacted"
    cacert => ["redacted"]
    action => "index"
    index => "test_kafka_logstash"
  }
}
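For what it's worth, my current guess (not confirmed) is that codec => plain on the kafka output/input is the problem: the jdbc events are structured fields with no message field, and when the plain codec has no message to encode it seems to fall back to the event timestamp plus the literal, unresolved %{host} %{message} references, which is exactly what the garbage strings above look like. If that is the cause, would switching both sides to the json codec be the right fix? A minimal sketch of what I mean (only the codec lines change; every other setting stays as in the configs above):

# Oracle -> Kafka: serialize the whole event as JSON
output {
  kafka {
    topic_id => "audit_data"
    codec => json        # was: codec => plain
    # ... other settings unchanged ...
  }
}

# Kafka -> ES: decode the JSON back into event fields
input {
  kafka {
    topics => ["audit_data"]
    codec => json        # was: codec => plain
    # ... other settings unchanged ...
  }
}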
Please suggest how we can fix this.
Thanks!