NumberFormatException stops Logstash from working


(Guenther Grill) #1

Hi,

We have had Logstash running for a while now, but yesterday something happened that stopped it from working.

deploy@apollo:/etc/logstash$ docker logs logstash-services.1.7ilmsb0gdld6wkpd0j309g6es
[2017-01-18T10:56:16,553][DEBUG][logstash.inputs.beats    ] config LogStash::Inputs::Beats/@client_inactivity_timeout = 60
[2017-01-18T10:56:25,425][DEBUG][logstash.filters.json    ] Running json filter {:event=>2017-01-18T08:16:46.209Z orion {"level":"DEBUG","message":"InvoicesWorkflow/anonymous-step-2: WaitingForExecution","log_source":"smarter.ecommerce.commons.spring.task.ServiceLogTaskRunnerPlugin","properties":{"service":{"name":"Whoop"},"workflowExecution":{"workflowId":"InvoicesWorkflow","id":"InvoicesWorkflow:regenerateInvoice:7b802029-1719-4dfc-ba06-41d45b591347","stepId":"anonymous-step-2"},"taskExecution":{"group":"InvoicesWorkflow","name":"anonymous-step-2","id":"taskRunner://smecTaskRunner/InvoicesWorkflow/anonymous-step-2/8591c1aa-72b7-4cae-a4ed-8dac1d146f49_4","createdAt":1484667736438,"status":"WaitingForExecution","endStatus":null,"priority":100,"startedAt":null,"finishedAt":null,"input":[{"accountId":527,"invoiceId":3259,"hasInvoiceNumberCounter":true,"hasChanges":true}],"lockKey":null,"output":null,"cancellation":null,"exception":null,"duration":null}},"createdAt":1484667736438}}
[2017-01-18T10:56:25,425][DEBUG][logstash.filters.json    ] Event after json filter {:event=>2017-01-18T08:16:46.209Z orion InvoicesWorkflow/anonymous-step-2: WaitingForExecution}
[2017-01-18T10:56:25,425][DEBUG][logstash.util.decorators ] filters/LogStash::Filters::Mutate: adding value to field {"field"=>"status", "value"=>["%{[taskExecution][status]}"]}
[2017-01-18T10:56:25,425][DEBUG][logstash.util.decorators ] filters/LogStash::Filters::Mutate: adding value to field {"field"=>"title", "value"=>["%{[workflowExecution][workflowId]} - %{[workflowExecution][stepId]}"]}
[2017-01-18T10:56:25,425][DEBUG][logstash.filters.mutate  ] filters/LogStash::Filters::Mutate: removing tag {:tag=>"[taskExecution][exception]"}
...
Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.
...
Exception in thread "[main]>worker0" java.lang.NumberFormatException: For input string: "bidderId"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at org.logstash.Accessors.fetch(Accessors.java:130)
	at org.logstash.Accessors.get(Accessors.java:20)
	at org.logstash.Event.getUnconvertedField(Event.java:160)
	at org.logstash.ext.JrubyEventExtLibrary$RubyEvent.ruby_get_field(JrubyEventExtLibrary.java:113)
	at org.logstash.ext.JrubyEventExtLibrary$RubyEvent$INVOKER$i$1$0$ruby_get_field.call(JrubyEventExtLibrary$RubyEvent$INVOKER$i$1$0$ruby_get_field.gen)
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)
	at org.jruby.ast.CallOneArgNode.interpret(CallOneArgNode.java:57)
	at org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)
	at org.jruby.ast.IfNode.interpret(IfNode.java:110)
	at org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)
	at org.jruby.ast.BlockNode.interpret(BlockNode.java:71)
	at org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)
	at org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206)
	at org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:157)
	at org.jruby.runtime.Block.yield(Block.java:142)
	at org.jruby.RubyArray.eachCommon(RubyArray.java:1606)
	at org.jruby.RubyArray.each(RubyArray.java:1613)
	at org.jruby.RubyArray$INVOKER$i$0$0$each.call(RubyArray$INVOKER$i$0$0$each.gen)
	at org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:143)
	at org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:154)
	at org.jruby.ast.CallNoArgBlockNode.interpret(CallNoArgBlockNode.java:64)
	at org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)
	at org.jruby.ast.BlockNode.interpret(BlockNode.java:71)
	at org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)
	at org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206)
	at org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:194)
	at org.jruby.runtime.Interpreted19Block.call(Interpreted19Block.java:125)
	at org.jruby.runtime.Block.call(Block.java:101)
	at org.jruby.RubyProc.call(RubyProc.java:300)
	at org.jruby.RubyProc.call19(RubyProc.java:281)
	at org.jruby.RubyProc$INVOKER$i$0$0$call19.call(RubyProc$INVOKER$i$0$0$call19.gen)
	at org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:210)
	at org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:206)
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)
	at org.jruby.ast.CallOneArgNode.interpret(CallOneArgNode.java:57)
	at org.jruby.ast.DAsgnNode.interpret(DAsgnNode.java:110)
	at org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)
	at org.jruby.ast.BlockNode.interpret(BlockNode.java:71)
	at org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)
	at org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206)
	at org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:194)
	at org.jruby.runtime.Interpreted19Block.call(Interpreted19Block.java:125)
	at org.jruby.runtime.Block.call(Block.java:101)
	at org.jruby.RubyProc.call(RubyProc.java:300)
	at org.jruby.internal.runtime.methods.ProcMethod.call(ProcMethod.java:64)
	at org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:210)
	at org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:206)
	at org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)
	...

Seems like a bug to me. Any suggestions?


(Magnus Bäck) #2

What does your configuration look like? And a sample event?


(Guenther Grill) #3

Config:

input {
  beats {
    port => "5045"
  }
}

filter {
  if [type] in [ "ServiceLogEntry", "TaskLogEntry" ] {
    json {
      source => "message"
    }
  }

  # Move all elements under properties one level up because it doesn't look very pretty in Kibana
  if [type] in [ "ServiceLogEntry", "TaskLogEntry" ] and [properties] {
    mutate {
      rename => {"[properties][customer]" => "customer"}
      rename => {"[properties][service]" => "service"}
      rename => {"[properties][taskExecution]" => "taskExecution"}
      rename => {"[properties][workflowExecution]" => "workflowExecution"}
      rename => {"[properties][exception]" => "exception"}
    }
  }

  if [type] == "TaskLogEntry" {
    if [taskExecution][endStatus] {
      mutate {
        add_field => { "status" => "%{[taskExecution][endStatus]}" }
      }
    } else if [taskExecution][status] {
      mutate {
        add_field => { "status" => "%{[taskExecution][status]}" }
      }
    }

    if [workflowExecution][workflowId] {
      mutate {
        add_field => { "title" => "%{[workflowExecution][workflowId]} - %{[workflowExecution][stepId]}" }
      }
    }

    mutate {
      rename => { "[taskExecution][startedAt]" => "startedAt" }
      rename => { "[taskExecution][finishedAt]" => "finishedAt" }
      rename => { "[service][name]" => "service" }

      # The exception is already available on top level
      remove_tag => [ "[taskExecution][exception]" ]
    }
  }

  if [type] == "ServiceLogEntry" {
    mutate {
      rename => { "[service][name]" => "service" }
    }
  }

  if [type] == "JobExecution" {
    mutate {
      # map the old fields to the new ones
      rename => { "system" => "service" }
      rename => { "[data][parameters][strategy]" => "strategy" }
      rename => { "name" => "title" }
      rename => { "executionTimeMs" => "duration" }
      rename => { "[id]" => "[taskExecution][id]" }
      rename => { "[parameters][arguments.request.bidderId]" => "bidderId" }
      rename => { "[parameters][arguments.request.parameters.bidderId]" => "bidderId" }
      rename => { "[parameters][arguments.request.catalogImportConfigurationId]" => "catalogImportConfigurationId" }
      rename => { "[parameters][arguments.request.parameters.catalogImportConfigurationId]" => "catalogImportConfigurationId" }
      rename => { "[parameters][arguments.feeds.catalogImportConfigurationId]" => "catalogImportConfigurationId" }
      rename => { "[parameters][strategy]" => "batch_strategy" }

      # move the remaining fields to old_data
      rename => { "data" => "[old_data][data]" }
      rename => { "parameters" => "[old_data][parameters]" }
      rename => { "type" => "[old_data][type]" }

      add_field => { "type" => "TaskLogEntry" }
    }
  }

  if [type] == "Message" {
    mutate {
      # map the old fields to the new ones
      rename => { "system" => "service" }
      rename => { "[data][event][id]" => "[taskExecution][id]" }
      rename => { "[data][event][bidderId]" => "bidderId" }
      rename => { "[data][event][customerId]" => "customerId" }
      rename => { "[data][event][correlationId]" => "correlationId" }
      rename => { "[data][event][correlationIndex]" => "correlationIndex" }

      rename => { "[data][request][bidderId]" => "bidderId" }

      rename => { "[data][configuration.id]" => "catalogImportConfigurationId" }
      rename => { "[data][jobParameter.arguments.feeds.catalogImportConfigurationId]" => "catalogImportConfigurationId" }
      rename => { "[data][jobParameter.arguments.request.catalogImportConfigurationId]" => "catalogImportConfigurationId" }

      # move the remaining fields to old_data
      rename => { "context" => "[old_data][context]" }
      rename => { "data" => "[old_data][data]" }
      rename => { "tags" => "[old_data][tags]" }
      rename => { "type" => "[old_data][type]" }

      add_field => { "type" => "ServiceLogEntry" }
    }

    # new logs have upper case log-level
    mutate {
      uppercase => [ "level" ]
    }

  }

  if [type] in [ "ServiceLogEntry", "TaskLogEntry" ] and ! [exception] {

    if [old_data][data][exceptionMessage] {
      mutate {
        rename => { "[old_data][data][exceptionMessage]" => "[exception][message]" }
      }
    }
    if [old_data][data][exceptionStackTrace] {
      mutate {
        rename => { "[old_data][data][exceptionStackTrace]" => "[exception][stackTrace]" }
      }
    }

  }

  if [type] in [ "ServiceLogEntry", "TaskLogEntry" ] and [customer] {
    mutate {
      rename => {"[customer][id]" => "customerId" }
      rename => {"[customer][name]" => "customerName" }
    }
  }
  if [taskExecution][input][bidderId] {
    mutate {
      rename => {"[taskExecution][input][bidderId]" => "bidderId" }
    }
  }

  if ! [service] {
    mutate {
      add_field => { "service" => "UNKNOWN" }
    }
  }
}

output {
  if [type] == "ServiceLogEntry" {
    if [taskExecution][id] {
      elasticsearch {
          hosts => ["es-master-1:9200"]
          manage_template => false
          index => "servicelog-data-%{+YYYY.MM.dd}"
          document_id => "%{[taskExecution][id]}"
      }
    } else {
      elasticsearch {
        hosts => ["es-master-1:9200"]
        manage_template => false
        index => "servicelog-data-%{+YYYY.MM.dd}"
      }
    }
  } else if [type] == "TaskLogEntry" {
    if [taskExecution][id] {
      elasticsearch {
          hosts => ["es-master-1:9200"]
          manage_template => false
          index => "tasklog-data"
          document_id => "%{[taskExecution][id]}"
      }
    } else {
      elasticsearch {
          hosts => ["es-master-1:9200"]
          manage_template => false
          index => "tasklog-data"
      }
    }

    if ([level] != "DEBUG") {
      elasticsearch {
        hosts => ["es-master-1:9200"]
        manage_template => false
        index => "servicelog-data-%{+YYYY.MM.dd}"
      }
    }
  } else {
    stdout {
      codec => "rubydebug"
    }
  }
}

(Guenther Grill) #4

It seems that the problematic event (the last event in the log) is:

{
  "level": "DEBUG",
  "message": "InvoicesWorkflow/anonymous-step-2: WaitingForExecution",
  "log_source": "smarter.ecommerce.commons.spring.task.ServiceLogTaskRunnerPlugin",
  "properties": {
    "service": {
      "name": "Whoop"
    },
    "workflowExecution": {
      "workflowId": "InvoicesWorkflow",
      "id": "InvoicesWorkflow:regenerateInvoice:7b802029-1719-4dfc-ba06-41d45b591347",
      "stepId": "anonymous-step-2"
    },
    "taskExecution": {
      "group": "InvoicesWorkflow",
      "name": "anonymous-step-2",
      "id": "taskRunner://smecTaskRunner/InvoicesWorkflow/anonymous-step-2/8591c1aa-72b7-4cae-a4ed-8dac1d146f49_4",
      "createdAt": 1484667736438,
      "status": "WaitingForExecution",
      "endStatus": null,
      "priority": 100,
      "startedAt": null,
      "finishedAt": null,
      "input": [
        {
          "accountId": 527,
          "invoiceId": 3259,
          "hasInvoiceNumberCounter": true,
          "hasChanges": true
        }
      ],
      "lockKey": null,
      "output": null,
      "cancellation": null,
      "exception": null,
      "duration": null
    }
  },
  "createdAt": 1484667736438
}

(Magnus Bäck) #5

Hmm, not sure what's up here. Perhaps you can narrow down the configuration and see what triggers the crash?


(Guenther Grill) #6

Yeah, I think that's a good idea. Thanks.


(Guenther Grill) #7

I narrowed down the problem:

This part of the configuration causes Logstash to stop working:

if [taskExecution][input][bidderId] {
    ...
  }

The reason is that different systems report to Logstash, and their documents can have different structures. In this case, the difference is that the element input can either be a list or a plain object with sub-elements:

Working doc:

{
  ...
  "properties": {
    "taskExecution": {
      "input": {
        "bidderId": 1096,
        "changeType": "RulesChanged"
      }
    }
  }
}

Not working doc:

{
  ...
  "properties": {
    "taskExecution": {
      "input": [
        {
          "accountId": 527,
          "invoiceId": 3259,
          "hasInvoiceNumberCounter": true,
          "hasChanges": true
        }
      ]
    }
  }
}

If this document is passed to Elasticsearch, no problem occurs there, because the mapping of this field is:

"input": {
                "type": "object",
                "enabled": false
              },

So this still seems like a bug to me.
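
Judging from the stack trace, the field accessor seems to treat the next path segment as a numeric array index whenever the value it is traversing is a list. Roughly like this simplified Ruby sketch (an illustration only, not the actual org.logstash.Accessors code):

def fetch(target, key)
  case target
  when Hash  then target[key]
  when Array then target[Integer(key, 10)]  # Integer('bidderId', 10) raises, like Integer.parseInt
  end
end

fetch({ 'bidderId' => 1096 }, 'bidderId')    # => 1096
fetch([{ 'bidderId' => 1096 }], 'bidderId')  # raises ArgumentError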


(Magnus Bäck) #8

Yeah, it should handle this more gracefully. Feel free to file a GitHub issue.


(Guenther Grill) #9

I opened an issue.

But is there any workaround in Logstash to prevent this, e.g. by first checking whether the data inside input is a list?


(Magnus Bäck) #10

I'm pretty sure you have to use a ruby filter for that.


(Guenther Grill) #11

Thanks. I will try that until it is fixed.


(Guenther Grill) #12

Could you give a little hint how this can be done with Ruby? I searched for examples but couldn't find any.


(Magnus Bäck) #13

So you want

  if [taskExecution][input][bidderId] {
    mutate {
      rename => {"[taskExecution][input][bidderId]" => "bidderId" }
    }
  }

to do nothing when [taskExecution][input] is an array? Note that reading [taskExecution][input][bidderId] with event.get goes through the same field accessor and would raise the same exception, so the sketch below fetches the parent object first. Something like this might work as a replacement:

ruby {
  code => "
    # Fetch the parent first; accessing [taskExecution][input][bidderId]
    # directly raises the same NumberFormatException when input is an array.
    input = event.get('[taskExecution][input]')
    if input.is_a?(Hash) and input['bidderId']
      event.set('bidderId', input['bidderId'])
      event.remove('[taskExecution][input][bidderId]')
    end
  "
}

(Colin Surprenant) #14

Here's another way of checking for field presence that will not trigger this NumberFormatException when the parent field is an array.

The idea is to use the conditional in, as in if "b" in [a], or in this case if "bidderId" in [taskExecution][input].

Here's a snippet that shows this using 4 different document formats, where the first one is the only one with the expected [a][b] structure and the second one contains an array:

echo -e '{"a":{"b":1}}\n{"a":[1,2]}\n{"a":"1"}\n{"a": {}}' | bin/logstash -w1 -e 'input{stdin {codec => json_lines}} filter{if "b" in [a] {mutate {add_field => {"b" => "%{[a][b]}" }}}}'

And the output is:

{
             "a" => {
        "b" => 1
    },
             "b" => "1",
    "@timestamp" => 2017-01-20T18:02:53.322Z,
      "@version" => "1",
          "host" => "colin-mbp13r-2.local"
}
{
             "a" => [
        [0] 1,
        [1] 2
    ],
    "@timestamp" => 2017-01-20T18:02:53.392Z,
      "@version" => "1",
          "host" => "colin-mbp13r-2.local"
}
{
             "a" => "1",
    "@timestamp" => 2017-01-20T18:02:53.417Z,
      "@version" => "1",
          "host" => "colin-mbp13r-2.local"
}
{
             "a" => {},
    "@timestamp" => 2017-01-20T18:02:53.418Z,
      "@version" => "1",
          "host" => "colin-mbp13r-2.local"
}

So we see that the only event which has the "b" field added is the one where the input document had an [a][b] field value; all the others were ignored.
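
Applied to your configuration, the failing conditional could be rewritten like this (untested sketch):

if "bidderId" in [taskExecution][input] {
  mutate {
    rename => { "[taskExecution][input][bidderId]" => "bidderId" }
  }
}

When input is an array, the in check just evaluates to false instead of raising an exception.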

Also, I agree we should have a better exception report for this NumberFormatException.

Colin


(Guenther Grill) #15

@colinsurprenant

That's the solution I was looking for! Thanks.


(system) #16

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.