Hi,
We are ingesting Kafka logs from on-premise servers running Kafka Connect and Kafka REST Proxy, and we have 3 Logstash Windows servers.
We're using the built-in dashboard that ships with the Filebeat Kafka module, which requires fields such as log.level and kafka.log.trace.class.
We have everything needed in place (pipeline, index template, etc.), but there is an issue in the pipeline with processing the entries and populating these fields.
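For context, the idea is that the extracted exception class ends up in the field the dashboard expects. The rename below is only an illustrative sketch, with an assumed source field name ("class"), and is not our exact config:
filter {
  mutate {
    # Illustrative only: assumes the exception class was captured into a field named "class"
    # and moves it to the field the Filebeat Kafka dashboard expects (kafka.log.trace.class)
    rename => { "class" => "[kafka][log][trace][class]" }
  }
}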
We are using grok to parse those fields. We can parse log.level correctly; however, when we filter on the log.level field for errors and then try to parse the kafka.log.trace.class field, it fails.
Actually, it is not a failure as such: simply no events are processed.
In detail, if you look at our pipeline (below) you can see that we are able to get the log.level data, and after that we try to extract the kafka.log.trace.class data.
If we remove the following highlighted part, which extracts kafka.log.trace.class using grok, then the data is streamed correctly.
The grok with the issue is:
grok {
  match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.Level}\s(?<messagePart>((.|\n)*))\s(?<class>([\w]+[\.])+[\w]+Exception)'}
}
If we add it back, no data is streamed, and the Discover page shows that no data is being saved to Elasticsearch.
Additionally, there is nothing in the Logstash logs and no _grokparsefailure tag is added.
We also tried restarting the Logstash service, which did not help.
When we remove that grok from the pipeline it resumes working (saving the log.level field); when we add it back, it stops streaming data, nothing appears in the Discover page, and there are no exceptions.
One more comment: to rule things out, we added a different grok in the same place discussed here and it works, so the specific grok that breaks the data flow is:
grok {
  match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.Level}\s(?<messagePart>((.|\n)*))\s(?<class>([\w]+[\.])+[\w]+Exception)'}
}
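As a side note, a diagnostic variant of this grok (an illustrative sketch only, not part of our current pipeline, with an assumed timeout value and tag name) would shorten the grok timeout and tag events that hit it, to show whether the long multiline stack traces are stalling the pattern:
grok {
  match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.Level}\s(?<messagePart>((.|\n)*))\s(?<class>([\w]+[\.])+[\w]+Exception)'}
  # Illustrative diagnostic settings: give up on the match after 5 seconds and tag the event
  timeout_millis => 5000
  tag_on_timeout => "_kafka_class_groktimeout"
}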
When we add something else instead, like:
grok {
  match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.Level}'}
}
then it works correctly.
To explain: we have tested that grok and it is valid and accepted (we also used it with another http input receiving the same log structure, and it worked correctly there).
We also verified it in a grok pattern validator and it matches correctly.
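A minimal pipeline for that kind of standalone check would look roughly like this (the port and the stdout output are assumptions for illustration, not our exact test setup):
input {
  # Illustrative sketch: post the sample log entry to this port to exercise the pattern in isolation
  http { port => 8099 }
}
filter {
  grok {
    match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.Level}\s(?<messagePart>((.|\n)*))\s(?<class>([\w]+[\.])+[\w]+Exception)'}
  }
}
output {
  # Print the parsed event so the extracted kafkaTime / log.Level / class fields are visible
  stdout { codec => rubydebug }
}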
A sample log entry is:
[2022-04-28 11:52:24,477] ERROR Failed to run query for table io.confluent.kafka.connect.IPMSourceConnector.QueueQueryRunner@6e38b3c8: {} (io.confluent.kafka.connect.IPMSourceConnector.JdbcQSourceConnector:183)
java.sql.SQLException: ORA-01403: no data found
ORA-06512: at "IPM.PK_BIZ_EVENT_EXT_HANDLER", line 88
ORA-06512: at "IPM.BIZ_EVENT_KAFKA_CONNECT_PCK", line 94
ORA-06512: at line 1
at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:628)
at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:562)
at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1145)
at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:726)
at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:291)
at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:492)
at oracle.jdbc.driver.T4CCallableStatement.doOall8(T4CCallableStatement.java:144)
at oracle.jdbc.driver.T4CCallableStatement.executeForRows(T4CCallableStatement.java:1034)
at oracle.jdbc.driver.OracleStatement.executeSQLStatement(OracleStatement.java:1507)
at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1287)
at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3735)
at oracle.jdbc.driver.OraclePreparedStatement.execute(OraclePreparedStatement.java:3933)
at oracle.jdbc.driver.OracleCallableStatement.execute(OracleCallableStatement.java:4279)
at oracle.jdbc.driver.OraclePreparedStatementWrapper.execute(OraclePreparedStatementWrapper.java:1010)
at io.confluent.kafka.connect.IPMSourceConnector.util.EventQueueManager.dequeueEvent(EventQueueManager.java:85)
at io.confluent.kafka.connect.IPMSourceConnector.QueueQueryRunner.removeCurrentEventFromQueueToLog(QueueQueryRunner.java:243)
at io.confluent.kafka.connect.IPMSourceConnector.QueueQueryRunner.maybeStartQuery(QueueQueryRunner.java:232)
at io.confluent.kafka.connect.IPMSourceConnector.JdbcQSourceTask.pollResults(JdbcQSourceTask.java:169)
at io.confluent.kafka.connect.IPMSourceConnector.JdbcQSourceTask.poll(JdbcQSourceTask.java:138)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:308)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:263)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:199)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:254)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: Error : 1403, Position : 0, Sql = BEGIN :1 := IPM.biz_event_kafka_connect_pck.dequeue_event(:2 ,:3 ,:4 ,:5 ,:6 ,:7 ); END;, OriginalSql = { ? = call IPM.biz_event_kafka_connect_pck.dequeue_event(?,?,?,?,?,?)}, Error Msg = ORA-01403: no data found
ORA-06512: at "IPM.PK_BIZ_EVENT_EXT_HANDLER", line 88
ORA-06512: at "IPM.BIZ_EVENT_KAFKA_CONNECT_PCK", line 94
ORA-06512: at line 1
at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:632)
... 27 more
The pipeline filter is:
filter
{
if ("kafka-rest" in [log][file][path]) {
mutate
{
add_field => { "index_name" => "zim-kafka-rest-proxy-dev-logs" }
add_tag => [ "daily" ]
}
}
else if ("kafka-dev1" in [log][file][path]) {
mutate
{
add_field => { "index_name" => "zim-kafka-connect-dev1-logs" }
add_tag => [ "daily" ]
}
}
else if ("kafka" in [log][file][path]) {
mutate
{
add_field => { "index_name" => "zim-kafka-connect-dev0-logs" }
add_tag => [ "daily" ]
}
}
grok {
match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.level}'}
}
if ([log.level] == "ERROR")
{
mutate
{
add_tag => [ "err_entry1" ]
}
#grok {
# match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.Level}'}
#}
grok {
match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.Level}\s(?<messagePart>((.|\n)*))\s(?<class>([\w]+[\.])+[\w]+Exception)'}
}
mutate
{
add_tag => [ "err_entry2" ]
}
}
}
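For completeness, an output stage that consumes the index_name field set above would look roughly like the following (illustrative sketch; hosts and credentials are omitted and this is not our exact output block):
output {
  elasticsearch {
    # Illustrative sketch: route each event to the index named in the index_name field added above
    index => "%{index_name}"
  }
}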
Can you explain why it stops working when we add that grok, or how to extract kafka.log.trace.class correctly?
Thanks