NumberFormatException stops Logstash from working

Hi,

We have had Logstash running for a while now, but yesterday something happened that stopped it from working.

deploy@apollo:/etc/logstash$ docker logs logstash-services.1.7ilmsb0gdld6wkpd0j309g6es
[2017-01-18T10:56:16,553][DEBUG][logstash.inputs.beats    ] config LogStash::Inputs::Beats/@client_inactivity_timeout = 60
[2017-01-18T10:56:25,425][DEBUG][logstash.filters.json    ] Running json filter {:event=>2017-01-18T08:16:46.209Z orion {"level":"DEBUG","message":"InvoicesWorkflow/anonymous-step-2: WaitingForExecution","log_source":"smarter.ecommerce.commons.spring.task.ServiceLogTaskRunnerPlugin","properties":{"service":{"name":"Whoop"},"workflowExecution":{"workflowId":"InvoicesWorkflow","id":"InvoicesWorkflow:regenerateInvoice:7b802029-1719-4dfc-ba06-41d45b591347","stepId":"anonymous-step-2"},"taskExecution":{"group":"InvoicesWorkflow","name":"anonymous-step-2","id":"taskRunner://smecTaskRunner/InvoicesWorkflow/anonymous-step-2/8591c1aa-72b7-4cae-a4ed-8dac1d146f49_4","createdAt":1484667736438,"status":"WaitingForExecution","endStatus":null,"priority":100,"startedAt":null,"finishedAt":null,"input":[{"accountId":527,"invoiceId":3259,"hasInvoiceNumberCounter":true,"hasChanges":true}],"lockKey":null,"output":null,"cancellation":null,"exception":null,"duration":null}},"createdAt":1484667736438}}
[2017-01-18T10:56:25,425][DEBUG][logstash.filters.json    ] Event after json filter {:event=>2017-01-18T08:16:46.209Z orion InvoicesWorkflow/anonymous-step-2: WaitingForExecution}
[2017-01-18T10:56:25,425][DEBUG][logstash.util.decorators ] filters/LogStash::Filters::Mutate: adding value to field {"field"=>"status", "value"=>["%{[taskExecution][status]}"]}
[2017-01-18T10:56:25,425][DEBUG][logstash.util.decorators ] filters/LogStash::Filters::Mutate: adding value to field {"field"=>"title", "value"=>["%{[workflowExecution][workflowId]} - %{[workflowExecution][stepId]}"]}
[2017-01-18T10:56:25,425][DEBUG][logstash.filters.mutate  ] filters/LogStash::Filters::Mutate: removing tag {:tag=>"[taskExecution][exception]"}
...
Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.
...
Exception in thread "[main]>worker0" java.lang.NumberFormatException: For input string: "bidderId"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at org.logstash.Accessors.fetch(Accessors.java:130)
	at org.logstash.Accessors.get(Accessors.java:20)
	at org.logstash.Event.getUnconvertedField(Event.java:160)
	at org.logstash.ext.JrubyEventExtLibrary$RubyEvent.ruby_get_field(JrubyEventExtLibrary.java:113)
	at org.logstash.ext.JrubyEventExtLibrary$RubyEvent$INVOKER$i$1$0$ruby_get_field.call(JrubyEventExtLibrary$RubyEvent$INVOKER$i$1$0$ruby_get_field.gen)
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)
	at org.jruby.ast.CallOneArgNode.interpret(CallOneArgNode.java:57)
	at org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)
	at org.jruby.ast.IfNode.interpret(IfNode.java:110)
	at org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)
	at org.jruby.ast.BlockNode.interpret(BlockNode.java:71)
	at org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)
	at org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206)
	at org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:157)
	at org.jruby.runtime.Block.yield(Block.java:142)
	at org.jruby.RubyArray.eachCommon(RubyArray.java:1606)
	at org.jruby.RubyArray.each(RubyArray.java:1613)
	at org.jruby.RubyArray$INVOKER$i$0$0$each.call(RubyArray$INVOKER$i$0$0$each.gen)
	at org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:143)
	at org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:154)
	at org.jruby.ast.CallNoArgBlockNode.interpret(CallNoArgBlockNode.java:64)
	at org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)
	at org.jruby.ast.BlockNode.interpret(BlockNode.java:71)
	at org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)
	at org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206)
	at org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:194)
	at org.jruby.runtime.Interpreted19Block.call(Interpreted19Block.java:125)
	at org.jruby.runtime.Block.call(Block.java:101)
	at org.jruby.RubyProc.call(RubyProc.java:300)
	at org.jruby.RubyProc.call19(RubyProc.java:281)
	at org.jruby.RubyProc$INVOKER$i$0$0$call19.call(RubyProc$INVOKER$i$0$0$call19.gen)
	at org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:210)
	at org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:206)
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)
	at org.jruby.ast.CallOneArgNode.interpret(CallOneArgNode.java:57)
	at org.jruby.ast.DAsgnNode.interpret(DAsgnNode.java:110)
	at org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)
	at org.jruby.ast.BlockNode.interpret(BlockNode.java:71)
	at org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)
	at org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206)
	at org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:194)
	at org.jruby.runtime.Interpreted19Block.call(Interpreted19Block.java:125)
	at org.jruby.runtime.Block.call(Block.java:101)
	at org.jruby.RubyProc.call(RubyProc.java:300)
	at org.jruby.internal.runtime.methods.ProcMethod.call(ProcMethod.java:64)
	at org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:210)
	at org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:206)
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)
	...

Seems like a bug to me. Any suggestions?

What does your configuration look like? And a sample event?

Config:

input {
  beats {
    port => "5045"
  }
}

filter {
  if [type] in [ "ServiceLogEntry", "TaskLogEntry" ] {
    json {
      source => "message"
    }
  }

  # Move all elements under properties one level up because it doesn't look very pretty in Kibana
  if [type] in [ "ServiceLogEntry", "TaskLogEntry" ] and [properties] {
    mutate {
      rename => {"[properties][customer]" => "customer"}
      rename => {"[properties][service]" => "service"}
      rename => {"[properties][taskExecution]" => "taskExecution"}
      rename => {"[properties][workflowExecution]" => "workflowExecution"}
      rename => {"[properties][exception]" => "exception"}
    }
  }

  if [type] == "TaskLogEntry" {
    if [taskExecution][endStatus] {
      mutate {
        add_field => { "status" => "%{[taskExecution][endStatus]}" }
      }
    } else if [taskExecution][status] {
      mutate {
        add_field => { "status" => "%{[taskExecution][status]}" }
      }
    }

    if [workflowExecution][workflowId] {
      mutate {
        add_field => { "title" => "%{[workflowExecution][workflowId]} - %{[workflowExecution][stepId]}" }
      }
    }

    mutate {
      rename => { "[taskExecution][startedAt]" => "startedAt" }
      rename => { "[taskExecution][finishedAt]" => "finishedAt" }
      rename => { "[service][name]" => "service" }

      # The exception is already available on top level
      remove_tag => [ "[taskExecution][exception]" ]
    }
  }

  if [type] == "ServiceLogEntry" {
    mutate {
      rename => { "[service][name]" => "service" }
    }
  }

  if [type] == "JobExecution" {
    mutate {
      # map the old fields to the new ones
      rename => { "system" => "service" }
      rename => { "[data][parameters][strategy]" => "strategy" }
      rename => { "name" => "title" }
      rename => { "executionTimeMs" => "duration" }
      rename => { "[id]" => "[taskExecution][id]" }
      rename => { "[parameters][arguments.request.bidderId]" => "bidderId" }
      rename => { "[parameters][arguments.request.parameters.bidderId]" => "bidderId" }
      rename => { "[parameters][arguments.request.catalogImportConfigurationId]" => "catalogImportConfigurationId" }
      rename => { "[parameters][arguments.request.parameters.catalogImportConfigurationId]" => "catalogImportConfigurationId" }
      rename => { "[parameters][arguments.feeds.catalogImportConfigurationId]" => "catalogImportConfigurationId" }
      rename => { "[parameters][strategy]" => "batch_strategy" }

      # move the remaining fields to old_data
      rename => { "data" => "[old_data][data]" }
      rename => { "parameters" => "[old_data][parameters]" }
      rename => { "type" => "[old_data][type]" }

      add_field => { "type" => "TaskLogEntry" }
    }
  }

  if [type] == "Message" {
    mutate {
      # map the old fields to the new ones
      rename => { "system" => "service" }
      rename => { "[data][event][id]" => "[taskExecution][id]" }
      rename => { "[data][event][bidderId]" => "bidderId" }
      rename => { "[data][event][customerId]" => "customerId" }
      rename => { "[data][event][correlationId]" => "correlationId" }
      rename => { "[data][event][correlationIndex]" => "correlationIndex" }

      rename => { "[data][request][bidderId]" => "bidderId" }

      rename => { "[data][configuration.id]" => "catalogImportConfigurationId" }
      rename => { "[data][jobParameter.arguments.feeds.catalogImportConfigurationId]" => "catalogImportConfigurationId" }
      rename => { "[data][jobParameter.arguments.request.catalogImportConfigurationId]" => "catalogImportConfigurationId" }

      # move the remaining fields to old_data
      rename => { "context" => "[old_data][context]" }
      rename => { "data" => "[old_data][data]" }
      rename => { "tags" => "[old_data][tags]" }
      rename => { "type" => "[old_data][type]" }

      add_field => { "type" => "ServiceLogEntry" }
    }

    # new logs have upper case log-level
    mutate {
      uppercase => [ "level" ]
    }

  }

  if [type] in [ "ServiceLogEntry", "TaskLogEntry" ] and ! [exception] {

    if [old_data][data][exceptionMessage] {
      mutate {
        rename => { "[old_data][data][exceptionMessage]" => "[exception][message]" }
      }
    }
    if [old_data][data][exceptionStackTrace] {
      mutate {
        rename => { "[old_data][data][exceptionStackTrace]" => "[exception][stackTrace]" }
      }
    }

  }

  if [type] in [ "ServiceLogEntry", "TaskLogEntry" ] and [customer] {
    mutate {
      rename => {"[customer][id]" => "customerId" }
      rename => {"[customer][name]" => "customerName" }
    }
  }
  if [taskExecution][input][bidderId] {
    mutate {
      rename => {"[taskExecution][input][bidderId]" => "bidderId" }
    }
  }

  if ! [service] {
    mutate {
      add_field => { "service" => "UNKNOWN" }
    }
  }
}

output {
  if [type] == "ServiceLogEntry" {
    if [taskExecution][id] {
      elasticsearch {
        hosts => ["es-master-1:9200"]
        manage_template => false
        index => "servicelog-data-%{+YYYY.MM.dd}"
        document_id => "%{[taskExecution][id]}"
      }
    } else {
      elasticsearch {
        hosts => ["es-master-1:9200"]
        manage_template => false
        index => "servicelog-data-%{+YYYY.MM.dd}"
      }
    }
  } else if [type] == "TaskLogEntry" {
    if [taskExecution][id] {
      elasticsearch {
        hosts => ["es-master-1:9200"]
        manage_template => false
        index => "tasklog-data"
        document_id => "%{[taskExecution][id]}"
      }
    } else {
      elasticsearch {
        hosts => ["es-master-1:9200"]
        manage_template => false
        index => "tasklog-data"
      }
    }

    if [level] != "DEBUG" {
      elasticsearch {
        hosts => ["es-master-1:9200"]
        manage_template => false
        index => "servicelog-data-%{+YYYY.MM.dd}"
      }
    }
  } else {
    stdout {
      codec => "rubydebug"
    }
  }
}

It seems that the problematic event (the last one in the log) is:

{
  "level": "DEBUG",
  "message": "InvoicesWorkflow/anonymous-step-2: WaitingForExecution",
  "log_source": "smarter.ecommerce.commons.spring.task.ServiceLogTaskRunnerPlugin",
  "properties": {
    "service": {
      "name": "Whoop"
    },
    "workflowExecution": {
      "workflowId": "InvoicesWorkflow",
      "id": "InvoicesWorkflow:regenerateInvoice:7b802029-1719-4dfc-ba06-41d45b591347",
      "stepId": "anonymous-step-2"
    },
    "taskExecution": {
      "group": "InvoicesWorkflow",
      "name": "anonymous-step-2",
      "id": "taskRunner://smecTaskRunner/InvoicesWorkflow/anonymous-step-2/8591c1aa-72b7-4cae-a4ed-8dac1d146f49_4",
      "createdAt": 1484667736438,
      "status": "WaitingForExecution",
      "endStatus": null,
      "priority": 100,
      "startedAt": null,
      "finishedAt": null,
      "input": [
        {
          "accountId": 527,
          "invoiceId": 3259,
          "hasInvoiceNumberCounter": true,
          "hasChanges": true
        }
      ],
      "lockKey": null,
      "output": null,
      "cancellation": null,
      "exception": null,
      "duration": null
    }
  },
  "createdAt": 1484667736438
}

Hmm, not sure what's up here. Perhaps you can narrow down the configuration and see what triggers the crash?

Yeah, I think that's a good idea. Thanks.

Narrowed down the problem:

This part of the configuration causes Logstash to stop working:

if [taskExecution][input][bidderId] {
  ...
}

The reason is that different systems report to Logstash, and their documents can have different structures. In this case, the difference is that the element input can either be a list or an object with sub-elements:

Working doc:

{
  ...
  "properties": {
    "taskExecution": {
      "input": {
        "bidderId": 1096,
        "changeType": "RulesChanged"
      }
    }
  }
}

Not working doc:

{
  ...
  "properties": {
    "taskExecution": {
      "input": [
        {
          "accountId": 527,
          "invoiceId": 3259,
          "hasInvoiceNumberCounter": true,
          "hasChanges": true
        }
      ]
    }
  }
}

If this document is passed to Elasticsearch, no problem occurs there, because the mapping of this field is:

"input": {
                "type": "object",
                "enabled": false
              },

So this still seems like a bug to me.
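For reference, a stripped-down repro should look something like this (assuming a local Logstash 5.x install; the add_tag mutate is just a placeholder so the conditional has a body):

echo '{"taskExecution":{"input":[{"accountId":527}]}}' | bin/logstash -w1 -e 'input {stdin {codec => json_lines}} filter {if [taskExecution][input][bidderId] {mutate {add_tag => ["has_bidderId"]}}}'

This should kill the pipeline worker with the same NumberFormatException, since the conditional tries to look up the string key "bidderId" on an array.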

Yeah, it should handle this more gracefully. Feel free to file a GitHub issue.

I opened an issue.

But is there any workaround in Logstash to prevent this, e.g. by first checking whether the data inside input is a list?

I'm pretty sure you have to use a ruby filter for that.

Thanks. I will try that until it is fixed.

Could you give a little hint on how this can be done with Ruby? I searched for examples but couldn't find any.

So you want

  if [taskExecution][input][bidderId] {
    mutate {
      rename => {"[taskExecution][input][bidderId]" => "bidderId" }
    }
  }

to do nothing if [taskExecution][input] is an array? Something like this might work as a replacement (it reads the parent field first, since fetching the nested field directly can raise the same exception):

ruby {
  code => "
    # Fetch the parent field; reading [taskExecution][input][bidderId]
    # directly would hit the same NumberFormatException when input is an array.
    input = event.get('[taskExecution][input]')
    if input.is_a?(Hash) && input['bidderId']
      event.set('bidderId', input['bidderId'])
      event.remove('[taskExecution][input][bidderId]')
    end
  "
}

Here's another way of checking for field presence that will not trigger this NumberFormatException on an array field.

The idea is to use the "in" conditional, as with if "b" in [a], or in this case if "bidderId" in [taskExecution][input].

Here's a snippet that shows this using 4 different document formats, where the first one is the only valid one and the second one contains an array.

echo -e '{"a":{"b":1}}\n{"a":[1,2]}\n{"a":"1"}\n{"a": {}}' | bin/logstash -w1 -e 'input{stdin {codec => json_lines}} filter{if "b" in [a] {mutate {add_field => {"b" => "%{[a][b]}" }}}}'

And the output is:

{
             "a" => {
        "b" => 1
    },
             "b" => "1",
    "@timestamp" => 2017-01-20T18:02:53.322Z,
      "@version" => "1",
          "host" => "colin-mbp13r-2.local"
}
{
             "a" => [
        [0] 1,
        [1] 2
    ],
    "@timestamp" => 2017-01-20T18:02:53.392Z,
      "@version" => "1",
          "host" => "colin-mbp13r-2.local"
}
{
             "a" => "1",
    "@timestamp" => 2017-01-20T18:02:53.417Z,
      "@version" => "1",
          "host" => "colin-mbp13r-2.local"
}
{
             "a" => {},
    "@timestamp" => 2017-01-20T18:02:53.418Z,
      "@version" => "1",
          "host" => "colin-mbp13r-2.local"
}

So we see that the only event which has the "b" field added is the one where the input document had an [a][b] field value; all the others were ignored.

Also, I agree we should have a better exception report for this NumberFormatException.

Colin

@colinsurprenant

That's the solution I was looking for! Thanks.
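For anyone who lands here later, rewriting the offending block in my config with the in conditional should look roughly like this:

if "bidderId" in [taskExecution][input] {
  mutate {
    rename => {"[taskExecution][input][bidderId]" => "bidderId" }
  }
}

When input is an array, the in check is just a (false) membership test instead of a string-key lookup, so the pipeline no longer crashes.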

