Does anyone know if it is possible to mount a key through values obtained from other keys in logstash.
My use case:
I have data in a relational database in which I am trying to take to elasticsearch through logstash-jdbc.
I have 3 tables called: aut, lan and log
The name of each column of the tables mentioned above, is preceded by the table name, model:
TABLE AUT
Nome Nulo Tipo
---------------- -------- -------------
AUTNSU NOT NULL NUMBER(12)
AUTTERCOD VARCHAR2(8)
AUTDATREF NOT NULL DATE
AUTDATHORINICIO NOT NULL DATE
AUTDATHORFIM NOT NULL DATE
AUTDATHORULTTENT NOT NULL DATE
AUTSTA NOT NULL NUMBER(2)
AUTMOTCAN NUMBER(6)
TABLE LOG
Nome Nulo Tipo
------------ -------- -------------
LOGNSUGRL NOT NULL NUMBER(12)
LOGTERCOD NOT NULL VARCHAR2(8)
LOGNSU NUMBER(12)
LOGDATREF NOT NULL DATE
LOGDATHOR NOT NULL DATE
LOGDATHORGRV NOT NULL DATE
TABLE LAN
Nome Nulo Tipo
------------ -------- ------------
LANID NOT NULL CHAR(36)
LANDATHOR NOT NULL DATE
LANDATREF NOT NULL DATE
LANDES NOT NULL VARCHAR2(40)
LANNUMDOC NOT NULL NUMBER(10)
LANIDCBL NOT NULL NUMBER(1)
LANVAL NOT NULL NUMBER(17,2)
LANDATHORCRG DATE
LANDET VARCHAR2(60)
LANCMVIDE NUMBER(12)
Note that there is a *DATREF
attribute that matches the date the event occurred.
For each table I have an input jdbc that in its settings I define the type according to the name of the table, example
jdbc {
jdbc_driver_library => "/etc/logstash/jdbc/ojdbc7.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
jdbc_connection_string => "jdbc:oracle:thin:@IP:PORTA/SID"
jdbc_user => ""
jdbc_password => ""
jdbc_validate_connection => true
jdbc_fetch_size => 50000
statement => "select autnsublk as id_sync, aut.* from aut"
schedule => "* * * * * America/Sao_Paulo"
sql_log_level => "debug"
type => "aut"
}
jdbc {
jdbc_driver_library => "/etc/logstash/jdbc/ojdbc7.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
jdbc_connection_string => "jdbc:oracle:thin:@IP:PORTA/SID"
jdbc_user => ""
jdbc_password => ""
jdbc_validate_connection => true
jdbc_fetch_size => 50000
statement => "select lognsugrl as id_sync, log.* from log"
schedule => "* * * * * America/Sao_Paulo"
sql_log_level => "debug"
type => "log"
}
jdbc {
jdbc_driver_library => "/etc/logstash/jdbc/ojdbc7.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
jdbc_connection_string => "jdbc:oracle:thin:@IP:PORTA/SID"
jdbc_user => ""
jdbc_password => ""
jdbc_validate_connection => true
jdbc_fetch_size => 50000
statement => "select lanid as id_sync, lan.* from lan"
schedule => "* * * * * America/Sao_Paulo"
sql_log_level => "debug"
type => "lan"
}
I'm trying to get the value of the attribute that is in *datref
and set it to a new attribute called from datref since I know I can not modify the attribute @timestamp
However, I'm having a hard time working with variables inside the filter.
My current filter.
filter {
mutate {
add_field => {
"datref" => "%{"%{type}datref"}"
}
}
}
O exemplo acima não funciona, porém quando defino isso:
filter {
mutate {
add_field => {
"datref" => "%{type}datref"
}
}
}
I can get exactly the name of the attribute having the following output "datref" => "landatref" or "datref" => "autdatref" or "datref" => "logdatref"
plus I'm needing the value inside the key.
I would like this to work because handling this with if
the filter would be painful because the application has more than 200 tables to be migrated to the elasticsearch.
All that I'm trying to do is for Kibana to work correctly with the Time Range because as the data is imported and the @timestamp
does not match the *datref
the filtering is not being correct.
Ideally I could put the content of *date
in @timestamp, but I could not do it.