Input from JDBC - Convert to JSON & Push to SNS Topic


(Wayne Taylor) #1

Team,
I'm not sure whether this is possible in a single Logstash pipeline, so I'd appreciate feedback.

Data Source: Oracle DB. Extract using standard JDBC input plugin.
Destination: SNS

To extract fields from the DB, e.g. from my orders table, I run: "select id, customer_id, amount from orders"
I will now have 3 fields to work with.

I want to turn those fields into a JSON message that looks something like:
{"id":1,"customer_id":1,"amount":16}

I would then rename this JSON to sns_message and use the SNS output plugin to create an SNS message with a valid JSON object.

Appreciate any help

Wayne


(Magnus Bäck) #2

I would then rename this JSON to sns_message

What do you mean? Do you want to wrap the object inside another object, like this?

{"sns_message": {"id":1,"customer_id":1,"amount":16}}

Have you tried setting codec => json for your sns output?


(Wayne Taylor) #3

@magnusbaeck - to clarify, yes. The SNS output plugin expects a field called sns_message. I need the fields wrapped into a second object as you mentioned and then output as JSON.

I tried the approach you mentioned, but the message is still output as plain text. Is there an example you could supply?

Here is my config example:

input {
  jdbc {
    jdbc_connection_string => "myjdbcurl"
    jdbc_user => "user"
    jdbc_password => "password"
    jdbc_driver_library => "ojdbc8.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    statement => "select order_id from order_summary where datetimestamp_created >= sysdate - interval '5' minute"
  }
}

filter {
  mutate {
    add_field => { "sns_subject" => "ORDER_SUMMARY" }
    add_field => { "message_type" => "ORDER" }
    add_field => { "sns_message" => "%{order_id}"}
  }
}

output {
  sns {
    arn => "arn:aws:sns:us-east-1:myarn:ORDER_SUMMARY"
    codec => json
  }
  stdout {
    codec => json
  }
}


(Magnus Bäck) #4

add_field => { "sns_message" => "%{order_id}"}

This doesn't make sense. You don't want sns_message to contain the order id. To create the

{"sns_message": {"id":1,"customer_id":1,"amount":16}}

structure I wrote about earlier you can e.g. do this (and the same for customer_id and amount):

mutate {
  rename => {
    "order_id" => "[sns_message][id]"
  }
}

That'll turn sns_message into a nested object that'll get serialized by the plugin according to its codec setting.


(Wayne Taylor) #5

Sorry, I'm confused: I am unable to get all 3 values printed. Can you provide an example with that, please?


(Magnus Bäck) #6

Please always provide what you have tried. Sometimes mistakes can be spotted right away.

mutate {
  rename => {
    "order_id" => "[sns_message][id]"
    "name-of-customer-id-field" => "[sns_message][customer_id]"
    "name-of-amount-field" => "[sns_message][amount]"
  }
}

(Wayne Taylor) #7

@magnusbaeck - so close. If I output to stdout as JSON I see what I want. If I use the SNS output I get the following error with Logstash version 5.3.0:

[2017-08-29T12:00:51,225][FATAL][logstash.runner ] An unexpected error occurred! {:error=>#<NoMethodError: undefined method `get' for {"order_id"=>"118087420TPDA"}:Hash>, :backtrace=>["/Users/wtaylor/Downloads/logstash-5.3.0/vendor/bundle/jruby/1.9/gems/logstash-output-sns-4.0.5/lib/logstash/outputs/sns.rb:126:in `event_arn'", "/Users/wtaylor/Downloads/logstash-5.3.0/vendor/bundle/jruby/1.9/gems/logstash-output-sns-4.0.5/lib/logstash/outputs/sns.rb:66:in `register'", "org/jruby/RubyProc.java:281:in `call'", "/Users/wtaylor/Downloads/logstash-5.3.0/vendor/bundle/jruby/1.9/gems/logstash-codec-json-3.0.3/lib/logstash/codecs/json.rb:42:in `encode'", "/Users/wtaylor/Downloads/logstash-5.3.0/vendor/bundle/jruby/1.9/gems/logstash-output-sns-4.0.5/lib/logstash/outputs/sns.rb:78:in `receive'", "/Users/wtaylor/Downloads/logstash-5.3.0/logstash-core/lib/logstash/outputs/base.rb:92:in `multi_receive'", "org/jruby/RubyArray.java:1613:in `each'", "/Users/wtaylor/Downloads/logstash-5.3.0/logstash-core/lib/logstash/outputs/base.rb:92:in `multi_receive'", "/Users/wtaylor/Downloads/logstash-5.3.0/logstash-core/lib/logstash/output_delegator_strategies/shared.rb:12:in `multi_receive'", "/Users/wtaylor/Downloads/logstash-5.3.0/logstash-core/lib/logstash/output_delegator.rb:47:in `multi_receive'", "/Users/wtaylor/Downloads/logstash-5.3.0/logstash-core/lib/logstash/pipeline.rb:390:in `output_batch'", "org/jruby/RubyHash.java:1342:in `each'", "/Users/wtaylor/Downloads/logstash-5.3.0/logstash-core/lib/logstash/pipeline.rb:389:in `output_batch'", "/Users/wtaylor/Downloads/logstash-5.3.0/logstash-core/lib/logstash/pipeline.rb:346:in `worker_loop'", "/Users/wtaylor/Downloads/logstash-5.3.0/logstash-core/lib/logstash/pipeline.rb:306:in `start_workers'"]}

If I remove the mutate block altogether, it works.

Note: I modified the field names so they are more meaningful to me.

My modified config:

input {
  jdbc {
    jdbc_connection_string => "jdbcurl"
    jdbc_user => "user"
    jdbc_password => "password"
    jdbc_driver_library => "ojdbc8.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    statement => "select * from order_summary where datetimestamp_created >= sysdate - interval '5' minute and ROWNUM = 1"
  }
}


filter {
  mutate {
    rename => { "order_id" => "[sns_message][order_id]" }
    rename => { "customer_id" => "[sns_message][customer_id]"}
    rename => { "source_system" => "[sns_message][source_system]"}
    rename => { "tx_routing_id" => "[sns_message][tx_routing_id]"}
    rename => { "airline_code" => "[sns_message][airline_code]"}
    rename => { "tail_number" => "[sns_message][tail_number]"}
    rename => { "flight_number" => "[sns_message][flight_number]"}
    rename => { "departure_airport" => "[sns_message][departure_airport]"}
    rename => { "destination_airport" => "[sns_message][destination_airport]"}
    rename => { "purchase_amount" => "[sns_message][purchase_amount]"}
    rename => { "discount_amount" => "[sns_message][discount_amount]"}
    rename => { "tax_amount" => "[sns_message][tax_amount]"}
    rename => { "total_amount_due" => "[sns_message][total_amount_due]"}
    rename => { "division" => "[sns_message][division]"}
    rename => { "customer_group_id" => "[sns_message][customer_group_id]"}
    rename => { "datetimestamp_created" => "[sns_message][datetimestamp_created]"}
  }
}



output {
  sns {
    codec => "json"
    arn => "arn:aws:sns:us-east-1:account:ORDER_SUMMARY"
  }
}

(Magnus Bäck) #8

Aha. I think this is a bug:

As a workaround, have you tried not setting sns_message and having Logstash encode the whole event? Another workaround could be to set sns_message to a JSON string that you construct with the json_encode filter.
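
A minimal sketch of the second workaround, not tested against your setup. It assumes the json_encode filter plugin (logstash-filter-json_encode) is available in your installation; as far as I know it is not bundled by default and has to be installed separately. The field names are taken from your config and cut down to two for brevity. The idea is to build the nested sns_message object as before and then replace it with its JSON string, so the sns output receives a plain string instead of a hash:

```
filter {
  mutate {
    # Build the nested object, as in the earlier config.
    rename => {
      "order_id" => "[sns_message][order_id]"
      "customer_id" => "[sns_message][customer_id]"
    }
  }
  # Assumption: json_encode is installed. It serializes the source field
  # to a JSON string; here the result overwrites sns_message itself.
  json_encode {
    source => "sns_message"
    target => "sns_message"
  }
}

output {
  sns {
    arn => "arn:aws:sns:us-east-1:account:ORDER_SUMMARY"
  }
}
```

With sns_message set to a string, the plugin should publish that string as-is, so the codec setting on the sns output should no longer matter for the message body.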

