Index records with JSON type fields from Postgres to Elasticsearch using Logstash

Hi,

I have a SQL file, that generates records from a Postgres db. I would like to index these records in Elasticsearch using Logstash. Here is a sample record from the database.

1, 'Test Dataset 1', 'Insights', 'suman.kar@here.com', 'kevin.sherard@here.com', 'kevin.sherard@here.com', 'blah', 'no', , '2016-09-22 23:49:12.330454', '2016-09-30 04:37:48.456346', [{"dataset_id":1,"id":21,"name":"f1","description":"f1","common_field":null,"pii":"no","created_at":"2016-09-26T13:41:46.100589-07:00","updated_at":"2016-09-26T13:41:46.100589-07:00"},{"dataset_id":1,"id":22,"name":"f2","description":"f2","common_field":null,"pii":"no","created_at":"2016-09-26T13:42:01.748609-07:00","updated_at":"2016-09-26T13:42:01.748609-07:00"}], null, null, null

As you can see above, one of the fields (highlighted in bold) is of JSON type. I have 4 such fields for each record.
When I try to send these records to Elasticsearch, they show up in the following format though (focus on the JSON type fields only).

    "id" : 1470,
    "name" : "JK Dataset 5000",
    "team" : "The A-Team",
    "owner_name" : "ffff@here.com",
    "steward_name" : "ffff@here.com",
    "contact_email" : "fffff@here.com",
    "description" : "The first dataset I've added since Amplitude logging was implemented.",
    "pii" : "no",
    "create_date" : "2016-11-02T17:24:35.590Z",
    "last_change_date" : "2016-11-02T18:36:34.214Z",
    "active" : true,
    "hidden" : false,
    "classification" : "HERE Internal Use Only",
    "licenses" : null,
    "schema_uri" : null,
    "hidden_reason" : null,
    "created_at" : "2016-11-02T17:24:35.591Z",
    "updated_at" : "2016-11-02T18:36:34.216Z",
    "fields" : {
      "type" : "json",
      "value" : "[{\"dataset_id\":1470,\"id\":627,\"name\":\"field12`\",\"description\":\"adfs\",\"common_field\":null,\"pii\":\"no\",\"created_at\":\"2016-11-02T17:25:30.551341+00:00\",\"updated_at\":\"2016-11-02T17:25:30.551341+00:00\"}]"
    },
    "customfields" : {
      "type" : "json",
      "value" : "[{\"id\":52,\"dataset_id\":1470,\"namespace\":\"default\",\"key\":\"new attribute with brackets []\",\"value\":\"[asdfasdf,3,4,5]\",\"created_at\":\"2016-11-02T17:27:35.413789+00:00\",\"updated_at\":\"2016-11-02T17:27:35.413789+00:00\"}]"
    },
    "tags" : null,
    "instances" : {
      "type" : "json",
      "value" : "[{\"instance_id\":1559,\"instance_name\":null,\"instance_description\":\"no description\",\"source_uri\":\"testURI123\",\"source_name\":\"JK Amplitude source\",\"source_uri_hint\":\"no hint\",\"source_owner\":\"jonathan.kurz@here.com\"}]"
    },
    "@version" : "1",
    "@timestamp" : "2016-11-03T20:02:01.479Z"
  }
} ]

}
}

How do I transform the JSON type fields so that the document gets indexed in the following format instead? I see a bunch of posts that seem to address a similar issue, but none of the recommended solutions worked for me. I think we need to use a mapping and a filter, but nothing I tried seems to work.

{
"id": 1470,
"classification": "HERE Internal Use Only",
"licenses": null,
"name": "JK Dataset 5000",
"team": "The A-Team",
"ownerName": "fff@here.com",
"stewardName": "fff@here.com",
"contactEmail": "fff@here.com",
"description": "The first dataset I've added since Amplitude logging was implemented.",
"pii": "no",
"quality": null,
"frequency": null,
"environment": null,
"externalDocUrl": null,
"sampleDataLocation": null,
"currentVersion": null,
"schemaUri": null,
"hidden": false,
"createDate": "2016-11-02 T17:24:35",
"lastChangeDate": "2016-11-02 T18:36:34",
"fields": [{
"id": 627,
"name": "field12`",
"description": "adfs",
"commonField": null,
"pii": "no"
}],
"instances": [{
"id": 1559,
"name": null,
"description": "no description",
"sourceUri": "testURI123",
"sourceId": 292,
"customAttributes": [{
"id": 4045,
"key": "custom1",
"value": "amplitude"
}]
}],
"customAttributes": [{
"id": 52,
"nameSpace": "default",
"key": "new attribute with brackets []",
"value": "[asdfasdf,3,4,5]"
}],
"tags": []
}

Continuing from my last post, here is my Logstash configuration file:

input {
jdbc {
# Postgres jdbc connection string to our database, mydb
jdbc_connection_string => ""
# The user we wish to execute our statement as
jdbc_user => "*****"
jdbc_password => "*****"
# The path to our downloaded jdbc driver
jdbc_driver_library => "/home/metadata/postgres/postgresql-9.4-1201-jdbc42-20150827.124843-3.jar"
# The name of the driver class for Postgresql
jdbc_driver_class => "org.postgresql.Driver"
# our query
statement_filepath => "/home/metadata/logstash/query.sql"
# schedule
schedule => "* * * * *"
}
}
output {
elasticsearch {
index => "metadata"
document_type => "dataset"
document_id => "%{id}"
hosts => ["localhost"]
}
}

Also, here is my mapping. Just focus on the 'instances' field:

curl -XPUT 'localhost:9200/metadata' -d '
{
"metadata": {
"mappings": {
"dataset": {
"properties": {
"@timestamp": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
},
"@version": {
"type": "string"
},
"active": {
"type": "boolean"
},
"classification": {
"type": "string"
},
"contact_email": {
"type": "string"
},
"create_date": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
},
"created_at": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
},
"current_version": {
"type": "string"
},
"customfields": {
"type": "string"
},
"description": {
"type": "string"
},
"environment": {
"type": "string"
},
"external_doc_url": {
"type": "string"
},
"fields": {
"type": "string"
},
"frequency": {
"type": "string"
},
"hidden": {
"type": "boolean"
},
"hidden_reason": {
"type": "string"
},
"id": {
"type": "long"
},
"instances": {
"type": "nested",
"properties": {
"instance_id": { "type": "string" },
"instance_name": { "type": "string" },
"instance_description": { "type": "string" },
"source_uri": { "type": "string" },
"source_name": { "type": "string" },
"source_uri_hint": { "type": "string" },
"owned_by": { "type": "string" }
}
},
"last_change_date": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
},
"licenses": {
"type": "string"
},
"name": {
"type": "string"
},
"owner_name": {
"type": "string"
},
"pii": {
"type": "string"
},
"quality": {
"type": "string"
},
"sample_data_location": {
"type": "string"
},
"schema_uri": {
"type": "string"
},
"steward_name": {
"type": "string"
},
"tags": {
"properties": {
"type": {
"type": "string"
},
"value": {
"type": "string"
}
}
},
"team": {
"type": "string"
},
"updated_at": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
}
}
}
}
}
}'

Hi @magnusbaeck,

I see a lot of posts from you regarding the same topic. Could you kindly address my specific situation? Thanks in advance.

Regards,
Suman