I am fetching data from SQL Server using Logstash. The JDBC Rivers plugin would convert columns with "." notation into nested array structures, and I am looking for a similar way to do this in Logstash.
statement => "SELECT c.ID,c.NAME as 'name', c.AGE as 'age',l.LocationName as 'locations.cust_name' from customers c join location l on c.id=l.customerid"
So we can have multiple location names for a customer ID.
This is the filter I am using:
filter {
  aggregate {
    task_id => "%{id}"
    code => "
      map['locations'] ||= { 'cust_name' => [] }
      event.to_hash.each do |key, value|
        map[key] ||= value unless key == 'cust_name'
        map['locations']['cust_name'] << value if key == 'cust_name'
      end
    "
    push_previous_map_as_event => true
    timeout => 5
  }
}
However, I am not able to form the JSON array of locations.
Can someone provide any help with this?
You need to understand how Logstash events work.
Every event goes through filter/output. The aggregate filter does not filter out events, so at output you need an if statement to discard the non-aggregated events; otherwise the last raw event is what ends up indexed.
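For example, a minimal sketch of that output conditional (assuming you tag the aggregated events with timeout_tags => ['aggregated'], as the fuller example further down does):

output {
  # keep only the events produced by the aggregate filter; drop the raw per-row events
  if "aggregated" in [tags] {
    stdout { codec => rubydebug }
  }
}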
You will need to create a table-valued function containing FOR JSON PATH in SQL Server 2016 and use CROSS APPLY on that function to get results in the nested structure you want. Once you have that ready, you will want to use a ruby filter in your Logstash config file (a sketch is shown further down) to let ES know that the data in your nested column is valid JSON.
-- Table-valued function returning a customer's names as a FOR JSON PATH string.
-- The function, table and column names here are placeholders; adjust them to your schema.
CREATE FUNCTION dbo.CustNamesAsJson (@id int)
RETURNS @retcust_name TABLE (cust_name_nested nvarchar(max))
AS
BEGIN
    DECLARE @jsonvariable nvarchar(max);

    SELECT @jsonvariable = (
        SELECT
            [cust_name]
        FROM [table]
        WHERE id = @id
        FOR JSON PATH
    );

    INSERT @retcust_name SELECT @jsonvariable;
    RETURN;
END
If id = xyz has 4 cust_name values, it will return:
"cust_name_nested": [
  { "cust_name": 1 },
  { ... },
  { ... },
  { ... }
]
You need to use CROSS APPLY with this function in your SQL statement to get all the documents in this nested shape.
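The ruby filter itself would be something along these lines (a sketch only; it assumes the FOR JSON PATH string comes back in a column aliased cust_name_nested, so adjust the field name to whatever your CROSS APPLY returns):

filter {
  ruby {
    code => "
      require 'json'
      # parse the JSON string produced by FOR JSON PATH into a real nested structure
      raw = event.get('cust_name_nested')
      event.set('cust_name_nested', JSON.parse(raw)) if raw.is_a?(String)
    "
  }
}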
This is what I would do. Let me know if it works for you.
Here is a working example using the data you provided. Just make sure to set the number of Logstash filter workers to 1 (the -w 1 flag), so that all rows for a given task_id are processed in order by a single worker.
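For example (assuming your pipeline file is named pipeline.conf; adjust the path and name to yours):

bin/logstash -f pipeline.conf -w 1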
jdbc {
  ...
  # assuming you want to put cust_name, address etc under [location]
  statement => "select id, name, age, cust_name, address from atable order by id, name, age"
  type => "atype"
  ....
}
filter {
  # use a @metadata field to prevent the type field from being saved
  mutate {
    add_field => { "[@metadata][type]" => "%{type}" }
    remove_field => [ "type" ]
  }
  if [@metadata][type] == "atype" {
    aggregate {
      task_id => "%{id}%{name}%{age}"
      code => "
        map['id'] ||= event.get('id')
        map['@metadata'] = event.get('@metadata')
        map['name'] ||= event.get('name')
        map['age'] ||= event.get('age')
        map['location'] ||= []
        map['location'] << { 'cust_name' => event.get('cust_name'),
                             'address' => event.get('address'),
                             ...
                           }
      "
      push_previous_map_as_event => true
      timeout => 5
      timeout_tags => ['aggregated']
    }
  }
}
output {
  # only ingest into ES when aggregated; timeout_tags puts the tag there
  if "aggregated" in [tags] {
    elasticsearch {
      index => "aindex"
      document_type => "%{[@metadata][type]}"
      document_id => "%{id}"
      hosts => ......
      action => "update"
      doc_as_upsert => true
    }
  }
}
The above script has the type metadata stuff; you can ignore it. It is there for the case where you have multiple JDBC inputs and need to aggregate according to the type (that is why the type is set in the JDBC input).
The key to the aggregation is the task_id, which defines what the aggregate groups by. Your example requires aggregating on 3 fields, so the task_id needs to use all 3 fields.
timeout_tags => ['aggregated'] is the other key piece: you need the if "aggregated" in [tags] { condition
in the output to ES to filter out everything other than the aggregated events.
So, your example output would be like:
[{
  "id": 1,
  "name": "Test",
  "age": 25,
  "location": [
    {
      "cust_name": "T1",
      "address": ...
      ...
    },
    {
      "cust_name": "T2",
      "address": ...
      ...
    },
    {
      "cust_name": "T3",
      "address": ...
      ...
    },
    {
      "cust_name": "T4",
      "address": ...
      ...
    }
  ]
},
{
  "id": 2,
  ......
},
....
]
Is this the result you expect?
Thanks for your help. I tried what you suggested. I can see the data in the format I am expecting in stdout. However, the same is not showing up in Elasticsearch. In Elasticsearch I am seeing IDs outside of my join condition. I am using an inner join, so only matching IDs should show up.
I have not set up Kibana yet. What I am not able to figure out is: if I am seeing the data correctly in stdout when running the Logstash conf file, why is it not showing up properly in Elasticsearch?
Also, why am I getting IDs which do not satisfy my join condition?
First, you should add an "order by id" clause to your SQL statement to be sure that you first get all result lines with id=1, then all result lines with id=2, etc.
Then, given your Elasticsearch mapping, your aggregate config, and the stdout result which is what you expect, there is no reason you would get something different in Elasticsearch.
Finally, I'm not sure I understand what you are trying to do in Elasticsearch and what's going wrong. For instance, inner join does not exist in Elasticsearch. Maybe you could show your Elasticsearch query and its results, and explain what's wrong with a concrete example?
I do have the order by id in my SQL statement.
My Elasticsearch type is a combination of 2 tables. For example, I am storing an array of cust_name values (which can be multiple for a given ID) in the same type.
My Logstash Conf File:
input {
  jdbc {
    jdbc_connection_string => "jdbc:sqlserver://dbname.database.windows.net;databaseName=TEST_DB;user=username;password=Password;"
    jdbc_user => "username"
    jdbc_password => "Password"
    jdbc_driver_library => "../../../../etc/logstash/driver/sqljdbc_6.0/enu/jre8/sqljdbc42.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    schedule => "* * * * *"
    statement => "SELECT c.ID as 'id',c.NAME as 'name', c.AGE as 'age',l.LocationName as 'colocation.cust_name' from customers c inner join location l on c.id=l.customerid order by c.id"
  }
}
filter {
  aggregate {
    task_id => "%{id}"
    code => "
      map['colocation'] ||= { 'cust_name' => [] }
      event.to_hash.each do |key, value|
        map[key] ||= value unless key == 'colocation.cust_name'
        map['colocation']['cust_name'] << value if key == 'colocation.cust_name'
      end
    "
    push_previous_map_as_event => true
    timeout => 5
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "lc-data"
    document_type => "customers2"
    document_id => "%{id}"
    codec => rubydebug
  }
  stdout {
    codec => rubydebug
  }
}
I tried removing the "codec" option and I can see the data in the format I was expecting.
Why am I still seeing IDs which are not returned by my SQL query? The SQL query only returns a few IDs, but when running Logstash all the IDs are being fetched.
The data also changes after some time. I am not able to understand why this is happening.