Kibana Logstash pipeline: can't create pipeline

Hi,

I have this big Logstash conf:

input {
  jdbc {
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@CLORADZ-SCAN:1521/DONSC01"
    jdbc_user => "NSC"
    jdbc_password => "adm"
    schedule => "* * * * *"
    statement => "SELECT ROWNUM, sdtcapture, sdatvac, squalifier1, dtdatestockage, squalifier2, spartievariable, sserveur, squalifier3 FROM TBL_CRE where dtdatestockage>:sql_last_value ORDER BY dtdatestockage, sdtcapture, ROWNUM FETCH FIRST 200 ROWS WITH TIES"
    use_column_value => true
    tracking_column => "dtdatestockage"
    tracking_column_type => "timestamp"
    clean_run => true
  }
}
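# (comment added for clarity) The jdbc input above polls TBL_CRE once a minute and tracks
# dtdatestockage, so :sql_last_value only selects rows newer than the previous run
# (clean_run => true resets that marker on every restart).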

filter {
  if ("JOB" in [squalifier3]) {

    grok {
      match => { "spartievariable" => "!%{DATA:dtdebutReel}!%{DATA:dtfinReel}!%{DATA:sStatut}!%{DATA:idureeReel}!%{DATA:sjob}!" }
    }

    ruby {
      code => "if event.get('squalifier1').length < 6 then
                 event.set('new_msg','test_message')
               end"
    }

    mutate {
      add_field => { "[attributes][dtdebutReel]" => "%{[dtdebutReel]}" }
      add_field => { "[attributes][dtfinReel]" => "%{[dtfinReel]}" }
      add_field => { "[attributes][sStatut]" => "%{[sStatut]}" }
      add_field => { "[attributes][idureeReel]" => "%{[idureeReel]}" }
      add_field => { "[attributes][sjob]" => "%{[sjob]}" }
    }

    mutate {
      rename => { "squalifier1" => "sCleUniqueInstance" }
      rename => { "squalifier2" => "sCleUniqueEtat" }
      rename => { "squalifier3" => "sCleUniqueComplement" }
      add_field => { "sCleCorOrigin" => "MVS" }
      add_field => { "sCleCorObjet" => "%{[sStatut]}" }
      add_field => { "sCleCorApp" => "" }
    }

    ##<Setup unixTime>##
    mutate {
      convert => [ "dtdatestockage", "string" ]
    }

    date {
      match => [ "dtdatestockage", "YYYY-MM-dd'T'HH:mm:ss.SSS'Z'" ]
      target => "unixTime"
      timezone => "UTC"
    }

    ruby {
      code => "event.set('unixTime', event.get('unixTime').to_i*1000)"
    }
    ##</Setup unixTime>##

    date {
      match => [ "sdtcapture", "YYYYMMddHH:mm:ss" ]
      target => "sdtcapture"
      timezone => "UTC"
    }

    ruby {
      code => "event.set('sdtcapture', event.get('sdtcapture').to_i*1000)"
    }

    mutate {
      add_field => { "dtbutoire" => "%{[unixTime]}" }
    }

    mutate {
      convert => { "dtbutoire" => "integer" }
    }

    uuid {
      target => "uuid"
      overwrite => true
    }

    mutate {
      add_field => { "rowNumber" => "%{[rownum]}" }
    }

    mutate {
      add_field => { "id" => "%{[uuid]}" }
      rename => { "dtdatestockage" => "dtDateStockage" }
      rename => { "sdtcapture" => "dtDateCapture" }
      convert => { "rowNumber" => "integer" }
    }
  }

else if ("FEX" in [squalifier1]) {

kv{

  source => "spartievariable"

  field_split => "|"

  value_split => "|"

  target => "attributes"

}

###<modification idf>

grok {

  match => { "[attributes][Idf]" => "%{DATA:[attributes][bonIdf]}-.*" }

}

mutate{

  rename => {"[attributes][bonIdf]" => "[attributes][Idf]"}

}

###</modification idf>

###Creation Destinataire

if ("E" in [attributes][SensT]){

  if ("" in [attributes][Part]){

    mutate {

      add_field => { "[attributes][Destinataire]" => "%{[attributes][Part]}"}

    }

  }else if("" in [attributes][Idf]){

    mutate {

      add_field => { "[attributes][Destinataire]" => "%{[attributes][Idf]}"}

    }

  }else if("" in [attributes][SrvD]){

    mutate {

      add_field => { "[attributes][Destinataire]" => "%{[attributes][SrvD]}"}

    }

  }

}else if("R" in [attributes][SensT]){

    mutate {

      add_field => { "[attributes][Destinataire]" => "%{[attributes][SrvL]}"}

    }

}

###Creation Emeteur

if ("R" in [attributes][SensT]){

  if ("" in [attributes][Part]){

    mutate {

      add_field => { "[attributes][Emeteur]" => "%{[attributes][Part]}"}

    }

  }else if("" in [attributes][Idf]){

    mutate {

      add_field => { "[attributes][Emeteur]" => "%{[attributes][Idf]}"}

    }

  }else if("" in [attributes][SrvD]){

    mutate {

      add_field => { "[attributes][Emeteur]" => "%{[attributes][SrvD]}"}

    }

  }

}else if("E" in [attributes][SensT]){

    mutate {

      add_field => { "[attributes][Emeteur]" => "%{[attributes][SrvL]}"}

    }

}

##Renomage valeurs dde Attributes

mutate{

  rename => {"[attributes][FicD]" => "[attributes][fichierEmis]"}

  rename => {"[attributes][DateF]" => "[attributes][dateFin]"}

  rename => {"[attributes][DateD]" => "[attributes][dateDebut]"}

  rename => {"[attributes][Statut]" => "[attributes][statut]"}

}

##<Setup Clés>

mutate {

  rename => {"squalifier1" => "sCleUniqueInstance"}

  rename => {"squalifier2" => "sCleUniqueEtat"}

  rename => {"squalifier3" => "sCleUniqueComplement"}

  add_field  => { "sCleCorOrigin" => "%{[attributes][Idf]}"}

  add_field => { "sCleCorObjet" => "%{[attributes][SrvD]}"}

  add_field => { "sCleCorApp" => "TRANSMIS (Ref:%{[attributes][SrvD]}-%{[attributes][FicL]} )"}

}

##</Setup Clés>

##<Setup unixTime>##

#  date {

#  match => [ "dtdatestockage", "dd/MM/YYYY HH:mm" ]

#  target => "unixTime"

#  timezone => "Europe/Paris"

#}

mutate {

   convert => [ "dtdatestockage", "string" ]

}

 date {

 match => [ "dtdatestockage", "YYYY-MM-dd'T'HH:mm:ss.SSS'Z'" ]

 target => "unixTime"

 timezone => "UTC"

 }

ruby {

  code => "event.set('unixTime' , event.get('unixTime').to_i*1000)"

}

##</Setup unixTime>##

 date {

     match => [ "sdtcapture", "YYYYMMddHH:mm:ss" ]

     target => "sdtcapture"

     timezone => "UTC"

 }

ruby {

  code => "event.set('sdtcapture' , event.get('sdtcapture').to_i*1000)"

}

mutate {

  add_field  => { "dtbutoire" => "%{[unixTime]}"}

}

mutate{

  convert => {"dtbutoire" =>"integer"}

}

uuid {

target => "uuid"

overwrite => true

}

mutate {

  add_field  => { "rowNumber" => "%{[rownum]}"}

}

mutate {

 add_field  => { "id" => "%{[uuid]}"}

 rename => {"dtdatestockage" => "dtDateStockage"}

 rename => {"sdtcapture" => "dtDateCapture"}

 convert => {"rowNumber" =>"integer"}

}

}

#else{

#drop{}

#}

}

output {
  if ("" in [sCleCorApp]) {

    stdout {}

    #file {
    #  path => "/projets/nsc/home/nscusrm1/test.txt"
    #  codec => rubydebug
    #}

    kafka {
      bootstrap_servers => "nsc-mid-dev00003.angers.cnp.fr:9092"
      codec => json
      topic_id => "entryTopic"
    }

    elasticsearch {
      hosts => ["https://dk-shr00-dev.intranet.cnp.fr:9200"]
      cacert => '/projets/nsc/home/nscusrm1/livrable/jee/tomcat-nsc-middle/conf/local/ca.crt'
      user => 'nsc00_3'
      password => 'v5va29Wdp'
      index => "dknsc00cre-2020.17"
    }

  } else {

    elasticsearch {
      hosts => ["https://dk-shr00-dev.intranet.cnp.fr:9200"]
      cacert => '/projets/nsc/home/nscusrm1/livrable/jee/tomcat-nsc-middle/conf/local/ca.crt'
      user => 'nsc00_3'
      password => 'v5va29Wdp'
      index => "dknsc00inutilescre-2020.17"
    }

  }
}
It works fine when I launch Logstash directly with this conf file, but when I try to create it as a Kibana Logstash pipeline, the "Create and deploy" button does not do anything, and my pipeline is neither created nor deployed. I already succeeded in deploying a pipeline with a simpler conf (the exact command I use to run the big conf locally is sketched right after this simpler conf):
input {
  jdbc {
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@CLORADZ-SCAN:1521/DONSC01"
    jdbc_user => "NSC"
    jdbc_password => "adm"
    schedule => "* * * * *"
    statement => "SELECT ROWNUM, sdtcapture, sdatvac, squalifier1, dtdatestockage, squalifier2, spartievariable, sserveur, squalifier3 FROM TBL_CRE where dtdatestockage>:sql_last_value ORDER BY dtdatestockage, sdtcapture, ROWNUM FETCH FIRST 200 ROWS WITH TIES"
    use_column_value => true
    tracking_column => "dtdatestockage"
    tracking_column_type => "timestamp"
    clean_run => true
  }
}

filter {
  if ("JOB" in [squalifier3]) {
    grok {
      match => { "spartievariable" => "!%{DATA:dtdebutReel}!%{DATA:dtfinReel}!%{DATA:sStatut}!%{DATA:idureeReel}!%{DATA:sjob}!" }
    }
  }
}

output {
  stdout {}
}
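For reference, this is how I validate and launch the big conf on the Logstash host (a minimal sketch; /path/to/cre.conf is just a placeholder for wherever I keep the file):

bin/logstash -f /path/to/cre.conf --config.test_and_exit   # syntax check only
bin/logstash -f /path/to/cre.conf                          # actual run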

I hope you can help me solve this issue :pray:t2:

Note: I do not have access to the Elastic logs.
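Since I cannot read the logs, the only thing I can think of checking from my side is whether Kibana actually writes the pipeline document. If I understand centralized pipeline management correctly, pipelines saved from the Kibana UI end up in the .logstash system index (this is an assumption on my part), so something like the following, run against the same cluster and credentials as in my output block, should list whatever was created:

# assumption: Kibana-managed pipelines are stored as documents in the .logstash index
curl -u nsc00_3 --cacert /projets/nsc/home/nscusrm1/livrable/jee/tomcat-nsc-middle/conf/local/ca.crt \
  "https://dk-shr00-dev.intranet.cnp.fr:9200/.logstash/_search?pretty"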
