Error registering plugin with jdbc_static plugin

Hi,
This is a Logstash (6.5.1) process that has been running fine for months. To optimize its performance, I am trying to add the jdbc_static filter so I don't have to look up a "domain table" from Logstash on every event, since the plugin keeps it in a local cache.

When I start it via systemctl, I get the error below:

[ERROR][logstash.pipeline ] Error registering plugin {:pipeline_id=>"main", :plugin=>"#<LogStash::FilterDelegator:0xbd2fd39 @metric_events_out=org.jruby.proxy.org.logstash.instrument.metrics.counter.LongCounter$Proxy2 - name: out value:0, @metric_events_in=org.jruby.proxy.org.logstash.instrument.metrics.counter.LongCounter$Proxy2 - name: in value:0, @metric_events_time=org.jruby.proxy.org.logstash.instrument.metrics.counter.LongCounter$Proxy2 - name: duration_in_millis value:0, @id=\"511111111111111111111111142ce947205b9953faa945d423d0096c68466405\", @klass=LogStash::Filters::JdbcStatic, @metric_events=#<LogStash::Instrument::NamespacedMetric:0x5e56b472>, @filter=<LogStash::Filters::JdbcStatic jdbc_user=>\"svc_eventlog_dev\", add_field=>{\"class_label\"=>\"%{[class][0][record_label]}\", \"source_label\"=>\"%{[source][0][record_label]}\"}, local_db_objects=>[{\"name\"=>\"local_class\", \"index_columns\"=>[\"record_id\"], \"columns\"=>[[\"record_id\", \"varchar(32)\"], [\"record_label\", \"varchar(128)\"], [\"base_record_id\", \"varchar(32)\"]]}, {\"name\"=>\"local_source\", \"index_columns\"=>[\"record_id\"], \"columns\"=>[[\"record_id\", \"varchar(32)\"], [\"record_label\", \"varchar(128)\"], [\"base_record_id\", \"varchar(32)\"]]}], remove_field=>[\"class\", \"source\"], loaders=>[{\"id\"=>\"remote-OL_CLASS\", \"query\"=>\"select record_id, record_label, base_record_id from IT.OL_CLASS order by record_label\", \"local_table\"=>\"local_class\"}, {\"id\"=>\"remote-OL_SOURCE\", \"query\"=>\"select record_id, record_label, base_record_id from ITA.OL_SOURCE order by record_label\", \"local_table\"=>\"local_source\"}], staging_directory=>\"/tmp/logstash/jdbc_static/import_data\", jdbc_password=><password>, jdbc_driver_library=>\"/opt/apps/oracle/ojdbc6.jar\", jdbc_connection_string=>\"jdbc:oracle:thin:@//database.otp.com:1521/OTP_DATABASE_DEFAULT.OTP.COM\", id=>\"511111111111111111111111142ce947205b9953faa945d423d0096c68466405\", loader_schedule=>\"* */2 * * *\", jdbc_driver_class=>\"Java::oracle.jdbc.driver.OracleDriver\", local_lookups=>[{\"id\"=>\"local-OL_CLASS\", \"query\"=>\"select record_label, base_record_id from local_class WHERE record_id = :record_id\", \"parameters\"=>{\"record_id\"=>\"[record_id]\"}, \"target\"=>\"class\"}, {\"id\"=>\"local-OL_SOURCE\", \"query\"=>\"select record_label, base_record_id from local_source WHERE record_id = :record_id\", \"parameters\"=>{\"record_id\"=>\"[record_id]\"}, \"target\"=>\"source\"}], enable_metric=>true, periodic_flush=>false, tag_on_failure=>[\"_jdbcstaticfailure\"], tag_on_default_use=>[\"_jdbcstaticdefaultsused\"]>>", :error=>"", :thread=>"#<Thread:0x4aa994ef run>"}

However, it works fine when I run it from the command line as the configured user (via su), so I'm assuming the code itself is fine.
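
For reference, this is roughly how I run it manually (the user name and paths here are placeholders, not the real ones):

su - logstash_svc
/opt/apps/logstash/bin/logstash --path.settings /opt/apps/logstash/config -f /opt/apps/logstash/config/eventlog.conf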

Any idea where the problem could be? I didn't post more details to avoid drowning the thread in information, but if you need anything else, please let me know.

Thanks,
Rob

Please post your config here; use triple backticks (```) above and below when you paste.

It sounds like a permissions problem.
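
As a first check, I would compare who owns the directories Logstash writes to with the user the service actually runs as, something along these lines (swap in your own path.data, path.logs and the jdbc_static staging_directory):

ls -ld <path.data> <path.logs> <staging_directory>
ps -eo user,args | grep [l]ogstash

The jdbc_static filter creates its local lookup database and the staging directory at startup, so those locations need to be writable by whatever user systemd starts the process as.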

Hi,

Thanks a lot for replying. I have the same feeling, but I'm not sure where the permission issue is happening. I checked all the files I'm aware of that could cause this (logs folder/files, configuration file, data folder/files, etc.), and they all have the correct user/group assigned.

Here is the process configuration file:

# Destination is the name of the queue we want to get messages from.
# Additional options can be included; in this case we strip the header and properties from the XML message.
input {
	jms {
		destination => "otp.tst.elasticsearch.writeevent.queue"
		include_body => true
		include_header => false
		include_properties => false
		interval => 10
		timeout => -1
		threads => 10
		use_jms_timestamp => false
		yaml_file => "/opt/apps/jms/jms.yml"
		yaml_section => "ems"
	}
}

# This is where we transform our input.
filter {
	# We go through the XML and extract field values using grok (regex).
	# You can find a list of grok regex patterns at https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns.
	grok {
		match => {
			# Incoming event XMLs are stored in the message field.
			# You can assign and retrieve variables using [@metadata][variable_name].
			"message" => [
				'<RecordId>%{WORD:record_id}</RecordId>',
				'<RecordLabel>%{GREEDYDATA:record_label}</RecordLabel>',
				'<RecordTime>%{NUMBER:record_sequence}</RecordTime>',
				'<Level>%{WORD:[@metadata][event_level_word]}</Level>',
				'<Code>%{NUMBER:event_code}</Code>',
				'<OperationId>%{WORD:operation_id}</OperationId>',
				'<SourceId>%{WORD:source_id}</SourceId>',
				'<EndTime>%{NUMBER:[@metadata][end_sequence]}</EndTime>',
				'<ExpiryToken>%{WORD:[@metadata][expiry_token]}</ExpiryToken>',
				'<CausingSourceId>%{WORD:causing_source_id}</CausingSourceId>'
			]
		}
		# Set break_on_match to false, otherwise Logstash will automatically stop parsing after the first successful match (RecordId).
		break_on_match => false
		# Strip the message and tags fields from the document (reduces storage space).
		remove_field => ["message", "tags"]
		tag_on_failure => [ ]
	}
	
  jdbc_static {
    loaders => [ 
      {
        id => "remote-OL_SOURCE"
        query => "select record_id, record_label, base_record_id from IT.OL_SOURCE order by record_label"
        local_table => "local_source"
      }
    ]
    local_db_objects => [ 
      {
        name => "local_source"
        index_columns => ["record_id"]
        columns => [
          ["record_id", "varchar(32)"],
          ["record_label", "varchar(128)"],
          ["base_record_id", "varchar(32)"]
        ]
      }
    ]
    local_lookups => [ 
      {
        id => "local-OL_SOURCE"
        query => "select record_label, base_record_id from local_source WHERE record_id = :record_id"
        parameters => {record_id => "[record_id]"}
        target => "source"
      }
    ]

	# using add_field here to add & rename values to the event root
    add_field => { source_label => "%{[source][0][record_label]}" } 
    remove_field => ["class", "source"]
    staging_directory => "/tmp/logstash/jdbc_static/import_data"
    loader_schedule => "* */2 * * *" # cron schedule for refreshing the local tables (intended to run every 2 hours)
    jdbc_user => "service_event_user"
    jdbc_password => "<password removed>"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_driver_library => "/opt/apps/oracle/ojdbc6.jar"
    jdbc_connection_string => "jdbc:oracle:thin:@//database.otp.com:1521/OTP_DATABASE_DEFAULT.OTP.COM"
  }

	# The expiry token needs to be converted to an actual date.
	# Query el_expiration in Elasticsearch to get the number of milliseconds to expiry.
	elasticsearch {
		hosts => ["elesdatatst01.otp.com:9200", "elesdatatst02.otp.com:9200", "elesdatatst03.otp.com:9200", "elesdatatst04.otp.com:9200", "elesdatatst05.otp.com:9200"]
		index => "log_el_expiration"
		query => "record_id:%{[@metadata][expiry_token]}"
		fields => { "expiry_in_milliseconds" => "[@metadata][expiry_milliseconds]" }
		sort => "record_id"
	}
	
	# RecordTime is provided in .NET ticks, need to convert this into a datetime value.
	# Need to divide expiry_milliseconds by 1000 because Ruby calculates time differences in seconds.
	# Store the converted datetimes into the record_date and record_expiry fields.
	# Also adding the type_label content to the field.
	ruby {
		code => "
			record_sequence_date = Time.at((event.get('record_sequence').to_i - 621355968000000000)/10000000);
			expiry_date = Time.at((event.get('record_sequence').to_i - 621355968000000000)/10000000 + event.get('[@metadata][expiry_milliseconds]').to_i/1000);
			event.set('record_date', record_sequence_date.strftime('%FT%R:%S'));
			event.set('record_expiry', expiry_date.strftime('%FT%R:%S'));
			event.set('[@metadata][expiry_index]', expiry_date.strftime('%Y.%m'));
		"
	}
	
	# EventLevel arrives as a string, need to convert to integer.
	# I tried to put this in a ruby block but could not get case statements to work for some reason.
	# TODO: replace section with case statement.
	if [@metadata][event_level_word] == "INFORMATION" {
		mutate {
			add_field => ["event_level", 0]
		}
	}
	else if [@metadata][event_level_word] == "NOTIFICATION" {
		mutate {
			add_field => ["event_level", 1]
		}
	}
	else if [@metadata][event_level_word] == "WARNING" {
		mutate {
			add_field => ["event_level", 2]
		}
	}
	else {
		mutate {
			add_field => ["event_level", 3]
		}
	}
}

# Specify Elasticsearch endpoints and which index to store the document.
output {
	elasticsearch {
		document_type => "record"
		hosts => ["elesdatatst01.otp.com:9200", "elesdatatst02.otp.com:9200", "elesdatatst03.otp.com:9200", "elesdatatst04.otp.com:9200", "elesdatatst05.otp.com:9200"]
		index => "log_el_event_%{[@metadata][expiry_index]}"
		id => "ELESTST01-Logstash-Event"
	}
	# This section is optional.  I included it for debug purposes, but you can comment this part out if you want.
	stdout {
		codec => rubydebug {metadata => true}
	}
}
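
One note on the add_field / remove_field pair inside the jdbc_static block above, in case it looks odd: as far as I understand it, each local lookup writes its result rows as an array of hashes under the configured target, so right after the lookup the event contains something like this (values made up):

"source" => [
    { "record_label" => "Some Source", "base_record_id" => "AB12" }
]

That is why I copy %{[source][0][record_label]} up to source_label at the event root and then drop the source field itself.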

Here is the Logstash configuration file (I removed most of the commented-out lines). I checked the permissions on the folders "/opt/apps/logstash/data" and "/OTPLog/logstash" before running.

# Settings file in YAML
path.data: /opt/apps/logstash/data

queue.type: memory
#queue.type: persisted
#queue.drain: true

queue.page_capacity: 250mb

path.logs: /OTPLog/logstash

xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.url: ["http://elesdatatst01:9200", "http://elesdatatst02:9200", "http://elesdatatst03:9200", "http://elesdatatst04:9200", "http://elesdatatst05:9200"]

Thanks for your help.

Rob

Hi,

Issue is now resolved. The problem was in the "logstash.service" unit file: under the [Service] section, the User and Group directives were written in all lowercase, like this:

user=service_account_name
group=group_name

When they should be:

User=service_account_name
Group=group_name
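
As far as I can tell, systemd directive names are case-sensitive, so the lowercase user=/group= lines were simply ignored (journalctl shows an "Unknown lvalue" warning for them) and the process did not run as the account I expected. For anyone else hitting this, the working [Service] section now looks roughly like this (names and paths are placeholders):

[Service]
User=service_account_name
Group=group_name
ExecStart=/opt/apps/logstash/bin/logstash --path.settings /opt/apps/logstash/config
Restart=always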

Thanks,
Rob

@roblopes

I think this might be a bug. What distribution of Logstash are you using?

I'm using 6.5.1. Just downloaded:

curl -L -O https://artifacts.elastic.co/downloads/logstash/logstash-6.5.1.tar.gz

Meaning that you set up the service side yourself?
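
If I remember right, the tar.gz also ships a bin/system-install script that can generate the systemd unit for you (with the directives capitalized correctly), which might save you from maintaining it by hand:

cd <logstash install dir>
sudo bin/system-install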

Yes, that's how it is done here. I see your point now: the distribution would set that up for us, but this is the way it was configured for me, and how I was instructed to do it whenever an update is required.
The only interesting thing is that the service was running just fine until I added the jdbc_static plugin.
That said, there is no bug, and this thread can be closed :slight_smile:
Thanks for your help!
Rob
