Can statement in jdbc_streaming filter using a field from input?

I have a pipeline like below:

input {
  stdin {}

filter {
  kv {}
  jdbc_streaming {
    jdbc_driver_library => "D:\Work\mysql-connector-java-5.1.43-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://xxxxx:3306/testdb"
    jdbc_user => "xxx"
    jdbc_password => "xxx"
    statement => ':statement'
    parameters => { "statement" => "sql"}
    target => "country_details"

output {
    stdout { codec => "json"}


the input like " sql='select * from A' " and I'd like using the sql field as the statement in filter, but got the error blow:

[2017-09-28T18:24:09,258][WARN ][logstash.filters.jdbcstreaming] Exception when
executing JDBC query {:exception=>#<Sequel::DatabaseError: Java::ComMysqlJdbcExc
eptionsJdbc4::MySQLSyntaxErrorException: You have an error in your SQL syntax; c
heck the manual that corresponds to your MySQL server version for the right synt
ax to use near ''SELECT DEFECT_CODE FROM DEFECT_INFO limit 1'' at line 1>}
,"country_details":[{}],"message":"sql=\"SELECT DEFECT_CODE FROM DEFECT_INFO lim
it 1\"\r","sql":"SELECT DEFECT_CODE FROM DEFECT_INFO limit 1","tags":["_jdbcstre

Looks like the var replace is success but query executing failed, is there any way out?

