Hi,
I'm new to logstash and I'm trying to check if an sql job (and its steps) runs successfully.
The query gets the step 0 (job output) and all the steps that have errors or warnings.
I want to gather all the steps info first and fill the errors/warnings fields. For that I use aggregate.
The issue is that, despite the query "order by" to get the all the steps different than 0 first and then the step 0, the step 0 "event" is processed first most of the times. Why? Can't I define an order?
I have tried to define pipeline.workers: 1, on the pipelines.yml, but still facing the same issue.
The conf file is below.
input {
jdbc {
jdbc_connection_string => "jdbc:sqlserver://PT-SRVSQL1LX1;database=msdb"
jdbc_user => "xxxxxxxxx"
jdbc_password => "xxxxxxxxxxxxx"
jdbc_driver_library => "C:/Program Files/sqljdbc_7.4/enu/mssql-jdbc-7.4.1.jre8.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
schedule => "*/10 * * * *"
parameters => { "job_id" => "xxxxxxxxxxxxxxxFE-FFC0370F682A" }
statement_filepath => "../config/sql_queries/sql_job_last_status.sql"
type => "sqljob"
}
}
filter {
mutate {
rename => { "instanceid" => "event_id" }
rename => { "hostname" => "event_hostname" }
rename => { "errormessage" => "message" }
}
mutate {
convert => {"startdatetime" => "string"}
}
date {
match => ["startdatetime","ISO8601"]
target => "@timestamp"
}
if [stepid] == 0 {
#get general info from the job (step 0)
aggregate {
task_id => "%{jobname}"
code => "event.set('warnings', map['warnings']); event.set('errors', map['errors']);"
end_of_task => true
}
if "Succeeded" in [runstatus] {
mutate {
replace => { "message" => "The job finish with success" }
add_field => {"process_status" => 0 }
}
}
else {
mutate {
replace => { "message" => "The job finish with error: %{message}" }
add_field => {"process_status" => 1 }
}
}
mutate {
add_field => { "[@metadata][nextstartdatetime]" => "%{nextstartdatetime}" }
add_field => { "[@metadata][hourssincelastexec]" => "%{hourssincelastexec}" }
}
}
else {
#get info from the job steps
if [sqlseverity] == 0 {
aggregate {
task_id => "%{jobname}"
code => "map['warnings'] ||= ''; map['warnings'] += ('Step ' + event.get('stepid').to_s() + ' - ' + event.get('message'));"
}
}
else {
aggregate {
task_id => "%{jobname}"
code => "map['errors'] ||= ''; map['errors'] += ('Step ' + event.get('stepid').to_s() + ' - ' + event.get('message'));"
}
}
drop {}
}
mutate {
#remove all additional fields
remove_field => [ "hostname" ]
remove_field => [ "jobid" ]
#remove_field => [ "jobname" ]
remove_field => [ "stepid" ]
#remove_field => [ "stepname" ]
remove_field => [ "sqlmessageid" ]
remove_field => [ "sqlseverity" ]
remove_field => [ "errormessage" ]
remove_field => [ "startdatetime" ]
remove_field => [ "runduration" ]
remove_field => [ "instanceid" ]
remove_field => [ "nextstartdatetime" ]
remove_field => [ "hourssincelastexec" ]
}
}
output {
elasticsearch {
document_id => "%{event_hostname}-%{type}-%{event_id}"
index => "sqljobs"
}
stdout {
codec => rubydebug
}
}
Thank you in advance