Logstash pipeline grok issue with regex

Thanks for your input, however that's not the issue. We're inspecting the data only for these specific indices, like -connect-, and no data is streaming to those indices after adding that grok.

I had some issues with your grok, but regardless of those issues I was not able to replicate the case where you do not get a message at the output stage; the messages I tried always arrived at the output stage.

The issue I had was with this grok:

        grok {
            match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.Level}\s(?<messagePart>((.|\n)*))\s(?<class>([\w]+[\.])+[\w]+Exception)'}
        }  

This grok was timing out. I do not know if this is the reason the messages are not arriving at the output stage, because I only tested using the sample messages you shared, but with some volume this could be the issue.
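
If it helps to narrow this down, one option is to make the timeout visible using the grok filter's own options; a minimal sketch (timeout_millis and tag_on_timeout are standard grok filter options, the 10000 ms value is only an example):

grok {
    # the same pattern you shared
    match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.Level}\s(?<messagePart>((.|\n)*))\s(?<class>([\w]+[\.])+[\w]+Exception)' }
    # lower the per-match cap (default is 30000 ms) so a slow match fails fast instead of stalling the worker
    timeout_millis => 10000
    # events whose match exceeds the cap are tagged and passed on without the grok fields
    tag_on_timeout => "_groktimeout"
}

Events carrying the _groktimeout tag could then be counted at the output stage to check whether the timeout is what is eating them.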

My suggestion to help with the troubleshooting would be to change this grok to something like this:

grok {
    match => { "message" => '^\[%{TIMESTAMP_ISO8601:kafkaTime}\]%{SPACE}%{LOGLEVEL:log.Level}%{SPACE}%{DATA:messagePart}:%{SPACE}%{GREEDYDATA:restOfMsg}'}
}

For the message:

"[2022-07-03 13:46:54,477] ERROR Something happened during dequeue event, EventQueueManager.dequeueEvent() : (io.confluent.kafka.connect.IPMSourceConnector.util.EventQueueManager:90)\njava.sql.SQLException: ORA-01403: no data found\nORA-06512: at \"BIZ_EVENT_EXT_HANDLER\", line 88\nORA-06512: at \"EVENT_KAFKA_CONNECT_PCK\", line 94\nORA-06512: at line 1\n\n\tat oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:628)\n\tat oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:562)\n\tat oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1145)\n\tat oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:726)\n\tat oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:291)\n\tat oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:492)\n\tat oracle.jdbc.driver.T4CCallableStatement.doOall8(T4CCallableStatement.java:144)\n\tat oracle.jdbc.driver.T4CCallableStatement.executeForRows(T4CCallableStatement.java:1034)\n\tat oracle.jdbc.driver.OracleStatement.executeSQLStatement(OracleStatement.java:1507)\n\tat oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1287)\n\tat oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3735)\n\tat oracle.jdbc.driver.OraclePreparedStatement.execute(OraclePreparedStatement.java:3933)\n\tat oracle.jdbc.driver.OracleCallableStatement.execute(OracleCallableStatement.java:4279)\n\tat oracle.jdbc.driver.OraclePreparedStatementWrapper.execute(OraclePreparedStatementWrapper.java:1010)\n\tat io.confluent.kafka.connect.IPMSourceConnector.util.EventQueueManager.dequeueEvent(EventQueueManager.java:85)\n\tat io.confluent.kafka.connect.IPMSourceConnector.QueueQueryRunner.removeCurrentEventFromQueueToLog(QueueQueryRunner.java:234)\n\tat io.confluent.kafka.connect.IPMSourceConnector.QueueQueryRunner.maybeStartQuery(QueueQueryRunner.java:223)\n\tat io.confluent.kafka.connect.IPMSourceConnector.JdbcQSourceTask.pollResults(JdbcQSourceTask.java:169)\n\tat io.confluent.kafka.connect.IPMSourceConnector.JdbcQSourceTask.poll(JdbcQSourceTask.java:138)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:308)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:263)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:199)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:254)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: Error : 1403, Position : 0, Sql = BEGIN :1 := IPM.biz_event_kafka_connect_pck.dequeue_event(:2 ,:3 ,:4 ,:5 ,:6 ,:7 ); END;, OriginalSql = { ? = call IPM.biz_event_kafka_connect_pck.dequeue_event(?,?,?,?,?,?)}, Error Msg = ORA-01403: no data found\nORA-06512: at \"BIZ_EVENT_EXT_HANDLER\", line 88\nORA-06512: at \"KAFKA_CONNECT_PCK\", line 94\nORA-06512: at line 1\n\n\tat oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:632)\n\t... 27 more"

You will have messagePart and restOfMsg as follows:

messagePart:

Something happened during dequeue event, EventQueueManager.dequeueEvent() 

restOfMsg:

(io.confluent.kafka.connect.IPMSourceConnector.util.EventQueueManager:90)\njava.sql.SQLException: ORA-01403: no data found\nORA-06512: at \"BIZ_EVENT_EXT_HANDLER\", line 88\nORA-06512: at \"EVENT_KAFKA_CONNECT_PCK\", line 94\nORA-06512: at line 1\n\n\tat oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:628)\n\tat oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:562)\n\tat oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1145)\n\tat oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:726)\n\tat oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:291)\n\tat oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:492)\n\tat oracle.jdbc.driver.T4CCallableStatement.doOall8(T4CCallableStatement.java:144)\n\tat oracle.jdbc.driver.T4CCallableStatement.executeForRows(T4CCallableStatement.java:1034)\n\tat oracle.jdbc.driver.OracleStatement.executeSQLStatement(OracleStatement.java:1507)\n\tat oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1287)\n\tat oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3735)\n\tat oracle.jdbc.driver.OraclePreparedStatement.execute(OraclePreparedStatement.java:3933)\n\tat oracle.jdbc.driver.OracleCallableStatement.execute(OracleCallableStatement.java:4279)\n\tat oracle.jdbc.driver.OraclePreparedStatementWrapper.execute(OraclePreparedStatementWrapper.java:1010)\n\tat io.confluent.kafka.connect.IPMSourceConnector.util.EventQueueManager.dequeueEvent(EventQueueManager.java:85)\n\tat io.confluent.kafka.connect.IPMSourceConnector.QueueQueryRunner.removeCurrentEventFromQueueToLog(QueueQueryRunner.java:234)\n\tat io.confluent.kafka.connect.IPMSourceConnector.QueueQueryRunner.maybeStartQuery(QueueQueryRunner.java:223)\n\tat io.confluent.kafka.connect.IPMSourceConnector.JdbcQSourceTask.pollResults(JdbcQSourceTask.java:169)\n\tat io.confluent.kafka.connect.IPMSourceConnector.JdbcQSourceTask.poll(JdbcQSourceTask.java:138)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:308)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:263)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:199)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:254)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: Error : 1403, Position : 0, Sql = BEGIN :1 := IPM.biz_event_kafka_connect_pck.dequeue_event(:2 ,:3 ,:4 ,:5 ,:6 ,:7 ); END;, OriginalSql = { ? = call IPM.biz_event_kafka_connect_pck.dequeue_event(?,?,?,?,?,?)}, Error Msg = ORA-01403: no data found\nORA-06512: at \"BIZ_EVENT_EXT_HANDLER\", line 88\nORA-06512: at \"KAFKA_CONNECT_PCK\", line 94\nORA-06512: at line 1\n\n\tat oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:632)\n\t... 27 more

Then you could analyze the types of messages in the restOfMsg field and try to build a new grok for this field to extract the class field from it.
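
As a starting point, a sketch of what that second grok could look like (only an illustration: it assumes the class you want is the first fully qualified ...Exception token in restOfMsg, and the failure tag name is arbitrary):

grok {
    # grok searches unanchored, so this captures the first qualified ...Exception token found in restOfMsg
    match => { "restOfMsg" => '(?<class>(?:[\w$]+\.)+[\w$]+Exception)' }
    # tag events where no exception class was found, so they can be reviewed separately
    tag_on_failure => ["_no_exception_class"]
}

For the sample message above this should set class to java.sql.SQLException.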

Hi Leandro,
Thanks for your replies.
You're probably close to the actual issue, because yes, the issue is with that grok, and maybe it's timing out for some reason at a larger data scale.
We consulted Elasticsearch support about it, and they found a grok pattern to replace that one, which actually fixed the issue.

They replaced -

\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.Level}\s(?<messagePart>((.|\n)*))\s(?<class>([\w]+[\.])+[\w]+Exception)

With -

\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+(?<LOGLEVEL>[a-zA-Z]+)\s+(?<messagePart>([a-zA-Z0-9() :,.]*))\s+(?<class>[a-zA-Z.]+Exception)
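
From what I can tell (my own reading of it, not something support spelled out), the part that matters is the middle capture -

# old middle capture: a nested, quantified alternation that can backtrack heavily
# when the trailing (?<class>...Exception) part fails on a long multi-line message
(?<messagePart>((.|\n)*))

# new middle capture: a plain character class that cannot cross newlines,
# so the engine has far less room to backtrack
(?<messagePart>([a-zA-Z0-9() :,.]*))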

That seems to have fixed the issue.
They were also able to get some indication from the log -

org.logstash.execution.ShutdownWatcherExt reported stalled threads: the pipeline needs to be restarted, but there is a filter which might still be running, consuming the internal in-memory queue.
The current call shows grok-pure.rb:182:in match, which suggests a pathological regex running against a long or complex line.
This confirms that the grok pattern is probably the cause.

Thanks

Hi,
Continuing this thread - we were able to fix the initial issue by adjusting the grok pattern (the initial pattern seems to be performance heavy, and that caused the entries not to be processed correctly and the pipeline to stop working).

So the initial issue is fixed; however, the new pattern doesn't process all kinds of entries correctly (we were able to detect that in a grok debugger).

Following are the examples -

The grok that didn't work -
\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.Level}\s(?<messagePart>((.|\n)*))\s(?<class>([\w]+[\.])+[\w]+Exception)

The grok that works -
\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+(?<LOGLEVEL>[a-zA-Z]+)\s+(?<messagePart>([a-zA-Z0-9() :,.]*))\s+(?<class>[a-zA-Z.]+Exception)

Example entry that works correctly with the new grok pattern -

[2022-06-27 17:14:10,531] ERROR Something happened during getting an EventId, EventQueueManager.getEventId() : (io.confluent.kafka.connect.IPMSourceConnector.util.EventQueueManager:48)
java.sql.SQLException: ORA-01403: no data found
ORA-06512: at "IPM.BIZ_EVENT_KAFKA_CONNECT_PCK", line 78
ORA-06512: at line 1

Example entry that doesn't work with the new grok pattern -

[2022-06-27 14:38:17,707] ERROR Failed to run query for table io.confluent.kafka.connect.XXXX.QueueQueryRunner@1ff9d40b: {} (io.confluent.kafka.connect.XXXX.JdbcQSourceConnector:183)
java.sql.SQLException: ORA-01403: no data found
ORA-06512: at "XXX.PK_BIZ_", line 88
ORA-06512: at "XXX.BIZ_EVENT_KAFKA_CONNECT_PCK", line 94
ORA-06512: at line 1

And -

[2022-07-20 09:33:56,020] ERROR WorkerSinkTask{id=ElasticsearchSinkConnectorConnector_0-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:206)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:220)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:142)
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:836)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:809)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:277)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:406)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaForDeserialize(AbstractKafkaAvroDeserializer.java:349)
	... 21 more

Can you see why these entries don't parse correctly in the grok debugger?

Thanks
