Logstash.filters.json throw warn exception:java.lang.ClassCastException: org.jruby.RubyArray cannot be cast to org.jruby.RubyIO


(ocean.pang) #1

elasticsearch version : 6.4.3
logstash version : 6.4.3

what i want to do?
import relationship data from mysql to elasticsearch

issue:
the relationship data in mysql like this:
rid resumeid eid orid schoolname
53 3649931 60 53 xxxUniversity
53 3649931 61 53 yyyUniversity
but when the import is completed, the data in elasticsearch like this:

    {
  "_index": "resumeindex",
  "_type": "_doc",
  "_id": "53",
  "_version": 406,
  "_score": 1,
  "_source": {
    "@timestamp": "2018-11-23T02:50:00.356Z",
    "resumeid": 3649931,
    "rid": 53,
    "tags": [
      "_jsonparsefailure"
    ],
    "education": [
      {
        "eid": 61,
        "orid": 53,
        "schoolname": "yyyUniversity"
      }
    ],
    "@version": "1"
  },
  "fields": {
    "@timestamp": [
      "2018-11-23T02:50:00.356Z"
    ]
  }
}

The education array does not contain xxxUniversity information.
And I got one warn information like this:
[2018-11-22T11:35:01,980][WARN ][logstash.filters.json ] Error parsing json {:source=>"education", :raw=>[{"orid"=>53, "schoolname"=>"xxxUniversity", "eid"=>60}], :exception=>java.lang.ClassCastException: org.jruby.RubyArray cannot be cast to org.jruby.RubyIO}

logstash conf :

input {
    jdbc {
        jdbc_driver_library => "/Users/pangyang/Documents/study/elasticSearch/mysql-connector-java-5.1.47.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/resume"
        jdbc_user => "root"
        jdbc_password => "py_060537"
        schedule => "*/5 * * * *"
        use_column_value => true
	    tracking_column_type => "numeric"
	    tracking_column => rid
	    record_last_run => true
        codec => json { charset => "UTF-8"}
        jdbc_default_timezone => "Asia/Shanghai"
        statement => "SELECT origin.main_id rid,origin.resume_id resumeid,
               oue.main_id eid,oue.origin_resume_id orid,oue.school_name schoolname
        FROM origin_resume origin
          JOIN origin_user_education oue on oue.origin_resume_id = origin.main_id"
    }
}
filter{
    aggregate{
        task_id => "%{rid}"
        code => "
        map['rid']=event.get('rid')
        map['resumeid']=event.get('resumeid')
        map['education'] ||= []
        map['education'] << {'eid'=>event.get('eid'),
        'orid'=>event.get('orid'),'schoolname' =>event.get('schoolname')}
        event.cancel()
      "
      push_previous_map_as_event => true
      timeout => 3
    }
    json {
         source => "education"
         target => "education"
    }
}
output {
    elasticsearch {
        index => "resumeindex"
        document_type => "_doc"
        document_id => "%{rid}"
        hosts => "http://localhost:9200"
    }
     stdout {
         codec => json { charset => "UTF-8"}
    }
}

elasticsearch mapping :

{
  "mappings": {
    "_doc": {
      "properties": {
        "rid": { "type": "long"  },
        "resumeid": { "type": "long"  },
        "education": {
          "type":"nested",
          "properties": {
            "eid": { "type": "long"  },
            "orid": {"type": "long"},
            "schoolname": { "type": "text"  }
          }
        }
      }
    }
  }
} 

could someone help me?