Filter and output plugins not working

Hi,

I need some help regarding the JDBC input plugin.

The documentation says: "You can periodically schedule ingestion using a cron syntax (see schedule setting) or run the query one time to load data into Logstash."
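If I understand correctly, the only difference between the two modes is the schedule option. A minimal sketch (driver and connection settings omitted, same as in my real configuration below):

input{
  jdbc{
    # ... driver and connection settings as below ...
    statement => 'select * from "Schema"."dblogs"'
    # scheduled mode: uncomment to run the query every minute (cron syntax)
    # schedule => "* * * * *"
    # one-shot mode: leave schedule out; the query runs once and, with no
    # other input, Logstash then shuts the pipeline down
  }
}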

I would like to run the query only once, to do one large bulk insert into ES.
But with the configuration below, if I remove the schedule option, the filter and output plugins don't work. If I leave the schedule option in, everything works correctly, but the same JDBC rows get inserted several times.

Here is the configuration:

input{
  jdbc{
    jdbc_driver_library => 'postgresql-42.2.12.jar'
    jdbc_driver_class => 'org.postgresql.Driver'
    jdbc_connection_string => 'jdbc:postgresql://host:port/instanceName' 
    jdbc_user => 'Username'
    jdbc_password => 'PasswordMegaSecured'
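    # cron syntax: run the query every minute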
    schedule => "* * * * *"
    statement => 'select * from "Schema"."dblogs"'
  }
}
filter{
  aggregate{
    task_id => "%{id}"
    code => "
      map['id'] = event.get('id')
      map['source'] = event.get('source')
      map['target'] = event.get('target')
      map['logs'] ||= []
      map['logs'] << {
        'stacktrace' => event.get('stacktrace')
      }
    "
    push_map_as_event_on_timeout => true
    timeout => 10
    timeout_code => "
      event.set('[@aggregated]', 1)
    "
  }
  if [@aggregated] != 1 { drop {} }
  mutate{
    remove_field => ['@timestamp','@version','@aggregated']
  }
}
output{
  elasticsearch {
    hosts => ["192.168.119.136:9200"]
    index => "myindex"
  }
}
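To clarify what I am trying to build: for each id, the aggregate filter collects all the rows into one map, so the event pushed on timeout should look roughly like this (values are illustrative):

# shape of the map pushed as an event on timeout (illustrative values)
{
  'id'     => 42,
  'source' => 'appA',
  'target' => 'appB',
  'logs'   => [
    { 'stacktrace' => '...' },
    { 'stacktrace' => '...' }
  ]
}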

Can you help me?
Thank you.

In what way do they not work?

Hi Badger,

Thanks for your reply.

I don't get any output in ES. I also tried outputting to stdout, and right after the SQL request ran, Logstash shut down.
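For the stdout test I simply replaced the elasticsearch output with something like:

output{
  stdout { codec => rubydebug }
}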

Try removing the timeout_code and instead adding

event.cancel()

to the end of the code option. That cancels each original row event once it has been added to the map, so the @aggregated flag and the drop are no longer needed.

Hi Badger,

I updated my configuration and I still get the same result.

Here is the conf:

filter{
  aggregate{
    task_id => "%{id}"
    code => "
      map['id'] = event.get('id')
      map['source'] = event.get('source')
      map['target'] = event.get('target')
      map['logs'] ||= []
      map['logs'] << {
        'stacktrace' => event.get('stacktrace')
      }
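      # cancel the original row event; only the map pushed on timeout survives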
      event.cancel()
    "
    push_map_as_event_on_timeout => true
    timeout => 10
#    timeout_code => "
#      event.set('[@aggregated]', 1)
#    "
  }
#  if [@aggregated] != 1 { drop {} }
#  mutate{
#    remove_field => ['@timestamp','@version','@aggregated']
#  }
}
output{
  elasticsearch {
    hosts => ["192.168.119.136:9200"]
    index => "nested3"
  }
}

Thanks for helping!
