Grok issue in Logstash input - data from Kafka logs using Filebeat Kafka module

Hi,
We are ingesting Kafka logs from on-premise servers running Kafka Connect and Kafka REST Proxy.
We have 3 Logstash Windows servers ...
We're using the built-in dashboard supplied with the Filebeat Kafka module, which requires some fields such as log.level and kafka.log.trace.class.
We have everything needed in place (pipeline, index template, etc.), but something in the pipeline processing prevents these fields from being populated for all entries ...
We use grok to parse those fields. log.level is parsed correctly, but when we check log.level for errors and then try to parse kafka.log.trace.class, it fails ...
Actually, there is no failure: simply no events are processed.

In detail: as the pipeline below shows, we are able to get the log.level data,
and after that we try to extract the kafka.log.trace.class data ...
If we remove the highlighted grok that extracts kafka.log.trace.class, the data is streamed correctly.

the grok with the issue is -
grok {
**match => { "message" => '[(?\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})]\s+%{LOGLEVEL:log.Level}\s(?((.|\n)
))\s(?([\w]+[.])+[\w]+Exception)'}
*
}**

If we add it back, no data is streamed, and the Discover page shows that no data is being saved to Elasticsearch.
Additionally, there are no errors in the Logstash logs and no _grokparsefailure tag is added.
We also tried restarting the Logstash service, and that doesn't help.

When we remove that grok from the pipeline, it resumes working (saving the log.level field); when we add it back, it stops streaming data, nothing is shown in the Discover page, and there are no exceptions ...

One more comment: to rule out other causes, we put a different grok in the same place and it works, so the grok that specifically causes the issue is -
grok {
match => { "message" => '[(?\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})]\s+%{LOGLEVEL:log.Level}\s(?((.|\n)*))\s(?([\w]+[.])+[\w]+Exception)'}
}

When we use something else instead, like -
grok {
match => { "message" => '[(?\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})]\s+%{LOGLEVEL:log.Level}'}
}
then it works correctly.

To explain: we tested that grok and it is valid (we also used it with another http input with the same log structure and it worked correctly).
We also verified it in a grok pattern validator, where it works correctly.

A sample log entry is -

[2022-04-28 11:52:24,477] ERROR Failed to run query for table io.confluent.kafka.connect.IPMSourceConnector.QueueQueryRunner@6e38b3c8: {} (io.confluent.kafka.connect.IPMSourceConnector.JdbcQSourceConnector:183)
java.sql.SQLException: ORA-01403: no data found
ORA-06512: at "IPM.PK_BIZ_EVENT_EXT_HANDLER", line 88
ORA-06512: at "IPM.BIZ_EVENT_KAFKA_CONNECT_PCK", line 94
ORA-06512: at line 1

        at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:628)
        at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:562)
        at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1145)
        at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:726)
        at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:291)
        at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:492)
        at oracle.jdbc.driver.T4CCallableStatement.doOall8(T4CCallableStatement.java:144)
        at oracle.jdbc.driver.T4CCallableStatement.executeForRows(T4CCallableStatement.java:1034)
        at oracle.jdbc.driver.OracleStatement.executeSQLStatement(OracleStatement.java:1507)
        at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1287)
        at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3735)
        at oracle.jdbc.driver.OraclePreparedStatement.execute(OraclePreparedStatement.java:3933)
        at oracle.jdbc.driver.OracleCallableStatement.execute(OracleCallableStatement.java:4279)
        at oracle.jdbc.driver.OraclePreparedStatementWrapper.execute(OraclePreparedStatementWrapper.java:1010)
        at io.confluent.kafka.connect.IPMSourceConnector.util.EventQueueManager.dequeueEvent(EventQueueManager.java:85)
        at io.confluent.kafka.connect.IPMSourceConnector.QueueQueryRunner.removeCurrentEventFromQueueToLog(QueueQueryRunner.java:243)
        at io.confluent.kafka.connect.IPMSourceConnector.QueueQueryRunner.maybeStartQuery(QueueQueryRunner.java:232)
        at io.confluent.kafka.connect.IPMSourceConnector.JdbcQSourceTask.pollResults(JdbcQSourceTask.java:169)
        at io.confluent.kafka.connect.IPMSourceConnector.JdbcQSourceTask.poll(JdbcQSourceTask.java:138)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:308)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:263)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:199)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:254)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: Error : 1403, Position : 0, Sql = BEGIN :1 := IPM.biz_event_kafka_connect_pck.dequeue_event(:2 ,:3 ,:4 ,:5 ,:6 ,:7 ); END;, OriginalSql = { ? = call IPM.biz_event_kafka_connect_pck.dequeue_event(?,?,?,?,?,?)}, Error Msg = ORA-01403: no data found
ORA-06512: at "IPM.PK_BIZ_EVENT_EXT_HANDLER", line 88
ORA-06512: at "IPM.BIZ_EVENT_KAFKA_CONNECT_PCK", line 94
ORA-06512: at line 1

        at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:632)
        ... 27 more

The pipeline filter is -

filter
{   
    if ("kafka-rest" in [log][file][path]) {
        mutate
        {
            add_field => { "index_name" => "zim-kafka-rest-proxy-dev-logs" }
            add_tag => [ "daily" ]
        }
    }
    else if ("kafka-dev1" in [log][file][path]) {
        mutate
        {
            add_field => { "index_name" => "zim-kafka-connect-dev1-logs" }
            add_tag => [ "daily" ]
        }
    }
    else if ("kafka" in [log][file][path]) {
        mutate
        {
            add_field => { "index_name" => "zim-kafka-connect-dev0-logs" }
            add_tag => [ "daily" ]
        }
    }

    grok {
         match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.level}'}
    }


    if ([log.level] == "ERROR") 
    {
        mutate
        {
            add_tag => [ "err_entry1" ]
        }
        
        #grok {
        #    match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.Level}'}
        #}

        grok {
            match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.Level}\s(?<messagePart>((.|\n)*))\s(?<class>([\w]+[\.])+[\w]+Exception)'}
        }    

        mutate
        {
            add_tag => [ "err_entry2" ]
        }
    }
 

}

Can you explain why it stops working when that grok is added, or how to get kafka.log.trace.class correctly?

Thanks

Hi, I'd appreciate it if someone could give a hint here.
Thanks

Please edit your post, select the text of the grok filters, and click on </> in the toolbar above the edit panel. This will change

grok { match => { foo => "(?.foobarbaz)" }

into

grok { match => { foo => "(?<custom>.*foo*_bar_**baz**)" }

The fact that parts of your grok expressions are italic or bold suggests that some characters are being consumed as formatting, and that means we cannot be sure what your grok expressions are, so we cannot possibly diagnose a problem.
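Since the post mentions that the expression passes a grok pattern validator, it may also be worth running it inside Logstash itself against the multi-line sample, because a validator may not be fed the same joined stack trace. The following is only a sketch: the file names `test.conf` and `sample.log` are hypothetical, and it assumes that each event starts with a `[` timestamp line and that the stack-trace lines belong to the preceding entry. The grok is the one from the posted pipeline filter.

```
# test.conf (hypothetical file name): stand-alone pipeline to exercise one grok
input {
  stdin {
    # Assumption: any line that does not start with "[" continues the previous entry
    codec => multiline {
      pattern => "^\["
      negate => true
      what => "previous"
    }
  }
}

filter {
  grok {
    # the expression under test, copied from the pipeline filter in the post
    match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.Level}\s(?<messagePart>((.|\n)*))\s(?<class>([\w]+[\.])+[\w]+Exception)' }
  }
}

output {
  # print the full event, including tags, to the console
  stdout { codec => rubydebug }
}
```

It could be run with something like `bin\logstash.bat -f test.conf < sample.log`. With the multiline codec, the last entry in the file may only be emitted once a following line starting with `[` arrives, so adding a second entry to the sample file makes the result easier to see.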

Hi, since I'm not able to edit that post, I'm re-posting here.
The snippet with the issue is -

if ([log.level] == "ERROR")
{
    mutate
    {
        add_tag => [ "err_entry1" ]
    }

    grok {
        match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.Level}'}
    }
    grok {
        match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.Level}\s(?<messagePart>((.|\n)*))\s(?<class>([\w]+[\.])+[\w]+Exception)'}
    }
    mutate
    {
        add_tag => [ "err_entry2" ]
    }
}

As explained, the first grok works with no issue; the second grok is causing the issue. (In our environment it is commented out, so we can see that each entry with log.level ERROR gets both tags added.)

Thanks
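A possible variant of the second grok, offered only as a sketch and not as a confirmed fix: the thread does not establish why events stop flowing. It rests on a few assumptions. The posted expression captures into `class` and `log.Level` rather than `kafka.log.trace.class`; the `(.|\n)*` group followed by more pattern can force a lot of backtracking on long stack traces; and `[log.level]` in a conditional refers to a single field literally named `log.level`, whereas `[kafka][log][trace][class]` addresses a nested field. The pattern name `EXCEPTIONCLASS`, the 5000 ms timeout, and the failure tag name are hypothetical choices.

```
grok {
  # Hypothetical helper pattern: dotted Java class name ending in "Exception"
  pattern_definitions => { "EXCEPTIONCLASS" => '(?:\w+\.)+\w+Exception' }

  # (?m) lets GREEDYDATA span newlines; \A anchors at the start of the message
  # so entries that do not begin with "[timestamp] LEVEL" fail quickly.
  # Timestamp and level are left unnamed here because the first grok already captured them.
  match => { "message" => '(?m)\A\[[^\]]+\]\s+%{LOGLEVEL}\s%{GREEDYDATA:messagePart}\s%{EXCEPTIONCLASS:[kafka][log][trace][class]}' }

  # Give up on a single event after 5 s (assumed value) and tag it instead of stalling
  timeout_millis => 5000
  tag_on_timeout => "_groktimeout"

  # Distinct tag (hypothetical name) so failures of this grok are easy to spot
  tag_on_failure => [ "_grokparsefailure_trace_class" ]
}
```

If events then show up tagged `_groktimeout`, that would point at the expression being too expensive for these messages; if they show up with the failure tag, the expression is simply not matching what Logstash actually receives in `message`. On the sample entry in the first post, this pattern would be expected to capture `java.sql.SQLException`, though that is worth confirming in a test run.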

@Badger, any idea?
Thanks
