Logstash 2.3.x: how to load nested objects into an Elasticsearch index using the logstash-jdbc plugin

Hi,
I am currently using Elasticsearch 2.3.4 and Logstash 2.3.4.
I am trying to load relational data from an Oracle DB into my Elasticsearch index using the jdbc input plugin. As suggested in various posts, I am using the aggregate filter for this. However, I am still not able to load the inner nested objects into the document.

I have two related entities with the following data:

-- Parent table: one row per department.
CREATE TABLE DEPARTMENT (
	id NUMBER PRIMARY KEY,
	name VARCHAR2(4000) NOT NULL
);

-- Child table: each employee references at most one department.
-- departmentid is nullable, so an employee may have no department.
CREATE TABLE EMPLOYEE (
	id NUMBER PRIMARY KEY,
	name VARCHAR2(4000) NOT NULL,
	departmentid NUMBER,
	CONSTRAINT EMPLOYEE_FK FOREIGN KEY (departmentid) REFERENCES DEPARTMENT(id)
);


-- Sample data: dept1-dept3 have employees; dept4 has none, so the
-- LEFT JOIN in the pipeline returns one row with NULL employee columns for it.
INSERT INTO DEPARTMENT (id, name) VALUES (1, 'dept1');
INSERT INTO DEPARTMENT (id, name) VALUES (2, 'dept2');
INSERT INTO DEPARTMENT (id, name) VALUES (3, 'dept3');
INSERT INTO DEPARTMENT (id, name) VALUES (4, 'dept4');

INSERT INTO EMPLOYEE (id, name, departmentid) VALUES (1, 'emp1', 1);
INSERT INTO EMPLOYEE (id, name, departmentid) VALUES (2, 'emp2', 1);
INSERT INTO EMPLOYEE (id, name, departmentid) VALUES (3, 'emp3', 1);
INSERT INTO EMPLOYEE (id, name, departmentid) VALUES (4, 'emp4', 2);
INSERT INTO EMPLOYEE (id, name, departmentid) VALUES (5, 'emp5', 2);
INSERT INTO EMPLOYEE (id, name, departmentid) VALUES (6, 'emp6', 3);

Here is my mapping:

{
	"mappings": {
		"departments": {
			"properties": {
				"id": {
					"type": "integer"
				},
				"deptName": {
					"type": "string"
				},			
				"employee_details": {
					"type": "nested",
					"properties": {
						"empId": {
							"type": "integer"
						},
						"empName": {
							"type": "string"
						}
					}
				}
			}
		}
	}
}

And this is my logstash configuration:

# Pipeline from the question: reads department/employee rows from Oracle via
# JDBC, folds them into one document per department, and indexes the result.
input{
	jdbc{
		jdbc_validate_connection => true
		jdbc_connection_string => "jdbc:oracle:thin:@host:port:db"
		jdbc_user => "user"
		jdbc_password => "pwd"
		jdbc_driver_library => "../vendor/jar/ojdbc14.jar"
		jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
		# One row per (department, employee) pair. The LEFT JOIN keeps departments
		# without employees (dept4 yields one row with NULL employee columns).
		# ORDER BY id keeps each department's rows contiguous, which
		# push_previous_map_as_event in the aggregate filter relies on.
		# NOTE: the jdbc input lowercases column names by default
		# (lowercase_column_names => true), so these aliases arrive on the event as
		# id, deptname, empid and empname (as the rubydebug output shows).
		statement => "SELECT 
						department.id AS id,
						department.name AS deptName,
						employee.id AS empId,
						employee.name AS empName
					FROM  department LEFT JOIN employee  
					ON department.id = employee.departmentid
					ORDER BY id"
	}
}
	
filter{
	# Accumulates one map per department id. When a row with a new id arrives,
	# the previous map is emitted as its own event (push_previous_map_as_event).
	# The aggregate filter needs a single filter worker (-w 1) so rows are
	# processed in order.
	aggregate {
		task_id => "%{id}"
		# BUG: event['deptName'], event['empId'] and event['empName'] do not exist
		# on the event (the fields are deptname/empid/empname), so every value
		# copied here is nil. Reading event['deptname'], event['empid'] and
		# event['empname'] fixes it; the camel-case map keys can stay as they are
		# to match the index mapping.
		code => "
		map['id'] = event['id']
		map['deptName'] = event['deptName']
		map['employee_details'] ||= []
		map['employee_details'] << {'empId' => event['empId'], 'empName' => event['empName'] }
		"
		
		push_previous_map_as_event => true
		timeout => 5
		timeout_tags => ['aggregated']
	}		
}

output{
# The per-row source events are not cancelled in the aggregate code, so each raw
# row also reaches these outputs (printed by rubydebug and indexed with the same
# document_id as its aggregated department document).
stdout{ codec => rubydebug }
	elasticsearch{
		action => "index"
		index => "my_index"
		document_type => "departments"
		document_id => "%{id}"
		hosts => "localhost:9200"
	}
}	

When I run a GET for all documents:
curl -XGET 'localhost:9200/my_index/_search/?pretty=true&q=*:*'

The values are not mapped to the fields and are displayed as null:

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 4,
    "max_score": 1,
    "hits": [
      {
        "_index": "my_index",
        "_type": "departments",
        "_id": "2",
        "_score": 1,
        "_source": {
          "id": 2,
          "deptName": null,
          "employee_details": [
            {
              "empId": null,
              "empName": null
            },
            {
              "empId": null,
              "empName": null
            }
          ],
          "@version": "1",
          "@timestamp": "2019-05-14T10:47:33.477Z",
          "tags": [
            "aggregated"
          ]
        }
      },
      {
        "_index": "my_index",
        "_type": "departments",
        "_id": "4",
        "_score": 1,
        "_source": {
          "id": 4,
          "deptname": "dept4",
          "empid": null,
          "empname": null,
          "@version": "1",
          "@timestamp": "2019-05-14T10:47:33.367Z",
          "deptName": null,
          "employee_details": [
            {
              "empId": null,
              "empName": null
            }
          ]
        }
      },
      {
        "_index": "my_index",
        "_type": "departments",
        "_id": "1",
        "_score": 1,
        "_source": {
          "id": 1,
          "deptName": null,
          "employee_details": [
            {
              "empId": null,
              "empName": null
            },
            {
              "empId": null,
              "empName": null
            },
            {
              "empId": null,
              "empName": null
            }
          ],
          "@version": "1",
          "@timestamp": "2019-05-14T10:47:33.477Z",
          "tags": [
            "aggregated"
          ]
        }
      },
      {
        "_index": "my_index",
        "_type": "departments",
        "_id": "3",
        "_score": 1,
        "_source": {
          "id": 3,
          "deptName": null,
          "employee_details": [
            {
              "empId": null,
              "empName": null
            }
          ],
          "@version": "1",
          "@timestamp": "2019-05-14T10:47:33.492Z",
          "tags": [
            "aggregated"
          ]
        }
      }
    ]
  }
}

The rubydebug output shows these values as nil. Could anyone please help me figure out what I am doing wrong here?

In the rubydebug output on stdout, what do you get for a given id?

Hi,
Here is a snippet of the stdout output for id = 1.

{
            "id" => 1.0,
      "deptname" => "dept1",
         "empid" => 1.0,
       "empname" => "emp1",
      "@version" => "1",
    "@timestamp" => "2019-05-14T12:32:14.272Z"
}
{
            "id" => 1.0,
      "deptname" => "dept1",
         "empid" => 2.0,
       "empname" => "emp2",
      "@version" => "1",
    "@timestamp" => "2019-05-14T12:32:15.272Z"
}
{
            "id" => 1.0,
      "deptname" => "dept1",
         "empid" => 3.0,
       "empname" => "emp3",
      "@version" => "1",
    "@timestamp" => "2019-05-14T12:32:15.272Z"
}
{
                  "id" => 1.0,
            "deptName" => nil,
    "employee_details" => [
        [0] {
              "empId" => nil,
            "empName" => nil
        },
        [1] {
              "empId" => nil,
            "empName" => nil
        },
        [2] {
              "empId" => nil,
            "empName" => nil
        }
    ],
            "@version" => "1",
          "@timestamp" => "2019-05-14T12:32:15.381Z",
                "tags" => [
        [0] "aggregated"
    ]
}

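Even though your SQL asked for deptName, the field on the event is called deptname. The jdbc input lowercases column names by default (lowercase_column_names => true), so event['deptName'] does not match and returns nil. The same applies to empId and empName. Read the lowercase names in the aggregate code, for example:

map['deptName'] = event['deptname']
map['employee_details'] << {'empId' => event['empid'], 'empName' => event['empname'] }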

Hey Badger,

Thank you so much! It worked for me. I totally overlooked this. Can't believe fields are case-sensitive here.