JSON Array using Logstash

I am fetching data from SQL Server using Logstash. JDBC Rivers would convert columns with "." notation into array structures, and I am looking for a similar way in Logstash.

I am using the logstash-filter-aggregate plugin.

I have the following mapping:

{
	"customers": {
		"properties": {
			"age": {
				"type": "long"
			},
			"id": {
				"type": "long"
			},
			"locations": {
				"properties": {
					"cust_name": {
						"type": "text"
					}
				}
			},
			"name": {
				"type": "text",
				"fields": {
					"keyword": {
						"type": "keyword",
						"ignore_above": 256
					}
				}
			}
		}
	}
}

locations is an array inside the document.

SQL statement used in the Logstash conf file:

 statement => "SELECT c.ID,c.NAME as 'name', c.AGE as 'age',l.LocationName as 'locations.cust_name' from customers c join location l on c.id=l.customerid"

So we can have multiple location names for a customer ID.

This is the filter I am using:

filter {
         aggregate {
             task_id => "%{id}"
             code => "
                map['locations'] ||= { 'cust_name' => [] }
                event.to_hash.each do |key,value|
                    map[key] ||= value unless key == 'cust_name'
                    map['locations']['cust_name'] << value if key == 'cust_name'
                end
             "
             push_previous_map_as_event => true
             timeout => 5
         }
     }

However, I am not able to form the JSON array of locations.
Can someone provide any help on this?

Could you provide example event data, as it looks just before the aggregate filter?

You need to understand how Logstash events work.
Every event goes through filter/output. The aggregate filter does not filter out events. At the output, you need to use an if statement to discard the non-aggregated events; otherwise the last event stands.
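
For illustration, a minimal sketch of that kind of output conditional, assuming the aggregate filter tags its flushed events with "aggregated" via timeout_tags (host and index name below are placeholders):

output {
    # only events produced by the aggregate filter are indexed; the raw row events are dropped
    if "aggregated" in [tags] {
        elasticsearch {
            hosts => "localhost:9200"   # placeholder host
            index => "your-index"       # placeholder index name
        }
    }
}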

See my example at

Sample data which I am getting from my SQL query:

ID  name  age  locations.cust_name
1   Test  25   T1
1   Test  25   T2
1   Test  25   T3
1   Test  25   T4

One ID will have multiple cust_name values. I am looking to have an array of locations.

You will need to create a table-valued function containing FOR JSON PATH in SQL Server 2016 and use CROSS APPLY on that function to get results in the nested structure you want. Once you have that ready, you will want to use the following ruby filter in your Logstash config file so that ES receives the data in your nested column as proper JSON.

ruby {
    code => "
        require 'json'
        # parse the JSON string coming from SQL Server into a real structure
        json_column = JSON.parse(event.get('sqlcolumn').to_s)
        event.set('column_name_in_es', json_column)
    "
}

Let me know if this works.

Thanks.

Hi Rory

I have looked into FOR JSON PATH in SQL Server. Using FOR JSON PATH I get the following from SQL Server:

[{
	"id": 1,
	"name": "Test",
	"age": 32,
	"location": {
		"cust_name": "T1"
	}
}, {
	"id": 1,
	"name": "Test",
	"age": 32,
	"location": {
		"cust_name": "T2"
	}
}, {
	"id": 1,
	"name": "Test",
	"age": 32,
	"location": {
		"cust_name": "T"
	}
}]

How do I prepare an array of locations from here?

You will have to create a SQL table-valued function that returns the columns you need in the nested document.

The function should look like:

CREATE FUNCTION [mytablevaluedfunctionJSON]
(@id varchar(100))
RETURNS @retcust_name TABLE
(
    cust_name_nested nvarchar(max)
)
AS
BEGIN
    DECLARE @jsonvariable nvarchar(max);

    SELECT @jsonvariable = (
        SELECT [cust_name]
        FROM table
        WHERE id = @id
        FOR JSON PATH
    );

    INSERT @retcust_name SELECT @jsonvariable;
    RETURN;
END

If id = xyz has 4 cust_name rows, it will return:

"cust_name_nested": [
    { "cust_name": "value1" },
    { ... },
    { ... },
    { ... }
]

You need to use CROSS APPLY with this function to get all documents.
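
As a rough sketch, wired into the Logstash jdbc input it could look like this (connection details, table and column names are placeholders, not your actual schema):

input {
    jdbc {
        jdbc_connection_string => "jdbc:sqlserver://yourserver;databaseName=YOUR_DB"   # placeholder
        jdbc_user => "username"
        jdbc_password => "password"
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        # CROSS APPLY the table-valued function so each customer row carries its nested JSON string
        statement => "SELECT c.id, c.name, c.age, f.cust_name_nested FROM customers c CROSS APPLY [mytablevaluedfunctionJSON](c.id) f ORDER BY c.id"
    }
}

The ruby filter shown earlier would then parse cust_name_nested into a real JSON structure before the document is sent to ES.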

This is what I would do. Let me know if it works for you.

Sorry, but can you provide sample data in JSON format, using the stdout output with rubydebug activated?

When I see your conf, I wonder if you should set %{ID} instead of %{id}.

Thus, you should add event.cancel() at the end of your aggregate's code to cancel the current event (only the aggregate timeout event is interesting for you).

Data using rubydebug=true

{
     "@timestamp" => 2017-07-14T21:32:00.131Z,
           "name" => "Ramesh",
       "@version" => "1",
             "id" => 1,
            "age" => 32,
    "location.cust_name" => "T1"
}
{
     "@timestamp" => 2017-07-14T21:32:00.133Z,
           "name" => "Ramesh",
       "@version" => "1",
             "id" => 1,
            "age" => 32,
    "location.cust_name" => "T2"
}
{
     "@timestamp" => 2017-07-14T21:32:00.137Z,
           "name" => "Ramesh",
       "@version" => "1",
             "id" => 1,
            "age" => 32,
    "location.cust_name" => "T3"
}
{
     "@timestamp" => 2017-07-14T21:32:00.140Z,
           "name" => "Khilan",
       "@version" => "1",
             "id" => 2,
            "age" => 25,
    "location.cust_name" => "T2"
}
{
     "@timestamp" => 2017-07-14T21:32:00.141Z,
           "name" => "Khilan",
       "@version" => "1",
             "id" => 2,
            "age" => 25,
    "location.cust_name" => "T1"
}
{
     "@timestamp" => 2017-07-14T21:32:00.131Z,
           "name" => "Ramesh",
       "@version" => "1",
             "id" => 1,
              "location" => {
        "cust_name" => []
    },
            "age" => 32,
    "location.cust_name" => "T1"
}
{
     "@timestamp" => 2017-07-14T21:32:00.140Z,
           "name" => "Khilan",
       "@version" => "1",
             "id" => 2,
              "location" => {
        "cust_name" => []
    },
            "age" => 25,
    "location.cust_name" => "T2"
}

Here is a working example using the data you provided. Just make sure to set Logstash filter workers to 1 (-w 1 flag)

jdbc {
    ...
    # assuming you want to put cust_name, address etc. under [location]
    statement => "select id,name,age,cust_name,address from atable order by id,name,age"
    type => "atype"
    ...
}
filter {
    # use a @metadata field to prevent saving the field with the document
    mutate {
        add_field => { "[@metadata][type]" => "%{type}" }
        remove_field => [ "type" ]
    }
    if [@metadata][type] == "atype" {
        aggregate {
            task_id => "%{id}%{name}%{age}"
            code => "
                map['id'] ||= event.get('id')
                map['@metadata'] = event.get('@metadata')
                map['name'] ||= event.get('name')
                map['age'] ||= event.get('age')
                map['location'] ||= []
                map['location'] << { 'cust_name' => event.get('cust_name'),
                                     'address' => event.get('address'),
                                     ... }
            "
            push_previous_map_as_event => true
            timeout => 5
            timeout_tags => ['aggregated']
        }
    }
}
output {
    # only ingest to ES when aggregated; timeout_tags puts the tag there
    if "aggregated" in [tags] {
        elasticsearch {
            index => "aindex"
            document_type => "%{[@metadata][type]}"
            document_id => "%{id}"
            hosts => ......
            action => "update"
            doc_as_upsert => true
        }
    }
}

The above script has the type metadata stuff; you can ignore it. It is there in case you have multiple JDBC inputs and need to aggregate according to the type (that is why the type is set in the JDBC input).
The key to the aggregation is the task_id, which defines the grouping. Your example requires aggregation on 3 fields, so the task_id needs to use 3 fields.
timeout_tags => ['aggregated'] is another key: you need to use the if "aggregated" in [tags] condition in the ES output to filter out events other than the aggregated one.
So, your example output would be like:
[{
    "id": 1,
    "name": "Test",
    "age": 25,
    "location": [
        {
            "cust_name": "T1",
            "address": ...
            ...
        },
        {
            "cust_name": "T2",
            "address": ...
            ...
        },
        {
            "cust_name": "T3",
            "address": ...
            ...
        },
        {
            "cust_name": "T4",
            "address": ...
            ...
        }
    ]
},
{
    "id": 2,
    ......
},
....
]
Is this the result you expect?

OK, so given your data, I suggest this aggregate configuration:

filter {
         aggregate {
             task_id => "%{id}"
             code => "
                map['locations'] ||= { 'cust_name' => [] }
                event.to_hash.each do |key,value|
                    map[key] ||= value unless key == 'location.cust_name'
                    map['locations']['cust_name'] << value if key == 'location.cust_name'
                end
                event.cancel()
             "
             push_previous_map_as_event => true
             timeout => 5
         }
     }

Hi Fabien

Thanks for your help. I tried what you suggested. I can see the data in the format I am expecting in stdout. However, the same is not showing up in Elasticsearch. In Elasticsearch I am seeing IDs outside of my join condition. I am using an inner join, so only matching IDs should show up.

Did you try to look at the Elasticsearch data using the Kibana Dev Tools console?

I have not set up Kibana yet. What I am not able to figure out is: if I am seeing the data correctly in stdout when running the Logstash conf file, why is it not showing up properly in Elasticsearch?

Also, why am I getting IDs which do not satisfy my join condition?

First, you should add an "order by id" SQL clause to be sure that you first have all result lines with id=1, then all result lines with id=2, etc.

Then, given your Elasticsearch mapping, your aggregate config, and the stdout result which is what you expect, there is no reason you should get something different in Elasticsearch.

Finally, I'm not sure I understand what you are trying to do in Elasticsearch and what's wrong. For instance, an inner join does not exist in Elasticsearch. Maybe you could show your Elasticsearch query and the results, and explain what's wrong with a concrete example?

Hi Fabien

I do have the order by ID in my SQL statement.
My Elasticsearch type is a combination of 2 tables. For example, I am storing an array of cust_name (which can have multiple values for a given ID) in the same type.

My Logstash Conf File:

input {
	jdbc {
		jdbc_connection_string => "jdbc:sqlserver://dbname.database.windows.net;databaseName=TEST_DB;user=username;password=Password;"
		jdbc_user => "username"
		jdbc_password => "Password"
		jdbc_driver_library => "../../../../etc/logstash/driver/sqljdbc_6.0/enu/jre8/sqljdbc42.jar"
		jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
		
		schedule => "* * * * *"
		statement => "SELECT c.ID as 'id',c.NAME as 'name', c.AGE as 'age',l.LocationName as 'colocation.cust_name' from customers c inner join location l on c.id=l.customerid order by c.id"
	}
}
filter {
	aggregate {
		task_id => "%{id}"
		code => "
			map['colocation'] ||= { 'cust_name' => [] }
			event.to_hash.each do |key, value|
				map[key] ||= value unless key == 'colocation.cust_name'
				map['colocation']['cust_name'] << value if key == 'colocation.cust_name'
			end
		"
		push_previous_map_as_event => true
		timeout => 5
	}
}
output {
	elasticsearch {
		hosts => "localhost:9200"
		index => "lc-data"
		document_type => "customers2"
		document_id => "%{id}"
		codec => rubydebug
	}
	stdout {
		codec => rubydebug
	}
}

My ES Mapping:

{
	"mappings": {
		"customers2": {
			"properties": {
				"@timestamp": {
					"type": "date"
				},
				"@version": {
					"type": "text",
					"fields": {
						"keyword": {
							"type": "keyword",
							"ignore_above": 256
						}
					}
				},
				"address": {
					"type": "text",
					"fields": {
						"keyword": {
							"type": "keyword",
							"ignore_above": 256
						}
					}
				},
				"age": {
					"type": "long"
				},
				"colocation": {
					"properties": {
						"cust_name": {
							"type": "text"
						}
					}
				},
				"id": {
					"type": "long"
				},
				"name": {
					"type": "text",
					"fields": {
						"keyword": {
							"type": "keyword",
							"ignore_above": 256
						}
					}
				},
				"salary": {
					"type": "float"
				}
			}
		}
	}
}

You should remove "codec => rubydebug" in the elasticsearch output.
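
With that change, the output section would look roughly like this (same settings as in your conf, just without the codec on the elasticsearch output):

output {
	elasticsearch {
		hosts => "localhost:9200"
		index => "lc-data"
		document_type => "customers2"
		document_id => "%{id}"
	}
	stdout {
		codec => rubydebug
	}
}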

I tried removing "codec" and I can see the data in the format I was expecting.

  1. Why am I still seeing IDs which are not returned as part of my SQL query? The SQL query only returns a few IDs, but when running Logstash all the IDs are being fetched.

  2. The data changes after some time. I am not able to understand why this is happening.

Sorry for your new issues, but I have no idea why you got these strange issues.

One more question: if I have multiple arrays of children under one document type, how do I set up the filter in that case?
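
One possible way to extend the same aggregate pattern to two child arrays, assuming a second aliased column such as 'coaddress.addr_name' coming from the SQL statement (an invented name, purely for illustration):

filter {
	aggregate {
		task_id => "%{id}"
		code => "
			map['colocation'] ||= { 'cust_name' => [] }
			map['coaddress'] ||= { 'addr_name' => [] }
			event.to_hash.each do |key, value|
				# copy scalar fields once; append each child value to its own array
				map[key] ||= value unless ['colocation.cust_name', 'coaddress.addr_name'].include?(key)
				map['colocation']['cust_name'] << value if key == 'colocation.cust_name'
				map['coaddress']['addr_name'] << value if key == 'coaddress.addr_name'
			end
			event.cancel()
		"
		push_previous_map_as_event => true
		timeout => 5
	}
}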