Logstash 2.3.x how to load nested objects in elasticsearch index using logstash-jdbc plugin

Hi,
I am currently using elasticsearch 2.3.4 and logstash 2.3.4
I am trying to load relational data from an Oracle database into my Elasticsearch index using the jdbc input plugin. As suggested in various posts, I am using the aggregate filter for this, but I am still not able to load the inner nested objects into the documents.

I have two related entities with following data:

-- Parent table: one row per department.
CREATE TABLE DEPARTMENT (
	id NUMBER PRIMARY KEY,
	name VARCHAR2(4000) NOT NULL
);

-- Child table: each employee may reference one department through departmentid.
-- departmentid carries no NOT NULL constraint, so an employee row can exist without a department.
CREATE TABLE EMPLOYEE (
	id NUMBER PRIMARY KEY,
	name VARCHAR2(4000) NOT NULL,
	departmentid NUMBER,
	CONSTRAINT EMPLOYEE_FK FOREIGN KEY (departmentid) REFERENCES DEPARTMENT(id)
);


-- Sample data. Column lists are explicit so the statements do not depend on column order.
INSERT INTO DEPARTMENT (id, name) VALUES (1, 'dept1');
INSERT INTO DEPARTMENT (id, name) VALUES (2, 'dept2');
INSERT INTO DEPARTMENT (id, name) VALUES (3, 'dept3');
INSERT INTO DEPARTMENT (id, name) VALUES (4, 'dept4');

-- Departments 1-3 have employees; department 4 has no employee rows.
INSERT INTO EMPLOYEE (id, name, departmentid) VALUES (1, 'emp1', 1);
INSERT INTO EMPLOYEE (id, name, departmentid) VALUES (2, 'emp2', 1);
INSERT INTO EMPLOYEE (id, name, departmentid) VALUES (3, 'emp3', 1);
INSERT INTO EMPLOYEE (id, name, departmentid) VALUES (4, 'emp4', 2);
INSERT INTO EMPLOYEE (id, name, departmentid) VALUES (5, 'emp5', 2);
INSERT INTO EMPLOYEE (id, name, departmentid) VALUES (6, 'emp6', 3);

Here is my mapping:

{
	"mappings": {
		"departments": {
			"properties": {
				"id": {
					"type": "integer"
				},
				"deptName": {
					"type": "string"
				},			
				"employee_details": {
					"type": "nested",
					"properties": {
						"empId": {
							"type": "integer"
						},
						"empName": {
							"type": "string"
						}
					}
				}
			}
		}
	}
}

And this is my logstash configuration:

# Reads the department/employee join from Oracle; each result row becomes one event.
input{
	jdbc{
		jdbc_validate_connection => true
		jdbc_connection_string => "jdbc:oracle:thin:@host:port:db"
		jdbc_user => "user"
		jdbc_password => "pwd"
		jdbc_driver_library => "../vendor/jar/ojdbc14.jar"
		jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
		# LEFT JOIN keeps departments that have no employees (their employee columns come back NULL).
		# ORDER BY id returns all rows of one department consecutively.
		# NOTE(review): the rubydebug output quoted later in this thread shows the event keys
		# arriving in lowercase (deptname, empid, empname), not the camelCase aliases written below.
		statement => "SELECT 
						department.id AS id,
						department.name AS deptName,
						employee.id AS empId,
						employee.name AS empName
					FROM  department LEFT JOIN employee  
					ON department.id = employee.departmentid
					ORDER BY id"
	}
}
	
# Folds the per-employee events of one department into a single event with an employee_details array.
filter{
	aggregate {
		# Events with the same id value share one aggregation map.
		task_id => "%{id}"
		# NOTE(review): event['deptName'], event['empId'] and event['empName'] are read in camelCase,
		# but the events shown in this thread's rubydebug output carry lowercase keys
		# (deptname, empid, empname), so these lookups yield nil.
		code => "
		map['id'] = event['id']
		map['deptName'] = event['deptName']
		map['employee_details'] ||= []
		map['employee_details'] << {'empId' => event['empId'], 'empName' => event['empName'] }
		"
		
		# Presumably emits the accumulated map as a new event once a different task_id arrives;
		# confirm against the aggregate filter documentation.
		push_previous_map_as_event => true
		timeout => 5
		timeout_tags => ['aggregated']
	}		
}

# Prints every event to the console (rubydebug) and indexes it into Elasticsearch.
output{
stdout{ codec => rubydebug }
	elasticsearch{
		action => "index"
		index => "my_index"
		document_type => "departments"
		# The event's id field becomes the document _id, so events with the same id target the same document.
		# NOTE(review): the filter does not cancel the per-row events, so they appear to be indexed here
		# under the same _id as the aggregated event; confirm that is intended.
		document_id => "%{id}"
		hosts => "localhost:9200"
	}
}	

When I perform a GET on all documents:
curl -XGET 'localhost:9200/my_index/_search/?pretty=true&q=*:*'

The values are not mapped to the fields and are displayed as null:

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 4,
    "max_score": 1,
    "hits": [
      {
        "_index": "my_index",
        "_type": "departments",
        "_id": "2",
        "_score": 1,
        "_source": {
          "id": 2,
          "deptName": null,
          "employee_details": [
            {
              "empId": null,
              "empName": null
            },
            {
              "empId": null,
              "empName": null
            }
          ],
          "@version": "1",
          "@timestamp": "2019-05-14T10:47:33.477Z",
          "tags": [
            "aggregated"
          ]
        }
      },
      {
        "_index": "my_index",
        "_type": "departments",
        "_id": "4",
        "_score": 1,
        "_source": {
          "id": 4,
          "deptname": "dept4",
          "empid": null,
          "empname": null,
          "@version": "1",
          "@timestamp": "2019-05-14T10:47:33.367Z",
          "deptName": null,
          "employee_details": [
            {
              "empId": null,
              "empName": null
            }
          ]
        }
      },
      {
        "_index": "my_index",
        "_type": "departments",
        "_id": "1",
        "_score": 1,
        "_source": {
          "id": 1,
          "deptName": null,
          "employee_details": [
            {
              "empId": null,
              "empName": null
            },
            {
              "empId": null,
              "empName": null
            },
            {
              "empId": null,
              "empName": null
            }
          ],
          "@version": "1",
          "@timestamp": "2019-05-14T10:47:33.477Z",
          "tags": [
            "aggregated"
          ]
        }
      },
      {
        "_index": "my_index",
        "_type": "departments",
        "_id": "3",
        "_score": 1,
        "_source": {
          "id": 3,
          "deptName": null,
          "employee_details": [
            {
              "empId": null,
              "empName": null
            }
          ],
          "@version": "1",
          "@timestamp": "2019-05-14T10:47:33.492Z",
          "tags": [
            "aggregated"
          ]
        }
      }
    ]
  }
}

rubydebug suggests the values are set to 'nil'. Could anyone please help me with what I am doing wrong here?

In the rubydebug output on stdout, what do you get for a given id?

Hi,
Here is a snippet of stdout for id = 1:

{
            "id" => 1.0,
      "deptname" => "dept1",
         "empid" => 1.0,
       "empname" => "emp1",
      "@version" => "1",
    "@timestamp" => "2019-05-14T12:32:14.272Z"
}
{
            "id" => 1.0,
      "deptname" => "dept1",
         "empid" => 2.0,
       "empname" => "emp2",
      "@version" => "1",
    "@timestamp" => "2019-05-14T12:32:15.272Z"
}
{
            "id" => 1.0,
      "deptname" => "dept1",
         "empid" => 3.0,
       "empname" => "emp3",
      "@version" => "1",
    "@timestamp" => "2019-05-14T12:32:15.272Z"
}
{
                  "id" => 1.0,
            "deptName" => nil,
    "employee_details" => [
        [0] {
              "empId" => nil,
            "empName" => nil
        },
        [1] {
              "empId" => nil,
            "empName" => nil
        },
        [2] {
              "empId" => nil,
            "empName" => nil
        }
    ],
            "@version" => "1",
          "@timestamp" => "2019-05-14T12:32:15.381Z",
                "tags" => [
        [0] "aggregated"
    ]
}

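Even though you asked it to return deptName, the field on the event is called deptname, which does not match. Similarly for empId and empName. The jdbc input lowercases column names by default (see its lowercase_column_names option), so the aggregate code needs to read event['deptname'], event['empid'] and event['empname'].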

1 Like

Hey Badger,

Thank you so much! It worked for me. I totally overlooked this. Can't believe fields are case-sensitive here.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.