Import one-to-many relation from SQL

I'm trying to import job ads from two MySQL tables (job data and locations), but I'm running into a problem when a job ad has multiple locations. I'm using this MySQL query:

SELECT id, company, jobtitle, description, priority,
       DATE_FORMAT(date, '%Y-%m-%d %T') AS date,
       sa_locations.location AS location_name,
       sa_locations.lat AS location_lat,
       sa_locations.lon AS location_lon
FROM sa_data
JOIN sa_locations ON sa_data.id = sa_locations.id
ORDER BY id

Ignoring the location problem, everything works fine, and I get results like this:

 {
    "_index" : "jk",
    "_type" : "jobposting",
    "_id" : "26362",
    "_score" : 1.0,
    "_source" : {
      "date" : "2017-04-22 00:00:00",
      "location_name" : "Berlin",
      "location_lat" : "52.520007",
      "location_lon" : "13.404954",
      "@timestamp" : "2017-04-24T07:50:31.660Z",
      "@version" : "1",
      "description" : "Some text here",
      "company" : "Test Company",
      "id" : 26362,
      "jobtitle" : "Architect Data Center Network & Security",
      "priority" : 10,
    }
  },
 {
    "_index" : "jk",
    "_type" : "jobposting",
    "_id" : "26363",
    "_score" : 1.0,
    "_source" : {
      "date" : "2017-04-22 00:00:00",
      "location_name" : "Hamburg",
      "location_lat" : "53.551085",
      "location_lon" : "9.993682",
      "@timestamp" : "2017-04-24T07:50:31.660Z",
      "@version" : "1",
      "description" : "Some text here",
      "company" : "Test Company",
      "id" : 26363,
      "jobtitle" : "Architect Data Center Network & Security",
      "priority" : 10,
    }
  }

What I'm trying to get is something like this:

 {
    "_index" : "jk",
    "_type" : "jobposting",
    "_id" : "26362",
    "_score" : 1.0,
    "_source" : {
      "date" : "2017-04-22 00:00:00",
      "locations" : [ {  "name": "Berlin", "lat" : "52.520007", "lon" : "13.404954" }, {  "name": "Hamburg", "lat" : "53.551085", "lon" : "9.993682" } ]
      "@timestamp" : "2017-04-24T07:50:31.660Z",
      "@version" : "1",
      "description" : "Some text here",
      "company" : "Test Company",
      "id" : 26362,
      "jobtitle" : "Architect Data Center Network & Security",
      "priority" : 10,
    }
  }

That way, if I search for jobs near Berlin or Hamburg with a geo_distance filter, this job should show up. Is there any way to import the data in this shape with Logstash?
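
As far as I understand, geo_distance needs the coordinates in a single geo_point field, so the lat/lon values of each location would probably have to end up in one sub-field rather than two separate strings. Something like the following is what I have in mind; the locations.point sub-field is just my own naming and doesn't exist anywhere yet, and the 50km distance is only an example:

PUT jk
{
  "mappings": {
    "jobposting": {
      "properties": {
        "locations": {
          "properties": {
            "name":  { "type": "keyword" },
            "point": { "type": "geo_point" }
          }
        }
      }
    }
  }
}

GET jk/_search
{
  "query": {
    "bool": {
      "filter": {
        "geo_distance": {
          "distance": "50km",
          "locations.point": { "lat": 52.520007, "lon": 13.404954 }
        }
      }
    }
  }
}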

My logstash.conf looks like this:

input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/jk"
    jdbc_user => "..."
    jdbc_password => "..."
    jdbc_driver_library => "/etc/logstash/mysql-connector-java-5.1.41/mysql-connector-java-5.1.41-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    statement => "SELECT id, company, jobtitle, description, priority,
                  DATE_FORMAT(date, '%Y-%m-%d %T') AS date,
                  sa_locations.location AS location_name,
                  sa_locations.lat AS location_lat,
                  sa_locations.lon AS location_lon
                  FROM sa_data
                  JOIN sa_locations ON sa_data.id = sa_locations.id
                  ORDER BY id"
  }
}

#filter {
#  aggregate {
#    task_id => "%{id}"
#    code => "
#      map['location_name'] = event.get('location_name')
#      map['location_lat'] = event.get('location_lat')
#      map['location_lon'] = event.get('location_lon')
#      map['locations'] ||= []
#      map['locations'] << {'location_name' => event.get('location_name')}
#      map['locations'] << {'location_lat' => event.get('location_lat')}
#      map['locations'] << {'location_lon' => event.get('location_lon')}
#      event.cancel()
#    "
#    push_previous_map_as_event => true
#    timeout => 3
#  }
#}

output {
  elasticsearch {
    index => "jk"
    document_type => "jobposting"
    document_id => "%{id}"
    hosts => ["localhost:9200"]
  }
}

The commented-out filter above was my attempt, but it seemed to be the wrong approach.
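
From the aggregate filter examples I've found so far, I think it is meant to be used roughly like this to roll the location rows up into one array per job. This is only a sketch of what I'm aiming for, not something I've got working: it assumes the rows arrive sorted by id, that Logstash runs with a single pipeline worker (-w 1), and that jdbc paging is left off.

filter {
  aggregate {
    task_id => "%{id}"
    code => "
      # copy the job-level fields once per id (my guess at what is needed)
      map['id']          ||= event.get('id')
      map['company']     ||= event.get('company')
      map['jobtitle']    ||= event.get('jobtitle')
      map['description'] ||= event.get('description')
      map['priority']    ||= event.get('priority')
      map['date']        ||= event.get('date')
      # collect one {name, lat, lon} hash per location row
      map['locations'] ||= []
      map['locations'] << {
        'name' => event.get('location_name'),
        'lat'  => event.get('location_lat'),
        'lon'  => event.get('location_lon')
      }
      # drop the per-row event; only the aggregated map should be indexed
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
  }
}

If I understand push_previous_map_as_event correctly, the map for one id is pushed as a new event as soon as a row with the next id arrives (or on timeout for the last one), which is why the rows have to be sorted by id and processed by a single worker. Is that roughly the right way to use it here, or is there a better approach?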
