We have configured centralized pipeline feature for logstash. When we are using a multi line SQL statement we are getting error as below.
[2022-04-18T08:19:05,193][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:eigl-pipeline, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \t\r\n], "#", "{", "}" at line 13, column 13 (byte 570) after input {\r\n jdbc {\r\n jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mssql-jdbc-9.2.1.jre11.jar"\r\n jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"\r\n jdbc_connection_string => "jdbc:sqlserver://10.204.209.69;databaseName="\r\n jdbc_user => "****"\r\n jdbc_password => ""\r\n schedule => "*/1 * * * *"\r\n\trecord_last_run => true\r\n statement => "SELECT top 5\r\n\t\tRIGHT('00' + DAY(u.CREATEDTS),2) DAYOFMONTH,\r\n\t\tFORMAT(u.CREATEDTS, 'MMMM') MONTH, \r\n\t\tDATEPART("", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in
compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:187:in
initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:ininitialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in
initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52:inexecute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:383:in
block in converge_state'"]}
The pipeline we are using is as below
input {
jdbc {
jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mssql-jdbc-9.2.1.jre11.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://*****;databaseName=*******"
jdbc_user => "*****"
jdbc_password => "*******"
schedule => "*/1 * * * *"
record_last_run => true
statement => "SELECT top 5
RIGHT('00' + DAY(u.CREATEDTS),2) DAYOFMONTH,
FORMAT(u.CREATEDTS, 'MMMM') MONTH,
DATEPART("yyyy",u.CREATEDTS) YEAR,
FORMAT(u.CREATEDTS, 'dddd') WEEKDAY,
RIGHT('00' + CONVERT("NVARCHAR",DATEPART(hh,u.CREATEDTS)),2) HOUR,
RIGHT('00' + CONVERT("NVARCHAR",DATEPART(mi,u.CREATEDTS)),2) MINUTES,
DATEPART("qq",u.CREATEDTS) QTROFYEAR,
RIGHT('000' + CONVERT("NVARCHAR",DATEPART("dy",u.CREATEDTS)),3) DAYOFYEAR,
u.USAGEDATAID ID,
u.EVENTID EVENT_ID,
u.PARENTEVENTID PARENT_EVENT_ID,
u.CREATEDTS CREATE_DTS,
u.REQUESTDTS REQUEST_DTS,
u.MPNAME CONTAINER_KEY,
u.CLIENTHOST CLIENT_HOSTNAME,
u.LISTENERURL LISTENER_URL,
u.NEXTHOPURL NEXTHOP_URL,
u.APPUSERNAME CONSUMER_USERID,
u.REQUSERNAME ENDUSER_USERID,
u.OTHERUSERNAMES OTHER_USERNAMES,
u.RESPONSETIME TOTAL_RESPONSETIME_MSEC,
u.NEXTHOPRESPTIME NEXTHOP_RESPONSETIME_MSEC,
u.REQMSGSIZE REQUEST_MSGSIZE_BYTES,
u.RESPMSGSIZE RESPONSE_MSGSIZE_BYTES,
u.NMREQMSGSIZE REQUEST_NORM_MSGSIZE_BYTES,
u.NMRESPMSGSIZE RESPONSE_NORM_MSGSIZE_BYTES,
u.VERB HTTP_VERB,
u.STATUS_CODE HTTP_STATUSCODE,
u.ISSOAPFLTBYMP IS_FAULT_BY_CONTAINER,
u.ISSOAPFLTBYNEXTHOP IS_FAULT_BY_NEXTHOP,
u.ERRCATEGORY ERROR_CATEGORY,
u.ERRMESSAGE ERROR_MESSAGE,
u.ERRDETAILS ERROR_DETAILS,
u.CUSTOMFIELD1 CUSTOM_FIELD,
c.CONTRACTNAME CONTRACT_NAME,
nn.NAME ORGANIZATION_NAME,
sn.NAME SERVICE_NAME,
op.NAME OPERATION_NAME
from (
MO_USAGEDATA u
left outer join ( SVC_OPERATIONS sop inner join ENTITY_NAMES op on (op.ENTITYKEY = sop.OPERATIONKEY and op.TYPE = 'OP-')) on ( sop.OPERATIONID = u.OPERATIONID)
left outer join ( UDDI_SERVICE s inner join ENTITY_NAMES sn on (sn.ENTITYKEY = s.SERVICE_KEY and sn.TYPE = 'S-')) on (s.BUSINES_SERVICE_ID = u.SERVICEID)
left outer join CONTRACTS_VERSIONS c on c.CONTRACTVERSIONID = u.CONTRACTID
left outer join ( UDDI_BUSINESS ub inner join ENTITY_NAMES nn on ( nn.ENTITYKEY = ub.BUSINESS_KEY and nn.TYPE = 'O-')) on (ub.BUSINESS_ENTITY_ID = u.ORGID)
)"
use_column_value => true
tracking_column => "id"
}
}
filter {
mutate { convert => ["YEAR","integer"]}
mutate { convert => ["QTROFYEAR","integer"]}
mutate { convert => ["ID","integer"]}
mutate { convert => ["TOTAL_RESPONSETIME_MSEC","integer"]}
mutate { convert => ["NEXTHOP_RESPONSETIME_MSEC","integer"]}
mutate { convert => ["REQUEST_MSGSIZE_BYTES","integer"]}
mutate { convert => ["RESPONSE_MSGSIZE_BYTES","integer"]}
mutate { convert => ["REQUEST_NORM_MSGSIZE_BYTES","integer"]}
mutate { convert => ["RESPONSE_NORM_MSGSIZE_BYTES","integer"]}
mutate { convert => ["HTTP_STATUSCODE","integer"]}
geoip { source => "CLIENT_HOSTNAME" }
translate {
field => "[HTTP_STATUSCODE]"
destination => "[HTTP_STATUS_DESCRIPTION]"
dictionary => {
"200" => "OK"
"202" => "Accepted"
"500" => "Server Error"
"400" => "Not Found"
"401" => "Unauthorised"
"403" => "Forbidden"
"404" => "Bad Request"
}
fallback => "Unknown"
}
}
output {
elasticsearch {
ssl => true
ssl_certificate_verification => false
hosts => ["*****"]
user => "*****"
password => "*******"
index => "dk.eigl.tst.%{+YYYY}"
enable_metric => true
ilm_enabled => false # Prevents the ILM policy from kicking in while Logstash is running
"action" => "create" # Required for data streams
}
}
Are we doing anything wrong here ? Cant we use a multi line SQL statement with JDBC plugin ?