Filter - building a field from values obtained from other fields

Does anyone know if it is possible in Logstash to build a field from the value of another field whose name is derived dynamically?

My use case:

I have data in a relational database that I am trying to move to Elasticsearch using the logstash-jdbc input.

I have three tables: aut, lan, and log.

The name of every column in these tables is prefixed with the table name, for example:

TABLE AUT

Name             Null     Type          
---------------- -------- ------------- 
AUTNSU           NOT NULL NUMBER(12)    
AUTTERCOD                 VARCHAR2(8)   
AUTDATREF        NOT NULL DATE          
AUTDATHORINICIO  NOT NULL DATE          
AUTDATHORFIM     NOT NULL DATE          
AUTDATHORULTTENT NOT NULL DATE          
AUTSTA           NOT NULL NUMBER(2)     
AUTMOTCAN                 NUMBER(6)     

TABLE LOG

Name         Null     Type          
------------ -------- ------------- 
LOGNSUGRL    NOT NULL NUMBER(12)    
LOGTERCOD    NOT NULL VARCHAR2(8)   
LOGNSU                NUMBER(12)    
LOGDATREF    NOT NULL DATE          
LOGDATHOR    NOT NULL DATE          
LOGDATHORGRV NOT NULL DATE          

TABLE LAN

Name         Null     Type         
------------ -------- ------------ 
LANID        NOT NULL CHAR(36)     
LANDATHOR    NOT NULL DATE         
LANDATREF    NOT NULL DATE         
LANDES       NOT NULL VARCHAR2(40) 
LANNUMDOC    NOT NULL NUMBER(10)   
LANIDCBL     NOT NULL NUMBER(1)    
LANVAL       NOT NULL NUMBER(17,2) 
LANDATHORCRG          DATE         
LANDET                VARCHAR2(60) 
LANCMVIDE             NUMBER(12)   

Note that each table has a *DATREF column that holds the date the event occurred.

For each table I have a jdbc input, and in its settings I set type according to the table name, for example:

  jdbc {
      jdbc_driver_library => "/etc/logstash/jdbc/ojdbc7.jar"
      jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
      jdbc_connection_string => "jdbc:oracle:thin:@IP:PORTA/SID"
      jdbc_user => ""
      jdbc_password => ""
      jdbc_validate_connection => true
      jdbc_fetch_size => 50000
      statement => "select autnsublk as id_sync, aut.* from aut"
      schedule => "* * * * * America/Sao_Paulo"
      sql_log_level => "debug"
      type => "aut"
  }
  
  jdbc {
      jdbc_driver_library => "/etc/logstash/jdbc/ojdbc7.jar"
      jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
      jdbc_connection_string => "jdbc:oracle:thin:@IP:PORTA/SID"
      jdbc_user => ""
      jdbc_password => ""
      jdbc_validate_connection => true
      jdbc_fetch_size => 50000
      statement => "select lognsugrl as id_sync, log.* from log"
      schedule => "* * * * * America/Sao_Paulo"
      sql_log_level => "debug"
      type => "log"
  }
  
  jdbc {
      jdbc_driver_library => "/etc/logstash/jdbc/ojdbc7.jar"
      jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
      jdbc_connection_string => "jdbc:oracle:thin:@IP:PORTA/SID"
      jdbc_user => ""
      jdbc_password => ""
      jdbc_validate_connection => true
      jdbc_fetch_size => 50000
      statement => "select lanid as id_sync, lan.* from lan"
      schedule => "* * * * * America/Sao_Paulo"
      sql_log_level => "debug"
      type => "lan"
  }

I'm trying to take the value stored in the *datref column and put it into a new field called datref, since as far as I know I cannot simply modify the @timestamp field.

However, I'm having a hard time working with variables inside the filter.

My current filter:

filter {
  mutate {
    add_field => {
      "datref" => "%{"%{type}datref"}"
    }
  }
}

The example above does not work; however, when I define this:

filter {
  mutate {
    add_field => {
      "datref" => "%{type}datref"
    }
  }
}

That gives me exactly the name of the field, producing output like "datref" => "landatref", "datref" => "autdatref", or "datref" => "logdatref", but what I need is the value stored inside that field.

I would like this approach to work, because handling it with if conditionals in the filter would be painful: the application has more than 200 tables to migrate to Elasticsearch.
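
For comparison, the conditional approach I am trying to avoid would look roughly like this for just the three tables above, and it would have to be repeated for every table:

filter {
  if [type] == "aut" {
    mutate { add_field => { "datref" => "%{autdatref}" } }
  } else if [type] == "log" {
    mutate { add_field => { "datref" => "%{logdatref}" } }
  } else if [type] == "lan" {
    mutate { add_field => { "datref" => "%{landatref}" } }
  }
  # ...and so on, for each of the 200+ tables
}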

All I'm really trying to do is make the Time Range in Kibana work correctly: as the data is imported, @timestamp does not match *datref, so the time filtering is wrong.

Ideally I would put the content of *datref into @timestamp, but I could not get that to work.

Does this help?

mutate { add_field => { "autdatref" => "foo" "type" => "aut" } }
ruby {
    code => '
        fieldname = event.get("type") + "datref"
        event.set("datref", event.get(fieldname))
    '
}

Perfect, @Badger!

For reference, the final filter was as follows:

filter {
  ruby {
    code => '
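      # build the source field name from the event type, e.g. "aut" => "autdatref",
      # then copy its value into the new "datref" field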
      fieldname = event.get("type") + "datref"
      event.set("datref", event.get(fieldname))
    '
  }
}

Do you agree that, when migrating from a relational database to Elasticsearch with logstash-jdbc, one could simply overwrite @timestamp with that value, and so avoid having the extra field?
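
Untested, but assuming the jdbc input already delivers the Oracle DATE column as a timestamp value (and not as a string), something like this is what I have in mind, writing straight into @timestamp instead of a separate field:

filter {
  ruby {
    code => '
      fieldname = event.get("type") + "datref"
      value = event.get(fieldname)
      # overwrite @timestamp with the *datref value; if the column arrives
      # as a string instead, parse it with a date filter afterwards
      event.set("@timestamp", value) unless value.nil?
    '
  }
}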
