Logstash pipeline grok issue with regex

Hi,
we have a Logstash pipeline for on-premise Confluent Platform (Kafka) logs, shipped using the Filebeat Kafka module.
We are using a grok pattern to extract some fields from the log entries in order to use that data in a dashboard, and the issue is -
if we enable the following grok pattern it actually causes the pipeline to stop working and processing the logs, with no indication or information in the log file,
while if we use a simpler grok pattern it works correctly and the logs are processed again and visible in Kibana ...

Both grok patterns validate correctly in a grok debugger with sample data, so the grok is actually valid for the log data being sent, yet only the simpler version works ...

The simpler grok pattern that works -

grok {
            match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.Level}'}
        }

The grok pattern that causes the failure in the Logstash pipeline, after which no data is streamed for that pipeline -

grok {
            match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.Level}\s(?<messagePart>((.|\n)*))\s(?<class>([\w]+[\.])+[\w]+Exception)'}
        }

Sample data -

[2022-06-27 14:38:17,707] ERROR Failed to run query for table io.confluent.kafka.connect.XXXX.QueueQueryRunner@1ff9d40b: {} (io.confluent.kafka.connect.XXXX.JdbcQSourceConnector:183)
java.sql.SQLException: ORA-01403: no data found
ORA-06512: at "XXX.PK_BIZ_", line 88
ORA-06512: at "XXX.BIZ_EVENT_KAFKA_CONNECT_PCK", line 94
ORA-06512: at line 1

Again - that sample data is parsed correctly in a grok debugger with that grok pattern

Edit - I wasn't able to format the code parts here with </> or backticks as you requested, it didn't work, however the code parts here are the 2 grok patterns ...

Thanks


It is not clear if the sample data you shared is a single line or multiline.

Can you try to reformat your message? Just edit, select the text and click in the </> button.

You can also create a code block using markdown by putting ``` on the line above and the line below the text you want to show as code.

Also, in which grok debugger did your patterns work? I just tested in the Kibana grok debugger and neither of them worked.

Hi,
I was just able to edit it as you suggested, thanks.
The entry in the log is a single line; I copied it here from Kibana, so maybe Kibana somehow shows it as multiline, but originally it's one line.
The grok debugger I used is https://grokdebug.herokuapp.com
Are you saying that in Dev Tools it's parsed differently?
Thanks

No, the message before the edit was different; your pattern was not the same, and the one you shared now works.

But I cannot replicate the issue; your message was correctly parsed for me:

{
      "kafkaTime" => "2022-06-27 14:38:17,707",
           "host" => "elk-lab",
      "log.Level" => "ERROR",
          "class" => "java.sql.SQLException",
    "messagePart" => "Failed to run query for table io.confluent.kafka.connect.XXXX.QueueQueryRunner@1ff9d40b: {} (io.confluent.kafka.connect.XXXX.JdbcQSourceConnector:183)",
        "message" => "[2022-06-27 14:38:17,707] ERROR Failed to run query for table io.confluent.kafka.connect.XXXX.QueueQueryRunner@1ff9d40b: {} (io.confluent.kafka.connect.XXXX.JdbcQSourceConnector:183) java.sql.SQLException: ORA-01403: no data found\\nORA-06512: at \"XXX.PK_BIZ_\", line 88\\nORA-06512: at \"XXX.BIZ_EVENT_KAFKA_CONNECT_PCK\", line 94\\nORA-06512: at line 1",
       "@version" => "1",
     "@timestamp" => 2022-06-27T14:23:08.101Z
}

Do you have anything in logstash logs? A grok error will not stop your data from being sent to elasticsearch; the event will just carry a _grokparsefailure tag. If you are not receiving any data because of this error, then there is something else wrong in your pipeline.

Please share your entire logstash pipeline and check logstash logs to see if there is some error that could give some hint.

Also, if you want the field log.level to be a nested field, like { "log": { "level" : "ERROR" } }, then your field name in the grok pattern should be [log][level], not log.level; for logstash these are different things.
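For example, a minimal sketch of your simpler pattern with the nested field reference (only the field name changes, the rest of the pattern is yours):

grok {
    # [log][level] produces { "log": { "level": "ERROR" } } instead of a field literally named "log.level"
    match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:[log][level]}' }
}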

The logstash grok filter sometimes parses regular expressions in a different way to the grok testing tool in Kibana. (How greedy a GREEDYDATA gets in a multiline match being one I have noticed.) herokuapp is a whole other question. It is not the same codebase, so it may not do exactly the same thing.
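For example (just an illustrative sketch with placeholder field names, not part of the pipeline above), how much of a multiline message a pattern captures depends on whether it can cross newlines at all:

grok {
    # GREEDYDATA is just .*, so on a multiline message it normally stops at the first newline;
    # the (.|\n)* group in the failing pattern is what lets it continue into the stack trace lines
    match => { "message" => '%{LOGLEVEL:level}\s%{GREEDYDATA:firstLine}' }
}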

Hi,
there is nothing in the Logstash logs; once the server starts there is no issue related to that pipeline.
_grokparsefailure is not relevant because there is no data at all coming to that index; as said, once that grok pattern is activated, data ingestion stops and the index is empty.
After removing that grok, whether or not we add the simpler one, there is no issue and data is sent to the index.
log.level - understood that comment, however it's not related to the issue described here, is it?

The pipeline -

input {
    pipeline {
         address => Kafka_DEV_filebeat_logs
    }
}
filter
{   
    if ("kafka-rest" in [log][file][path]) {
        mutate
        {
            add_field => { "index_name" => "zim-kafka-rest-proxy-dev-logs" }
            add_tag => [ "daily" ]
        }
    }
    else if ("kafka-dev1" in [log][file][path]) {
        mutate
        {
            add_field => { "index_name" => "zim-kafka-connect-dev1-logs" }
            add_tag => [ "daily" ]
        }
    }
    else if ("kafka" in [log][file][path]) {
        mutate
        {
            add_field => { "index_name" => "zim-kafka-connect-dev0-logs" }
            add_tag => [ "daily" ]
        }
    }

    grok {
         match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.level}'}
    }

 
    if ([log.level] == "ERROR") 
    {
        mutate
        {
            add_tag => [ "err_entry1" ]
        }
        
        grok {
            match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.Level}'}
        }

        #grok {
        #    match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.Level}\s(?<messagePart>((.|\n)*))\s(?<class>([\w]+[\.])+[\w]+Exception)'}
        #}    

        mutate
        {
            add_tag => [ "err_entry2" ]
        }
    }
 

}
output 
{
  #stdout {
  #  codec => rubydebug
  #}
  pipeline 
  {
    send_to => "elasticOutput"
  }
}

Thanks

Hi,
I've put the grok pattern + sample data into the Kibana Grok Debugger and it worked correctly -

{
  "log": {
    "Level": "ERROR"
  },
  "messagePart": "Something happened during dequeue event,  EventQueueManager.dequeueEvent()  :  (io.confluent.kafka.connect.IPMSourceConnector.util.EventQueueManager:90)\r",
  "class": "java.sql.SQLException",
  "kafkaTime": "2022-06-27 13:34:36,588"
}

However - for some of the entries it does result in a match failure -
Provided Grok patterns do not match data in the input

Can that be the cause of such an issue?
Because, as explained, and as far as I understood, in case of such an issue there is a _grokparsefailure and the data should continue to stream, while in our case here, once that grok pattern is enabled, there is no data streaming ...

Thanks

You need to share your full pipeline. Since your input and output are pipeline-to-pipeline, you need to share what the input of the Kafka_DEV_filebeat_logs pipeline is and what the output of the elasticOutput pipeline is, as well as their filters and outputs.

To understand what could be happening it is necessary to look at your inputs, filters and outputs. Are you doing any filtering in the output? With your full pipeline it is possible to try to replicate your issue to find what is the cause.

As I said, if your grok fails, logstash will add a tag named _grokparsefailure to the document, but it will not drop the message. Since you said that there are no messages with _grokparsefailure in your index, there is something else happening in your pipeline before the output.
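If you want to look at those documents instead of having them dropped by the output conditionals, one option (a sketch, reusing the placeholder credentials from your config) is to route them by tag in the elasticOutput pipeline:

output {
    if "_grokparsefailure" in [tags] {
        elasticsearch {
            hosts => ["https://xxxx:9200"]
            index => "grok-failures-%{+yyyy.MM.dd}"
            user => "xxxx"
            password => "${xxxx}"
        }
    }
}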

Also, share some entries that do not match the grok.

Hi,
understood -
the input is -

input {
    beats {
        port => 5044
        ssl => true
        ssl_key => 'C:\logstash\config\xxxx.pkcs8.key'
        ssl_certificate => 'C:\logstash\config\xxxx.crt'
        client_inactivity_timeout => 86400
    }
}
output 
{
    #if [type] == "xxxx" {
    #    if [logType] == "xxx" {
    #        pipeline {
    #            send_to => "xxx"
    #        }
    #    }
    #    else {
    #        pipeline {
    #            send_to => "osbLog"
    #        }
    #    }
    #}
    if [type] == "xxxx" {
        pipeline {
            send_to => "xxxx"
        }
    }
    else if [type] == "xxxx" {
        pipeline {
            send_to => "xxxx"
        }
    }
    else if [type] == "xxxx" {
        pipeline {
            send_to => "xxxx"
        }
    }
    else if [type] == "xxxx" {
        pipeline {
            send_to => "xxxx"
        }
    }
    else if [type] == "xxxx" {
        pipeline {
            send_to => "xxxx"
        }
    }
    else if [type] == "xxxx" {
        pipeline {
            send_to => ["xxxx","xxxx"]
            #send_to => "xxxx"
        }
    }
    else if [type] == "xxxx" {
        pipeline {
            send_to => "xxxx"
        }
    }
    else if [type] == "xxx" {
        pipeline {
            send_to => "xxxx"
        }
    }
    else if [type] == "xxx" {
        pipeline {
            send_to => ["xxxx","xxxx"]
        }
    }
    else if [event][module] == "kafka" and [agent][hostname] == "KafkaServer.xxx.xxxx" {
        pipeline {
            send_to => "Kafka_DEV_filebeat_logs"
        }
    }
    else if [event][module] == "kafka" and ([agent][hostname] == "KafkaServer1.xxx.xxxx" or [agent][hostname] == "KafkaServer2.xxx.xxxx") {
        pipeline {
            send_to => "Kafka_QA_filebeat_logs"
        }
    }
    else if [event][module] == "xxx" {
        pipeline {
            send_to => "xxxx"
        }
    }
    else {
        pipeline {
            send_to => "default"
        }
    }
}

The output is -

input {
    pipeline {
        address => elasticOutput
    }
}
output 
{
    if ("daily" in [tags]) {
        elasticsearch
        {
            hosts => ["https://xxxx:9200"]
            index => "%{index_name}-%{+yyyy.MM.dd}"
            user => "xxxx"
            password => "${xxxxx}"
        }
    } else if ("monthly" in [tags]) {
        elasticsearch
        {
            hosts => ["https://xxxx:9200"]
            index => "%{index_name}-%{+yyyy.MM}"
            user => "xxxx"
            password => "${xxxx}"
        }
    } else if ("yearly" in [tags]) {
        elasticsearch
        {
            hosts => ["https://xxxxx:9200"]
            index => "%{index_name}-%{+yyyy}"
            user => "xxxx"
            password => "${xxxx}"
        }
    }
}

That's the entire pipeline ...

Some entry data that doesn't match the grok is -

[2022-06-27 14:38:17,700] ERROR maybeStartQuery() error : ORA-01403: no data found
ORA-06512: at "IPM.BIZ_EVENT_KAFKA_CONNECT_PCK", line 78
ORA-06512: at line 1
 (io.confluent.kafka.connect.IPMSourceConnector.QueueQueryRunner:222)

The reason it doesn't match the grok is probably that it doesn't have an Exception in it ...

Thanks

I don't think the grok is the issue. In the output, if the [tags] field on the event contains "daily", "monthly", or "yearly" then you send it to elasticsearch, otherwise you discard it. In the pipeline, you sometimes add the tag [daily], sometimes not. I suspect all the events are falling through the cracks and being discarded.

Thanks for your input, however that's not the issue; we're inspecting the data only for these specific indices, like -connect-, and no data is streaming to those indices after adding that grok,
and after removing it data is streaming again, thousands of records per 15 minutes, so that's not the issue.
The issue is specific to that grok pattern.

That would suggest that grok is creating a field that causes elasticsearch to reject the event. Possibly it causes a mapping exception. That would show up in the logstash logs.

Nothing is shown in the logs. Any idea how to debug it, or maybe how to change the grok to fix it?

If it is a mapping exception then changing the field name may get at least one event into elasticsearch

(?<XYZmessagePart>((.|\n)*))\s(?<XYZclass>([\w]+[\.])+[\w]+Exception)
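Wrapped into the filter, that would look something like this (your original failing pattern with just the two capture groups renamed, nothing else changed):

grok {
    match => { "message" => '\[(?<kafkaTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\s+%{LOGLEVEL:log.Level}\s(?<XYZmessagePart>((.|\n)*))\s(?<XYZclass>([\w]+[\.])+[\w]+Exception)'}
}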

Hi, I tried that and it also didn't work; no entry is streamed and there is no indication or entry in the Logstash logs ... any other idea on how to debug it?
thanks

No. Sorry about that...

You are filtering your output, and from your pipeline it is not clear how one of the tags (daily, monthly or yearly) is being added to the documents that are failing.

Also, where does the field index_name come from? What are its possible values? How are you checking the data, are you using Kibana? Did you check whether for some reason you have an index with the literal %{index_name} as part of its name? This can happen if the field index_name does not exist in a document, and you won't see that data in Kibana unless you create an index pattern to look at it.

But, since you are filtering your output and after adding the grok filter you do not get anything, I think it's safe to assume that for some reason you do not have any of those tags in the tags field of your document, so it will not be sent anywhere and will simply be dropped at the output stage.

To try to find the issue, I think you first need to remove the filtering in your output, or at least add a conditional that will index events that do not have the daily, monthly or yearly tag.

You can add this in your output to see if you get some data:

else {
    elasticsearch {
        hosts => ["https://xxxxx:9200"]
        index => "troubleshoot-%{+yyyy}"
        user => "xxxx"
        password => "${xxxx}"
    }
}

Can you share an example of the source event from filebeat for one of those messages that are failing your grok, so your full pipeline can be replicated?

Hi,
the reason you don't see index_name is because I omitted it from here; basically it's added by add_field in the pipeline based on some logic ...
It does work, since without the grok everything is parsed correctly and we get thousands of records.
I tried to add the elasticsearch output you suggested and it doesn't write anything to that new index,
again - it shows that the issue with the grok pattern causes the entire pipeline to stop working and not ingest any new entries.

What do you mean by an example of the source event from filebeat?
I sent an example of log entries, do you need something else?

Thanks

From what you already shared there is nothing that would make Logstash behave this way.

As already said, a grok error will not drop an event; the event will still be present in the output stage, but it will have a tag indicating whether it failed because of a parse error or maybe because of a timeout.

You are filtering your output, so it would make sense for the event not to show up in your elasticsearch if the grok failed, because it might not have any of the tags you are using in the output filter; but an output without filtering should still have your event.

The other explanation for an event not getting indexed, as Badger already said, is if for some reason your failing grok leads to a mapping exception, but this is logged in the logstash logs, and if you do not have any errors there this is really very weird. Again, it is not possible to try to replicate this without the full pipeline.

My suggestion to troubleshoot this would be to add another output in your listening pipeline, the one where you redirect your beats input: add a file output to get the raw event that beats is sending.

Just edit that part and put something like this:

    else if [event][module] == "kafka" and [agent][hostname] == "KafkaServer.xxx.xxxx" {
        pipeline {
            send_to => "Kafka_DEV_filebeat_logs"
        }
        file {
                path => "/tmp/troubleshoot-discuss.log"
        }
    }

This will catch everything that should go to the Kafka_DEV_filebeat_logs pipeline. Then share some events from the file: some that would match and some that you are not sure would match.

Also share the entire Kafka_DEV_filebeat_logs and elasticOutput pipelines, do not omit any filter. With the source event from the file output and the full pipelines it is then possible to recreate the steps your message goes through and see what may be happening.

You can also add a file output here:

else {
    elasticsearch {
        hosts => ["https://xxxxx:9200"]
        index => "troubleshoot-%{+yyyy}"
        user => "xxxx"
        password => "${xxxx}"
    }
    file {
        path => "/tmp/troubleshoot-elasticOutput.log"
    }
}

This will catch everything, even events that would be rejected by elasticsearch.

Hi,
thanks for your assistance,
so - adding a file output in the input pipeline does generate the entries in the file, and they are as they should be ... also, we could have told that without this step because, as said, the issue only appears when adding / removing the problematic grok, so the issue is not at that stage ...
When trying to add the file output in the output section of the Kafka_DEV pipeline it doesn't write anything to a file (as we also could have told, since the issue is in the grok which comes before it).

The issue is only with that grok pattern and we have no idea how to debug it ...

The log entries in the file are as we described -
for example -

{"message":"[2022-07-03 13:46:54,477] ERROR Something happened during dequeue event, EventQueueManager.dequeueEvent() : (io.confluent.kafka.connect.IPMSourceConnector.util.EventQueueManager:90)\njava.sql.SQLException: ORA-01403: no data found\nORA-06512: at \"BIZ_EVENT_EXT_HANDLER\", line 88\nORA-06512: at \"EVENT_KAFKA_CONNECT_PCK\", line 94\nORA-06512: at line 1\n\n\tat oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:628)\n\tat oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:562)\n\tat oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1145)\n\tat oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:726)\n\tat oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:291)\n\tat oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:492)\n\tat oracle.jdbc.driver.T4CCallableStatement.doOall8(T4CCallableStatement.java:144)\n\tat oracle.jdbc.driver.T4CCallableStatement.executeForRows(T4CCallableStatement.java:1034)\n\tat oracle.jdbc.driver.OracleStatement.executeSQLStatement(OracleStatement.java:1507)\n\tat oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1287)\n\tat oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3735)\n\tat oracle.jdbc.driver.OraclePreparedStatement.execute(OraclePreparedStatement.java:3933)\n\tat oracle.jdbc.driver.OracleCallableStatement.execute(OracleCallableStatement.java:4279)\n\tat oracle.jdbc.driver.OraclePreparedStatementWrapper.execute(OraclePreparedStatementWrapper.java:1010)\n\tat io.confluent.kafka.connect.IPMSourceConnector.util.EventQueueManager.dequeueEvent(EventQueueManager.java:85)\n\tat io.confluent.kafka.connect.IPMSourceConnector.QueueQueryRunner.removeCurrentEventFromQueueToLog(QueueQueryRunner.java:234)\n\tat io.confluent.kafka.connect.IPMSourceConnector.QueueQueryRunner.maybeStartQuery(QueueQueryRunner.java:223)\n\tat io.confluent.kafka.connect.IPMSourceConnector.JdbcQSourceTask.pollResults(JdbcQSourceTask.java:169)\n\tat io.confluent.kafka.connect.IPMSourceConnector.JdbcQSourceTask.poll(JdbcQSourceTask.java:138)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:308)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:263)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:199)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:254)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: Error : 1403, Position : 0, Sql = BEGIN :1 := IPM.biz_event_kafka_connect_pck.dequeue_event(:2 ,:3 ,:4 ,:5 ,:6 ,:7 ); END;, OriginalSql = { ? = call IPM.biz_event_kafka_connect_pck.dequeue_event(?,?,?,?,?,?)}, Error Msg = ORA-01403: no data found\nORA-06512: at \"BIZ_EVENT_EXT_HANDLER\", line 88\nORA-06512: at \"KAFKA_CONNECT_PCK\", line 94\nORA-06512: at line 1\n\n\tat oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:632)\n\t... 
27 more","fileset":{"name":"log"},"@timestamp":"2022-07-03T10:46:54.494Z","@version":"1","service":{"type":"kafka"},"tags":["beats_input_codec_plain_applied"],"host":{"mac":["00:50:56:ab:1c:96"],"ip":["10.54.0.48","fe80::250:56ff:feab:1c96"],"architecture":"x86_64","name":"server1.corp","os":{"codename":"Maipo","version":"7.9 (Maipo)","platform":"rhel","family":"redhat","type":"linux","name":"Red Hat Enterprise Linux Server","kernel":"3.10.0-1160.66.1.el7.x86_64"},"containerized":false,"id":"074182ff2fef46489c42724a019bef71","hostname":"server1.corp"},"event":{"timezone":"+03:00","module":"kafka","dataset":"kafka.log"},"input":{"type":"log"},"log":{"file":{"path":"/var/log/kafka-dev1/connect.log"},"flags":["multiline"],"offset":42469192},"agent":{"ephemeral_id":"2785784a-4a9c-4ef2-9670-729c1d89cb0b","version":"7.15.1","type":"filebeat","name":"server1.corp","hostname":"server1.corp","id":"35bf0a3c-4028-4035-8dc6-03cc8bfe9669"},"ecs":{"version":"1.11.0"}}

It's one line, and it does add some data from the filebeat agent.

another example -

{"message":"[2022-07-03 13:46:54,469] ERROR maybeStartQuery() error : ORA-16000: database or pluggable database open for read-only access\nORA-06512: at \"BIZ_EVENT_EXT_HANDLER\", line 12\nORA-06512: at \"EVENT_KAFKA_CONNECT_PCK\", line 68\nORA-06512: at line 1\n (io.confluent.kafka.connect.IPMSourceConnector.QueueQueryRunner:222)","fileset":{"name":"log"},"@timestamp":"2022-07-03T10:46:54.494Z","@version":"1","service":{"type":"kafka"},"tags":["beats_input_codec_plain_applied"],"host":{"mac":["00:50:56:ab:1c:96"],"ip":["10.54.0.48","fe80::250:56ff:feab:1c96"],"architecture":"x86_64","name":"server1.corp","os":{"codename":"Maipo","version":"7.9 (Maipo)","platform":"rhel","family":"redhat","type":"linux","name":"Red Hat Enterprise Linux Server","kernel":"3.10.0-1160.66.1.el7.x86_64"},"hostname":"server1.corp","id":"074182ff2fef46489c42724a019bef71","containerized":false},"event":{"timezone":"+03:00","module":"kafka","dataset":"kafka.log"},"input":{"type":"log"},"log":{"file":{"path":"/var/log/kafka-dev1/connect.log"},"flags":["multiline"],"offset":42468864},"agent":{"ephemeral_id":"2785784a-4a9c-4ef2-9670-729c1d89cb0b","version":"7.15.1","type":"filebeat","name":"server1.corp","hostname":"server1.corp","id":"35bf0a3c-4028-4035-8dc6-03cc8bfe9669"},"ecs":{"version":"1.11.0"}}

Thanks