Logstash jdbc input: maintain SQL row order

Hi,
I'm new to Logstash and I'm trying to check whether a SQL job (and its steps) runs successfully.
The query returns step 0 (the job outcome) and all the steps that have errors or warnings.

I want to gather all the step info first and fill in the errors/warnings fields. For that I use the aggregate filter.

The issue is that, despite the query's ORDER BY (which returns all the steps other than 0 first and step 0 last), the step 0 event is processed first most of the time. Why? Can't I enforce an order?

I have tried setting pipeline.workers: 1 in pipelines.yml, but I'm still facing the same issue.
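
For context, the relevant pipelines.yml entry currently looks roughly like this (the pipeline id and config path are placeholders, not my real values):

- pipeline.id: sqljobs
  path.config: "../config/conf.d/sqljobs.conf"
  pipeline.workers: 1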

The conf file is below.


input {
  jdbc {
   jdbc_connection_string => "jdbc:sqlserver://PT-SRVSQL1LX1;database=msdb"
   jdbc_user => "xxxxxxxxx"
   jdbc_password => "xxxxxxxxxxxxx"
   jdbc_driver_library => "C:/Program Files/sqljdbc_7.4/enu/mssql-jdbc-7.4.1.jre8.jar"
   jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
   schedule => "*/10 * * * *"
   parameters => { "job_id" => "xxxxxxxxxxxxxxxFE-FFC0370F682A" }
   statement_filepath => "../config/sql_queries/sql_job_last_status.sql"
   type => "sqljob"
  }
}
filter {
	mutate { 
		rename => { "instanceid" => "event_id" }
		rename => { "hostname" => "event_hostname" }
		rename => { "errormessage" => "message" }
	}
	mutate {
		convert => {"startdatetime" => "string"}
	}
	date {
		match => ["startdatetime","ISO8601"]
		target => "@timestamp"
	} 
	if [stepid] == 0 {
		#get general info from the job (step 0) and pull in the warnings/errors accumulated from the step events
		aggregate {
			task_id => "%{jobname}"
			code => "event.set('warnings', map['warnings']); event.set('errors', map['errors']);"
			end_of_task => true
		}
		if "Succeeded" in [runstatus]  {
			mutate { 
				replace => { "message" => "The job finish with success" }
				add_field => {"process_status" => 0 }
			}				
		}
		else {
			mutate { 
				replace => { "message" => "The job finish with error: %{message}" }
				add_field => {"process_status" => 1 }
			}	
		}
		mutate {
			add_field => { "[@metadata][nextstartdatetime]" => "%{nextstartdatetime}" } 
			add_field => { "[@metadata][hourssincelastexec]" => "%{hourssincelastexec}" } 
		}
	}
	else {
		#get info from the job steps
		if [sqlseverity] == 0 {
			aggregate {
				task_id => "%{jobname}"
				code => "map['warnings'] ||= ''; map['warnings'] += ('Step ' + event.get('stepid').to_s() + ' - ' + event.get('message'));"
			}
		}
		else {
			aggregate {
				task_id => "%{jobname}"
				code => "map['errors'] ||= ''; map['errors'] += ('Step ' + event.get('stepid').to_s() + ' - ' + event.get('message'));"
			}
		}
		#the step events are only needed to populate the map, so drop them
		drop {}
	}
	mutate { 
		#remove all additional fields
		remove_field => [ "hostname" ]
		remove_field => [ "jobid" ]
		#remove_field => [ "jobname" ]
		remove_field => [ "stepid" ]
		#remove_field => [ "stepname" ]
		remove_field => [ "sqlmessageid" ]
		remove_field => [ "sqlseverity" ]
		remove_field => [ "errormessage" ]
		remove_field => [ "startdatetime" ]
		remove_field => [ "runduration" ]
		remove_field => [ "instanceid" ]
		remove_field => [ "nextstartdatetime" ]
		remove_field => [ "hourssincelastexec" ]
	}

}
output {
	elasticsearch {
		document_id => "%{event_hostname}-%{type}-%{event_id}"
		index => "sqljobs"
	}

	stdout {
		codec => rubydebug
	}
}

Thank you in advance

Did you also specify pipeline.ordered?

No. Should I put it in pipelines.yml or in logstash.yml?

Set it in logstash.yml

OK, but keep in mind that I have three pipelines and I only want this behavior on this one. Is it OK to define it in logstash.yml?

If you set it to auto then it will only apply to pipelines that have pipeline.workers set to 1. If the other two pipelines have more than one worker they will not preserve order.
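
As a rough sketch, something like this (the pipeline ids and paths are made-up placeholders, not your real ones):

# logstash.yml
pipeline.ordered: auto

# pipelines.yml
- pipeline.id: sqljobs
  path.config: "../config/conf.d/sqljobs.conf"
  pipeline.workers: 1    # single worker, so "auto" preserves event order here
- pipeline.id: other_pipeline
  path.config: "../config/conf.d/other.conf"
  pipeline.workers: 4    # more than one worker, order is not preserved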

After I added "pipeline.ordered: auto" to logstash.yml I'm getting this error:

An unexpected error occurred! {:error=>#<ArgumentError: Setting "pipeline.ordered" hasn't been registered>, :backtrace=>["C:/ELK-Stack/logstash/logstash-core/lib/logstash/settings.rb:69:in `get_setting'", "C:/ELK-Stack/logstash/logstash-core/lib/logstash/settings.rb:102:in `set_value'", "C:/ELK-Stack/logstash/logstash-core/lib/logstash/settings.rb:121:in `block in merge'", "org/jruby/RubyHash.java:1417:in `each'", "C:/ELK-Stack/logstash/logstash-core/lib/logstash/settings.rb:121:in `merge'", "C:/ELK-Stack/logstash/logstash-core/lib/logstash/settings.rb:179:in `validate_all'", "C:/ELK-Stack/logstash/logstash-core/lib/logstash/runner.rb:284:in `execute'", "C:/ELK-Stack/logstash/vendor/bundle/jruby/2.5.0/gems/clamp-0.6.5/lib/clamp/command.rb:67:in `run'", "C:/ELK-Stack/logstash/logstash-core/lib/logstash/runner.rb:242:in `run'", "C:/ELK-Stack/logstash/vendor/bundle/jruby/2.5.0/gems/clamp-0.6.5/lib/clamp/command.rb:132:in `run'", "C:\ELK-Stack\logstash\lib\bootstrap\environment.rb:73:in `'"]}

Check the indentation. If you are picking up the new configuration using automatic reload, then do a clean stop and start of Logstash. There have been bugs where automatic reload does not completely reset the configuration state.

Hi @Badger,

The issue wasn't the indentation; it was related to the Logstash version.
The installed version was very old, 7.5.1, which doesn't recognize that setting (pipeline.ordered was only introduced in 7.7). After I installed version 8.6.1 it worked just fine.
Thank you for all the help.
