Hi,
I have this big Logstash conf:
# Poll the Oracle table TBL_CRE over JDBC and emit one event per returned row.
input {
  jdbc {
    # Connection settings.
    jdbc_driver_class      => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@CLORADZ-SCAN:1521/DONSC01"
    jdbc_user              => "NSC"
    jdbc_password          => "adm"

    # Cron-style schedule: run the statement every minute.
    schedule => "* * * * *"

    # Fetch rows stored after the tracked value, in batches of 200 (plus ties).
    statement => "SELECT ROWNUM, sdtcapture, sdatvac, squalifier1, dtdatestockage, squalifier2, spartievariable, sserveur,squalifier3 FROM TBL_CRE where dtdatestockage>:sql_last_value ORDER BY dtdatestockage ,sdtcapture,ROWNUM FETCH FIRST 200 ROWS WITH TIES"

    # :sql_last_value follows the dtdatestockage column rather than the last run time.
    use_column_value     => true
    tracking_column      => dtdatestockage
    tracking_column_type => "timestamp"

    # NOTE(review): clean_run => true presumably resets the tracked value on every
    # pipeline start, so all rows are read again — confirm this is intended.
    clean_run => true
  }
}
filter {
# JOB events: split the '!'-delimited payload into named fields, copy them under
# [attributes], then set up the correlation keys, timestamps and identifiers.
if ("JOB" in [squalifier3]) {
  grok {
    match => {
      "spartievariable" => "!%{DATA:dtdebutReel}!%{DATA:dtfinReel}!%{DATA:sStatut}!%{DATA:idureeReel}!%{DATA:sjob}!"
    }
  }

  # Flags events whose squalifier1 is shorter than 6 characters.
  # NOTE(review): raises inside the ruby filter when squalifier1 is missing — confirm it is always present.
  ruby {
    code => "if event.get('squalifier1').length < 6 then event.set('new_msg','test_message') end "
  }

  # Copy the grok captures under [attributes].
  # NOTE(review): the target key "dtfinRéel" has an accented 'é' unlike its siblings — confirm it is intended.
  mutate {
    add_field => {
      "[attributes][dtdebutReel]" => "%{[dtdebutReel]}"
      "[attributes][dtfinRéel]"   => "%{[dtfinReel]}"
      "[attributes][sStatut]"     => "%{[sStatut]}"
      "[attributes][idureeReel]"  => "%{[idureeReel]}"
      "[attributes][sjob]"        => "%{[sjob]}"
    }
  }

  # Setup keys.
  mutate {
    rename => {
      "squalifier1" => "sCleUniqueInstance"
      "squalifier2" => "sCleUniqueEtat"
      "squalifier3" => "sCleUniqueComplement"
    }
    add_field => {
      "sCleCorOrigin" => "MVS"
      "sCleCorObjet"  => "%{[sStatut]}"
      "sCleCorApp"    => ""
    }
  }

  ##<Setup unixTime>##
  # The date filter needs a string, so stringify the storage timestamp first.
  mutate {
    convert => [ "dtdatestockage", "string" ]
  }
  date {
    match    => [ "dtdatestockage", "YYYY-MM-dd'T'HH:mm:ss.SSS'Z'" ]
    target   => "unixTime"
    timezone => "UTC"
  }
  # Integer value of the parsed timestamp times 1000 (presumably epoch seconds -> milliseconds).
  ruby {
    code => "event.set('unixTime' , event.get('unixTime').to_i*1000)"
  }
  ##</Setup unixTime>##

  # Same treatment for the capture timestamp, parsed in place.
  date {
    match    => [ "sdtcapture", "YYYYMMddHH:mm:ss" ]
    target   => "sdtcapture"
    timezone => "UTC"
  }
  ruby {
    code => "event.set('sdtcapture' , event.get('sdtcapture').to_i*1000)"
  }

  # dtbutoire is a copy of unixTime, converted back to an integer.
  mutate {
    add_field => { "dtbutoire" => "%{[unixTime]}" }
  }
  mutate {
    convert => { "dtbutoire" => "integer" }
  }

  uuid {
    target    => "uuid"
    overwrite => true
  }

  # rowNumber is added in its own mutate so that it exists when the next mutate converts it.
  mutate {
    add_field => { "rowNumber" => "%{[rownum]}" }
  }
  mutate {
    add_field => { "id" => "%{[uuid]}" }
    rename => {
      "dtdatestockage" => "dtDateStockage"
      "sdtcapture"     => "dtDateCapture"
    }
    convert => { "rowNumber" => "integer" }
  }
}
else if ("FEX" in [squalifier1]) {
  # FEX events: parse the payload into [attributes], derive the recipient and sender,
  # then set up the correlation keys, timestamps and identifiers.
  #
  # Every comment and every plugin sits on its own line. A '#' comment runs to the
  # end of the line, so a comment sharing a line with plugins disables all of them.

  # NOTE(review): field_split and value_split are both "|" — confirm the value separator.
  kv {
    source      => "spartievariable"
    field_split => "|"
    value_split => "|"
    target      => "attributes"
  }

  ###<modification idf>
  # Keep only the part of Idf that precedes the first '-'.
  grok {
    match => { "[attributes][Idf]" => "%{DATA:[attributes][bonIdf]}-.*" }
  }
  mutate {
    rename => { "[attributes][bonIdf]" => "[attributes][Idf]" }
  }
  ###</modification idf>

  ### Build the recipient (Destinataire).
  # `"" in [field]` appears to be used as a "field is present" check — TODO confirm.
  if ("E" in [attributes][SensT]) {
    if ("" in [attributes][Part]) {
      mutate { add_field => { "[attributes][Destinataire]" => "%{[attributes][Part]}" } }
    } else if ("" in [attributes][Idf]) {
      mutate { add_field => { "[attributes][Destinataire]" => "%{[attributes][Idf]}" } }
    } else if ("" in [attributes][SrvD]) {
      mutate { add_field => { "[attributes][Destinataire]" => "%{[attributes][SrvD]}" } }
    }
  } else if ("R" in [attributes][SensT]) {
    mutate { add_field => { "[attributes][Destinataire]" => "%{[attributes][SrvL]}" } }
  }

  ### Build the sender (Emeteur): same fallback chain with the directions swapped.
  if ("R" in [attributes][SensT]) {
    if ("" in [attributes][Part]) {
      mutate { add_field => { "[attributes][Emeteur]" => "%{[attributes][Part]}" } }
    } else if ("" in [attributes][Idf]) {
      mutate { add_field => { "[attributes][Emeteur]" => "%{[attributes][Idf]}" } }
    } else if ("" in [attributes][SrvD]) {
      mutate { add_field => { "[attributes][Emeteur]" => "%{[attributes][SrvD]}" } }
    }
  } else if ("E" in [attributes][SensT]) {
    mutate { add_field => { "[attributes][Emeteur]" => "%{[attributes][SrvL]}" } }
  }

  ## Rename values under [attributes].
  mutate {
    rename => {
      "[attributes][FicD]"   => "[attributes][fichierEmis]"
      "[attributes][DateF]"  => "[attributes][dateFin]"
      "[attributes][DateD]"  => "[attributes][dateDebut]"
      "[attributes][Statut]" => "[attributes][statut]"
    }
  }

  ##<Setup keys>
  # NOTE(review): the sCleCorApp value contains a space and a line break before the
  # closing ')'. It is reproduced unchanged; confirm whether it is a paste artifact.
  mutate {
    rename => {
      "squalifier1" => "sCleUniqueInstance"
      "squalifier2" => "sCleUniqueEtat"
      "squalifier3" => "sCleUniqueComplement"
    }
    add_field => {
      "sCleCorOrigin" => "%{[attributes][Idf]}"
      "sCleCorObjet"  => "%{[attributes][SrvD]}"
      "sCleCorApp"    => "TRANSMIS (Ref:%{[attributes][SrvD]}-%{[attributes][FicL]} 
)"
    }
  }
  ##</Setup keys>

  ##<Setup unixTime>##
  # Disabled earlier variant of the date parsing:
  # date {
  #   match => [ "dtdatestockage", "dd/MM/YYYY HH:mm" ]
  #   target => "unixTime"
  #   timezone => "Europe/Paris"
  # }
  # The date filter needs a string, so stringify the storage timestamp first.
  mutate {
    convert => [ "dtdatestockage", "string" ]
  }
  date {
    match    => [ "dtdatestockage", "YYYY-MM-dd'T'HH:mm:ss.SSS'Z'" ]
    target   => "unixTime"
    timezone => "UTC"
  }
  # Integer value of the parsed timestamp times 1000 (presumably epoch seconds -> milliseconds).
  ruby {
    code => "event.set('unixTime' , event.get('unixTime').to_i*1000)"
  }
  ##</Setup unixTime>##

  # Same treatment for the capture timestamp, parsed in place.
  date {
    match    => [ "sdtcapture", "YYYYMMddHH:mm:ss" ]
    target   => "sdtcapture"
    timezone => "UTC"
  }
  ruby {
    code => "event.set('sdtcapture' , event.get('sdtcapture').to_i*1000)"
  }

  # dtbutoire is a copy of unixTime, converted back to an integer.
  mutate {
    add_field => { "dtbutoire" => "%{[unixTime]}" }
  }
  mutate {
    convert => { "dtbutoire" => "integer" }
  }

  uuid {
    target    => "uuid"
    overwrite => true
  }

  # rowNumber is added in its own mutate so that it exists when the next mutate converts it.
  mutate {
    add_field => { "rowNumber" => "%{[rownum]}" }
  }
  mutate {
    add_field => { "id" => "%{[uuid]}" }
    rename => {
      "dtdatestockage" => "dtDateStockage"
      "sdtcapture"     => "dtDateCapture"
    }
    convert => { "rowNumber" => "integer" }
  }
}
#else{
#drop{}
#}
}
# Route events by whether the filter stage set sCleCorApp:
#   - events that have it go to stdout, Kafka and the main Elasticsearch index;
#   - all other events go to a separate Elasticsearch index.
output {
  # `"" in [sCleCorApp]` appears to be used as a "field is present" check — TODO confirm.
  if ("" in [sCleCorApp]) {
    stdout {}

    # Disabled file output. The whole block is commented out together: bare
    # `path` / `codec` settings outside a plugin are a configuration syntax error.
    #file {
    #  path => "/projets/nsc/home/nscusrm1/test.txt"
    #  codec => rubydebug
    #}

    kafka {
      bootstrap_servers => "nsc-mid-dev00003.angers.cnp.fr:9092"
      codec             => json
      topic_id          => "entryTopic"
    }

    elasticsearch {
      hosts    => ["https://dk-shr00-dev.intranet.cnp.fr:9200"]
      cacert   => '/projets/nsc/home/nscusrm1/livrable/jee/tomcat-nsc-middle/conf/local/ca.crt'
      user     => 'nsc00_3'
      password => 'v5va29Wdp'
      index    => "dknsc00cre-2020.17"
    }
  }
  else {
    # Same cluster and credentials, different index.
    elasticsearch {
      hosts    => ["https://dk-shr00-dev.intranet.cnp.fr:9200"]
      cacert   => '/projets/nsc/home/nscusrm1/livrable/jee/tomcat-nsc-middle/conf/local/ca.crt'
      user     => 'nsc00_3'
      password => 'v5va29Wdp'
      index    => "dknsc00inutilescre-2020.17"
    }
  }
}
It works fine when I launch Logstash with this conf file, but when I try to use the Kibana Logstash pipeline management UI, the "Create and deploy" button
does not do anything and my pipeline is neither created nor deployed. I have already succeeded in deploying a pipeline with a simpler conf:
# Poll the Oracle table TBL_CRE over JDBC and emit one event per returned row.
input {
  jdbc {
    # Connection settings.
    jdbc_driver_class      => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@CLORADZ-SCAN:1521/DONSC01"
    jdbc_user              => "NSC"
    jdbc_password          => "adm"

    # Cron-style schedule: run the statement every minute.
    schedule => "* * * * *"

    # Fetch rows stored after the tracked value, in batches of 200 (plus ties).
    statement => "SELECT ROWNUM, sdtcapture, sdatvac, squalifier1, dtdatestockage, squalifier2, spartievariable, sserveur,squalifier3 FROM TBL_CRE where dtdatestockage>:sql_last_value ORDER BY dtdatestockage ,sdtcapture,ROWNUM FETCH FIRST 200 ROWS WITH TIES"

    # :sql_last_value follows the dtdatestockage column rather than the last run time.
    use_column_value     => true
    tracking_column      => dtdatestockage
    tracking_column_type => "timestamp"

    # NOTE(review): clean_run => true presumably resets the tracked value on every
    # pipeline start — confirm this is intended.
    clean_run => true
  }
}
# For JOB events only, split the '!'-delimited payload into five named fields.
filter {
  if ("JOB" in [squalifier3]) {
    grok {
      match => {
        "spartievariable" => "!%{DATA:dtdebutReel}!%{DATA:dtfinReel}!%{DATA:sStatut}!%{DATA:idureeReel}!%{DATA:sjob}!"
      }
    }
  }
}
# Print every event to standard output with the plugin's default settings.
output {
  stdout {}
}
I hope you can help me solve this issue.
Note: I do not have access to the Elastic logs.