Configuration error in Logstash pipeline for multiline SQL statements

We have configured the Centralized Pipeline Management feature for Logstash. When we use a multi-line SQL statement, we get the error below.

[2022-04-18T08:19:05,193][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:eigl-pipeline, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \t\r\n], "#", "{", "}" at line 13, column 13 (byte 570) after input {\r\n jdbc {\r\n jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mssql-jdbc-9.2.1.jre11.jar"\r\n jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"\r\n jdbc_connection_string => "jdbc:sqlserver://10.204.209.69;databaseName="\r\n jdbc_user => "****"\r\n jdbc_password => ""\r\n schedule => "*/1 * * * *"\r\n\trecord_last_run => true\r\n statement => "SELECT top 5\r\n\t\tRIGHT('00' + DAY(u.CREATEDTS),2) DAYOFMONTH,\r\n\t\tFORMAT(u.CREATEDTS, 'MMMM') MONTH, \r\n\t\tDATEPART("", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:187:in initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:in initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52:in execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:383:in block in converge_state'"]}

The pipeline we are using is shown below:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mssql-jdbc-9.2.1.jre11.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://*****;databaseName=*******"
    jdbc_user => "*****"
    jdbc_password => "*******"
    schedule => "*/1 * * * *"
    record_last_run => true
    statement => "SELECT  top 5
		RIGHT('00' + DAY(u.CREATEDTS),2) DAYOFMONTH,
		FORMAT(u.CREATEDTS, 'MMMM') MONTH,  
		DATEPART("yyyy",u.CREATEDTS) YEAR,
		FORMAT(u.CREATEDTS, 'dddd') WEEKDAY,
		RIGHT('00' + CONVERT("NVARCHAR",DATEPART(hh,u.CREATEDTS)),2) HOUR,
		RIGHT('00' + CONVERT("NVARCHAR",DATEPART(mi,u.CREATEDTS)),2) MINUTES,
		DATEPART("qq",u.CREATEDTS) QTROFYEAR,
		RIGHT('000' + CONVERT("NVARCHAR",DATEPART("dy",u.CREATEDTS)),3) DAYOFYEAR,
		u.USAGEDATAID ID, 
		u.EVENTID EVENT_ID, 
		u.PARENTEVENTID PARENT_EVENT_ID, 
		u.CREATEDTS CREATE_DTS, 
		u.REQUESTDTS REQUEST_DTS, 
		u.MPNAME CONTAINER_KEY, 
		u.CLIENTHOST CLIENT_HOSTNAME, 
		u.LISTENERURL LISTENER_URL, 
		u.NEXTHOPURL NEXTHOP_URL, 
		u.APPUSERNAME CONSUMER_USERID, 
		u.REQUSERNAME ENDUSER_USERID, 
		u.OTHERUSERNAMES OTHER_USERNAMES, 
		u.RESPONSETIME TOTAL_RESPONSETIME_MSEC, 
		u.NEXTHOPRESPTIME NEXTHOP_RESPONSETIME_MSEC, 
		u.REQMSGSIZE REQUEST_MSGSIZE_BYTES, 
		u.RESPMSGSIZE RESPONSE_MSGSIZE_BYTES, 
		u.NMREQMSGSIZE REQUEST_NORM_MSGSIZE_BYTES, 
		u.NMRESPMSGSIZE RESPONSE_NORM_MSGSIZE_BYTES, 
		u.VERB HTTP_VERB, 
		u.STATUS_CODE HTTP_STATUSCODE, 
		u.ISSOAPFLTBYMP IS_FAULT_BY_CONTAINER, 
		u.ISSOAPFLTBYNEXTHOP IS_FAULT_BY_NEXTHOP, 
		u.ERRCATEGORY ERROR_CATEGORY,
		u.ERRMESSAGE ERROR_MESSAGE, 
		u.ERRDETAILS ERROR_DETAILS, 
		u.CUSTOMFIELD1 CUSTOM_FIELD,
		c.CONTRACTNAME CONTRACT_NAME, 
		nn.NAME ORGANIZATION_NAME, 
		sn.NAME SERVICE_NAME, 
		op.NAME OPERATION_NAME 
		from (
			MO_USAGEDATA u 
			left outer join ( SVC_OPERATIONS sop inner join ENTITY_NAMES op on (op.ENTITYKEY = sop.OPERATIONKEY and op.TYPE = 'OP-')) on ( sop.OPERATIONID = u.OPERATIONID) 
			left outer join ( UDDI_SERVICE s inner join ENTITY_NAMES sn on (sn.ENTITYKEY = s.SERVICE_KEY and sn.TYPE = 'S-')) on (s.BUSINES_SERVICE_ID = u.SERVICEID) 
			left outer join CONTRACTS_VERSIONS c on c.CONTRACTVERSIONID = u.CONTRACTID 
			left outer join ( UDDI_BUSINESS ub inner join ENTITY_NAMES nn on ( nn.ENTITYKEY = ub.BUSINESS_KEY and nn.TYPE = 'O-')) on (ub.BUSINESS_ENTITY_ID = u.ORGID) 
			)"
    use_column_value => true
    tracking_column => "id"
  }
}
filter {
	mutate { convert => ["YEAR","integer"]}
	mutate { convert => ["QTROFYEAR","integer"]}	
	mutate { convert => ["ID","integer"]}
	mutate { convert => ["TOTAL_RESPONSETIME_MSEC","integer"]}
	mutate { convert => ["NEXTHOP_RESPONSETIME_MSEC","integer"]}
	mutate { convert => ["REQUEST_MSGSIZE_BYTES","integer"]}
	mutate { convert => ["RESPONSE_MSGSIZE_BYTES","integer"]}
	mutate { convert => ["REQUEST_NORM_MSGSIZE_BYTES","integer"]}
	mutate { convert => ["RESPONSE_NORM_MSGSIZE_BYTES","integer"]}
	mutate { convert => ["HTTP_STATUSCODE","integer"]}
	geoip  { source  => "CLIENT_HOSTNAME" }	
	translate {
        field => "[HTTP_STATUSCODE]"
        destination => "[HTTP_STATUS_DESCRIPTION]"
        dictionary => {
          "200" => "OK"
		  "202" => "Accepted"			  
          "500" => "Server Error"
		  "400" => "Not Found"		
		  "401" => "Unauthorised"
		  "403" => "Forbidden"
		  "404" => "Bad Request"		  	  
        }
        fallback => "Unknown"
      }
}
output {
  elasticsearch {
    ssl => true
    ssl_certificate_verification => false
    hosts => ["*****"]
    user => "*****"
    password => "*******"
    index => "dk.eigl.tst.%{+YYYY}"
    enable_metric => true
    ilm_enabled => false # Prevents the ILM policy from kicking in while Logstash is running
    "action" => "create" # Required for data streams
  }
}

Are we doing anything wrong here? Can't we use a multi-line SQL statement with the JDBC plugin?

You have double quotes on several lines of your SQL statement, and each one terminates the string you are passing to the statement option. Use single quotes, or escape the double quotes using \.
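
For example, T-SQL accepts the datepart and type names without any quotes at all, so the simplest fix is to drop the embedded double quotes from the query. A sketch of just the affected lines (the rest of the query is unchanged):

statement => "SELECT top 5
		RIGHT('00' + DAY(u.CREATEDTS),2) DAYOFMONTH,
		FORMAT(u.CREATEDTS, 'MMMM') MONTH,
		DATEPART(yyyy, u.CREATEDTS) YEAR,
		FORMAT(u.CREATEDTS, 'dddd') WEEKDAY,
		RIGHT('00' + CONVERT(NVARCHAR, DATEPART(hh, u.CREATEDTS)), 2) HOUR,
		RIGHT('00' + CONVERT(NVARCHAR, DATEPART(mi, u.CREATEDTS)), 2) MINUTES,
		DATEPART(qq, u.CREATEDTS) QTROFYEAR,
		RIGHT('000' + CONVERT(NVARCHAR, DATEPART(dy, u.CREATEDTS)), 3) DAYOFYEAR,
		..."

If you would rather keep the double quotes and escape them instead (\"yyyy\"), note that backslash escapes in config strings only take effect when config.support_escapes is set to true in logstash.yml.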

Thanks. I modified the pipeline to use:

last_run_metadata_path => "/usr/share/logstash/logstash_last_run_prod_archivedb.txt"
statement_filepath => "/usr/share/logstash/query_prod_archivedb.txt"

Now it's working!
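
For anyone who finds this later: statement_filepath makes the plugin read the SQL from the file verbatim at runtime, so the query never passes through the config-file parser and the embedded double quotes are no longer a problem. Assuming the paths above, the input section then looks something like this:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mssql-jdbc-9.2.1.jre11.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://*****;databaseName=*******"
    jdbc_user => "*****"
    jdbc_password => "*******"
    schedule => "*/1 * * * *"
    record_last_run => true
    last_run_metadata_path => "/usr/share/logstash/logstash_last_run_prod_archivedb.txt"
    # The full SELECT lives in this file, double quotes and all
    statement_filepath => "/usr/share/logstash/query_prod_archivedb.txt"
    use_column_value => true
    tracking_column => "id"
  }
}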
