Inserting a Complex Nested JSON from Postgres to Elasticsearch via Logstash

Hi All,

I am inserting a table into ES that contains a column of type nested jsonb in Postgres (I am converting it to JSON while extracting the data).
This is my logstash conf file:

input {
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://myhostname:5432/mydb"
    jdbc_user => "myusr"
    jdbc_password => "mypwd"
    jdbc_validate_connection => true
    jdbc_driver_library => "/ELK/postgres/postgresql-9.4.1208.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    statement => "SELECT to_json(business_det) from business limit 1"
  }
}

filter {
  json {
    source => "%{[to_json][value]}"
  }
}

output {
  elasticsearch {
    index => "abc"
    document_type => "details"
    # document_id => "%{business_id}"
    # hosts => "localhost"
  }
  stdout {}
}

Ideally, I would like the nested JSON object (business_det) to be stored with the same nested structure in the root document in ES.
But it's being stored as a string in the field "value" within "to_json", and I am unable to get the structure I want.
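In other words, I would like _source to end up looking roughly like this (trimmed from the record shown below):

{
  "gps" : { "latitude" : "41.15432", "longitude" : "-74.35413" },
  "name" : "US Post Office",
  "address" : { "city" : "Hewitt", "state" : "NJ", "country" : "USA", "zipCode" : "07421" },
  "category" : "Post Offices"
}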
Here's how it actually looks:

{
  "took" : 4,
  "_shards" : {
    "failed" : 0,
    "successful" : 5,
    "total" : 5
  },
  "timed_out" : false,
  "hits" : {
    "hits" : [
      {
        "_score" : 1,
        "_index" : "abc",
        "_source" : {
          "@version" : "1",
          "@timestamp" : "2016-08-18T21:45:29.222Z",
          "to_json" : {
            "value" : "{\"gps\": {\"latitude\": \"41.15432\", \"longitude\": \"-74.35413\"}, \"name\": \"US Post Office\", \"aboutMe\": \"\", \"address\": {\"city\": \"Hewitt\", \"line1\": \"1926 Union Valley Rd Ste 3\", \"state\": \"NJ\", \"country\": \"USA\", \"zipCode\": \"07421\"}, \"category\": \"Post Offices\", \"phoneNum\": [{\"work\": \"(800) 275-8777\"}], \"openHours\": [\"Mon - Fri 8:30 am - 5:00 pm\", \" Sat 8:30 am - 12:30 pm\", \" Sun Closed\"], \"searchTags\": [\" Post Offices\", \"Mail & Shipping Services\"]}",
            "type" : "json"
          }
        },
        "_type" : "details",
        "_id" : "AVafnb8ftbvkzTn2myuI"
      }
    ]
  }
}

Here's my mapping:

curl -XPUT "http://localhost:9200/abc/_mapping/details" -d'
{
  "details": {
    "properties": {
      "to_json": {
        "type": "nested",
        "properties": {
          "name": {
            "type": "string",
            "analyzer": "autocomplete"
          },
          "category": {
            "type": "string",
            "analyzer": "autocomplete"
          },
          "aboutMe": {
            "type": "string",
            "analyzer": "autocomplete"
          },
          "searchTags": {
            "type": "string",
            "analyzer": "autocomplete"
          }
        }
      }
    }
  }
}'

What am I doing wrong here?
Please let me know.

Thanks.

Are you sure the result above came from a run where the json filter was in place? The configuration looks totally fine and should work. Is there anything in the logs? In the unlikely event of a screwed-up JSON string, Logstash will log a message about it.

Yes, I am sure I used the same logstash conf file for the result above.
I checked the logs; there was no error or exception. Just to be sure, I did it all again and checked. I cannot put my finger on it.
By the way, I tried loading the data as a JSON file. It worked perfectly fine!
Just when I import it from Postgres, I am running into this trouble.
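For comparison, the file-based test I mean was roughly along these lines (the path is just a placeholder, not my exact setup):

input {
  file {
    # Placeholder path to the exported JSON file
    path => "/ELK/data/business.json"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    # Each line of the file is parsed as a JSON document
    codec => "json"
  }
}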

Please advise.
Thanks again.

OK, here goes! I was able to resolve this issue.
I had to import the data from Postgres in text format and apply a json filter to the respective fields:

SELECT postgres_parent_json_object->>'postgres_child_json_tag' FROM postgres_table

It worked just fine! :slight_smile:

Oh yeah, when dealing with arrays, Logstash reads the field names in lowercase (camelCase does not work), so make sure the source field names are lowercase.
Hope this helps people who are stuck reading nested JSON from Postgres.

Hi @anushav85,

Could you kindly share the working version of the input and filter sections of your logstash configuration file? I am specifically trying to understand your 'import the data from Postgres in text format' comment.

I have a similar situation where I'm using a SQL file as my input source, and the records have a few fields of type JSON that are not being interpreted properly by Elasticsearch.

Here is a sample document. Check out the fields, instances, and customfields fields; these are JSON columns in the database.

{
  "took" : 7,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 4.663562,
    "hits" : [ {
      "_index" : "metadata",
      "_type" : "dataset",
      "_id" : "1470",
      "_score" : 4.663562,
      "_source" : {
        "id" : 1470,
        "name" : "JK Dataset 5000",
        "team" : "The A-Team",
        "owner_name" : "Seat-Pacifico@here.com",
        "steward_name" : "jonathan.kurz@here.com",
        "contact_email" : "jonathan.kurz@here.com",
        "description" : "The first dataset I've added since Amplitude logging was implemented.",
        "pii" : "no",
        "quality" : null,
        "frequency" : null,
        "environment" : null,
        "external_doc_url" : null,
        "sample_data_location" : null,
        "current_version" : null,
        "create_date" : "2016-11-02T17:24:35.590Z",
        "last_change_date" : "2016-11-02T18:36:34.214Z",
        "active" : true,
        "hidden" : false,
        "classification" : "HERE Internal Use Only",
        "licenses" : null,
        "schema_uri" : null,
        "hidden_reason" : null,
        "created_at" : "2016-11-02T17:24:35.591Z",
        "updated_at" : "2016-11-02T18:36:34.216Z",
        "fields" : {
          "type" : "json",
          "value" : "[{\"dataset_id\":1470,\"id\":627,\"name\":\"field12`\",\"description\":\"adfs\",\"common_field\":null,\"pii\":\"no\",\"created_at\":\"2016-11-02T17:25:30.551341+00:00\",\"updated_at\":\"2016-11-02T17:25:30.551341+00:00\"}]"
        },
        "customfields" : {
          "type" : "json",
          "value" : "[{\"id\":52,\"dataset_id\":1470,\"namespace\":\"default\",\"key\":\"new attribute with brackets []\",\"value\":\"[asdfasdf,3,4,5]\",\"created_at\":\"2016-11-02T17:27:35.413789+00:00\",\"updated_at\":\"2016-11-02T17:27:35.413789+00:00\"}]"
        },
        "tags" : null,
        "instances" : {
          "type" : "json",
          "value" : "[{\"instance_id\":1559,\"instance_name\":null,\"instance_description\":\"no description\",\"source_uri\":\"testURI123\",\"source_name\":\"JK Amplitude source\",\"source_uri_hint\":\"no hint\",\"source_owner\":\"jonathan.kurz@here.com\"}]"
        },
        "@version" : "1",
        "@timestamp" : "2016-11-02T21:31:01.408Z"
      }
    } ]
  }
}

Here is my logstash config file:

input {
  jdbc {
    # Postgres jdbc connection string to our database, mydb
    jdbc_connection_string => "Replace with db name"
    # The user we wish to execute our statement as
    jdbc_user => "Replace with user"
    jdbc_password => "Replace with pwd"
    # The path to our downloaded jdbc driver
    jdbc_driver_library => "/home/metadata/postgres/postgresql-9.4-1201-jdbc42-20150827.124843-3.jar"
    # The name of the driver class for Postgresql
    jdbc_driver_class => "org.postgresql.Driver"
    # our query
    statement_filepath => "/home/metadata/logstash/query.sql"
    # schedule
    schedule => "* * * * *"
  }
}
filter {
  json {
    source => "[instances][value]"
  }
}

output {
  elasticsearch {
    index => "metadata"
    document_type => "dataset"
    document_id => "%{id}"
    hosts => ["localhost"]
  }
}

Regards,
Suman

Could you share the SQL statement in your "query.sql"?
By 'importing the data in text format' I meant that Logstash didn't work with the nested JSON object stored in our Postgres table, so I had to convert it to text using the statement I mentioned in my earlier post.
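In outline, the input and filter sections follow the same pattern as my original config, just with the ->> extraction in the SQL and a json filter per extracted field (the connection details, column and key names below are illustrative, not my exact setup):

input {
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://myhostname:5432/mydb"
    jdbc_user => "myusr"
    jdbc_password => "mypwd"
    jdbc_driver_library => "/ELK/postgres/postgresql-9.4.1208.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    # ->> returns the nested object as text; keep the alias lowercase
    statement => "SELECT business_det->>'address' AS address FROM business"
  }
}

filter {
  json {
    # Parse the text column back into a nested object under the same field
    source => "address"
    target => "address"
  }
}

The target option puts the parsed object back under the same field; leaving it out would add the parsed keys at the root of the event instead.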

My SQL is somewhat long, but here you go:

SELECT
dm.*,
(
SELECT array_to_json(array_agg(row_to_json(f)))
FROM (
SELECT *
FROM application.fields_master fm
WHERE fm.dataset_id = dm.id
) f
) AS fields,
(
SELECT array_to_json(array_agg(row_to_json(c)))
FROM (
SELECT *
FROM application.custom_fields_master cfm
WHERE cfm.dataset_id = dm.id
) c
) AS customfields,
(
SELECT array_to_json(array_agg(row_to_json(t)))
FROM (
SELECT tm.text
FROM application.tags_dataset_mapping tdm
INNER JOIN application.tags_master tm
ON tdm.dataset_id = dm.id
AND tdm.tag_id = tm.id
) t
) AS tags,
(
SELECT array_to_json(array_agg(row_to_json(i)))
FROM (
SELECT
di.id AS instance_id,
di.name AS instance_name,
di.description AS instance_description,
di.source_uri,
s.source_name,
s.source_uri_hint,
s.owned_by AS source_owner
FROM application.dataset_instance di
INNER JOIN application.sources s
ON di.dataset_id = dm.id
AND di.source_id = s.id
) i
) AS instances
FROM application.dataset_master dm

Here are the table definition and a sample record for the SQL query in my previous post. The last 4 columns are of JSON type.

CREATE TABLE "MY_TABLE" (
id bigint,
name varchar,
team varchar,
owner_name varchar,
steward_name varchar,
contact_email varchar,
description varchar,
pii varchar,
quality varchar,
frequency varchar,
environment varchar,
external_doc_url varchar,
sample_data_location varchar,
current_version varchar,
create_date timestamp,
last_change_date timestamp,
active bool,
hidden bool,
classification varchar,
licenses varchar,
schema_uri varchar,
hidden_reason varchar,
created_at timestamptz,
updated_at timestamptz,
fields json,
customfields json,
tags json,
instances json
);

INSERT INTO "MY_TABLE"(id, name, team, owner_name, steward_name, contact_email, description, pii, quality, frequency, environment, external_doc_url, sample_data_location, current_version, create_date, last_change_date, active, hidden, classification, licenses, schema_uri, hidden_reason, created_at, updated_at, fields, customfields, tags, instances) VALUES (503, 'SK_Dataset_100', 'Insights', 'suman.kar@here.com', 'suman.kar@here.com', 'suman.kar@here.com', 'blah', 'yes', null, null, null, null, null, null, '2016-10-11 04:25:43.468000', '2016-10-14 00:16:24.735000', true, false, 'Confidential', null, null, null, '2016-10-11 04:25:43.580424', '2016-10-14 00:16:24.845744', '[{"dataset_id":503,"id":50,"name":"eee","description":"eee","common_field":null,"pii":"yes","created_at":"2016-10-13T13:15:26.327249-07:00","updated_at":"2016-10-13T13:15:52.759431-07:00"}]', null, null, null);

I had the exact same problem. I got it working using a ruby filter as follows:

filter {
  ruby {
    code => "
      require 'json'
      # Parse the JSON string stored in the field and write the resulting
      # structure back to the same field
      some_json_field_value = JSON.parse(event.get('some_json_field').to_s)
      event.set('some_json_field', some_json_field_value)
    "
  }
}

I had the exact same problem. I got it working using a ruby filter as follows:

What's the benefit of this over a json filter?
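I.e., presumably something like this on the same field:

filter {
  json {
    # Parse the JSON string in some_json_field into structured fields on the event
    source => "some_json_field"
  }
}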

I actually figured that. Thanks for confirming though.

Not sure what the benefit is but I was just not able to get the json filter to work.