sachi_mr
(Sachin)
May 16, 2019, 10:53pm
1
I am trying to get tags to work with the filter and output sections, so far without luck. The tags are needed to import data from multiple tables conditionally. When the tag conditionals are applied to the aggregate and output plugins, no data is imported into the index. Below is a sample Logstash config file. Any help is appreciated.
input {
jdbc {
jdbc_connection_string => "jdbc:sqlserver://DEV.localhost.com:3181;databaseName=SQL;user=DBA;password=DBA"
jdbc_user => "DBA"
jdbc_password => "DBA"
jdbc_driver_library => "C:/Apps/mssqljdbc/sqljdbc_6.2/enu/mssql-jdbc-6.2.2.jre8.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
statement => "SELECT * from category"
use_column_value => true
tracking_column => catid
tracking_column_type => "numeric"
clean_run => true
record_last_run => true
tags => ["category"]
}
jdbc {
jdbc_connection_string => "jdbc:sqlserver://DEV.localhost.com:3181;databaseName=SQL;user=DBA;password=DBA"
jdbc_user => "DBA"
jdbc_password => "DBA"
jdbc_driver_library => "C:/Apps/mssqljdbc/sqljdbc_6.2/enu/mssql-jdbc-6.2.2.jre8.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
statement => "SELECT * from subcategory"
use_column_value => true
tracking_column => subcatid
tracking_column_type => "numeric"
clean_run => true
record_last_run => true
tags => ["subcategory"]
}
}
filter {
if "category" in [tags] {
aggregate {
task_id => "%{[catid]}"
code => "
map['catid'] = event.get('catid')
map['category'] ||= []
map['category'] << {'catid' => event.get('catid'), 'col2' => event.get('col2'), 'cat' => event.get('cat')}
map['category_subcategory'] ||= []
map['category_subcategory'] << {'name' => 'category'}
event.cancel()
"
push_previous_map_as_event => true
timeout => 3
}
} else if "subcategory" in [tags] {
aggregate {
task_id => "%{[subcatid]}"
code => "
map['catid'] = event.get('catid')
map['category'] ||= []
map['category'] << {'subcatid' => event.get('subcatid'), 'col3' => event.get('col3'), 'cat' => event.get('cat')}
map['category_subcategory'] ||= []
map['category_subcategory'] << {'name' => 'category', 'parent' => event.get('catid')}
event.cancel()
"
push_previous_map_as_event => true
timeout => 3
}
}
}
output {
if "category" in [tags] {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "category"
document_id => "category-%{[catid]}"
ssl_certificate_verification => false
workers => 1
}
}
else if "subcategory" in [tags] {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "category"
document_id => "subcategory-%{[subcatid]}"
ssl_certificate_verification => false
workers => 1
}
}
}
Badger
May 17, 2019, 12:39pm
2
If you remove the elasticsearch outputs and just use
output { stdout { codec => rubydebug } }
do the events have the category and subcategory tags? I am wondering if the problem is in your aggregate filters.
sachi_mr
(Sachin)
May 17, 2019, 2:18pm
3
Using stdout makes no difference; there are still no records in the output. The problem is definitely with the aggregate filters. The events do not have the category or subcategory tags. I have tried adding them with static values, still with no luck.
OK, so if you get rid of the filter section as well, what does an event look like in the rubydebug output?
sachi_mr
(Sachin)
May 17, 2019, 2:29pm
5
Getting rid of the filters results in the desired output. I just cannot figure out why the same does not happen with the filters in place.
Unless you tell us what data you are trying to manipulate, we cannot tell you why your manipulations are failing.
sachi_mr
(Sachin)
May 17, 2019, 3:05pm
7
The plan is to import data from multiple tables using the join data type. These tables contain parent-child relationships. Based on the documentation, I have decided to use the join data type and organize the data based on the table names.
Below is an example.
table **category** contains the columns
- catid
- catname
- col2
- col3
table **subcategory** contains the columns
- subcatid
- subcatname
- catid (foreign key to category.catid)
- col5
The aggregate plugin should organize the data as follows:
document
- category
  - catid
  - catname
  - col2
  - col3
- subcategory
  - subcatid
  - subcatname
  - catid
  - col5
- category_subcategory
  - name
  - parent (added only for subcategory records)
The tags are required to conditionally add category and subcategory fields so that searching is easier.
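For illustration, the target layout described above could be sketched in plain Ruby as one document per table row. The helper names `category_doc` and `subcategory_doc` are hypothetical, the sample values are made up, and the relation names `category` and `subcategory` are an assumption based on the table names; this is only a sketch of the shape, not a tested mapping.

```ruby
# Hypothetical helpers: build one document per row, following the layout above.
# Assumes each row is a Hash keyed by column name.
def category_doc(row)
  {
    "category" => row.slice("catid", "catname", "col2", "col3"),
    # Parent records only carry the relation name.
    "category_subcategory" => { "name" => "category" }
  }
end

def subcategory_doc(row)
  {
    "subcategory" => row.slice("subcatid", "subcatname", "catid", "col5"),
    # Child records also point at their parent through the foreign key.
    "category_subcategory" => { "name" => "subcategory", "parent" => row["catid"] }
  }
end

# Made-up sample rows, for illustration only.
p category_doc({ "catid" => 1, "catname" => "books", "col2" => "x", "col3" => "y" })
p subcategory_doc({ "subcatid" => 10, "subcatname" => "novels", "catid" => 1, "col5" => "z" })
```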
The reason I am asking to see your data is that your filters work just fine, which suggests that your data does not look the way you think it looks.
Of course I am inferring the data structure from the filter configuration, but this
input { generator { count => 1 lines => [ 'foo,1,a', 'foo,2,b', 'bar,1,a' ] } }
filter {
mutate { add_tag => [ "category"] }
csv { columns => [ "catid", "col2", "cat" ] }
if "category" in [tags] {
aggregate {
task_id => "%{[catid]}"
code => '
map["catid"] = event.get("catid")
map["category"] ||= []
map["category"] << {"catid" => event.get("catid"), "col2" => event.get("col2"), "cat" => event.get("cat")}
map["category_subcategory"] ||= []
map["category_subcategory"] << {"name" => "category"}
event.cancel()
'
push_previous_map_as_event => true
timeout => 3
}
}
}
gets me
{
"catid" => "foo",
"@timestamp" => 2019-05-17T17:21:15.264Z,
"@version" => "1",
"category_subcategory" => [
[0] {
"name" => "category"
},
[1] {
"name" => "category"
}
],
"category" => [
[0] {
"catid" => "foo",
"cat" => "a",
"col2" => "1"
},
[1] {
"catid" => "foo",
"cat" => "b",
"col2" => "2"
}
]
}
{
"catid" => "bar",
"@timestamp" => 2019-05-17T17:21:15.478Z,
"@version" => "1",
"category_subcategory" => [
[0] {
"name" => "category"
}
],
"category" => [
[0] {
"catid" => "bar",
"cat" => "a",
"col2" => "1"
}
],
"tags" => [
[0] "_aggregatefinalflush"
]
}
sachi_mr
(Sachin)
May 17, 2019, 8:07pm
9
I still see no output once the if condition is added to the output.
Code sample below:
input { generator { count => 1 lines => [ 'foo,1,a', 'foo,2,b', 'bar,1,a' ] } }
filter {
mutate { add_tag => [ "category"] }
csv { columns => [ "catid", "col2", "cat" ] }
if "category" in [tags] {
aggregate {
task_id => "%{[catid]}"
code => '
map["catid"] = event.get("catid")
map["category"] ||= []
map["category"] << {"catid" => event.get("catid"), "col2" => event.get("col2"), "cat" => event.get("cat")}
map["category_subcategory"] ||= []
map["category_subcategory"] << {"name" => "category"}
event.cancel()
'
push_previous_map_as_event => true
timeout => 3
}
}
}
output {
if "category" in [tags] {
stdout { codec => rubydebug }
}
}
Badger
May 17, 2019, 8:14pm
10
Doh! Just spotted the issue. Add this to the code in your aggregate filter
map["tags"] = event.get("tags")
Do you need to merge the tags arrays of every event that you aggregate?
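To see why the output conditional stops matching: with push_previous_map_as_event, the events that reach the output are built from the map, while the original tagged events are removed by event.cancel(). A field that is never copied into the map, tags included, is therefore missing from the emitted event. The sketch below imitates that behavior in plain Ruby; the `aggregate` method and the `passes_output?` helper are hypothetical stand-ins, not the plugin's implementation.

```ruby
# Simplified stand-in for the aggregate filter; NOT the plugin's real code.
# Each input event is a plain Hash. As in the configs above, every original
# event is cancelled, so only the events built from the maps survive.
def aggregate(events, copy_tags:)
  maps = {} # task_id => map; Ruby hashes keep insertion order
  events.each do |event|
    map = (maps[event["catid"]] ||= {})
    map["catid"] = event["catid"]
    map["tags"] = event["tags"] if copy_tags # the one-line fix
    (map["category"] ||= []) << { "col2" => event["col2"], "cat" => event["cat"] }
  end
  maps.values
end

# Mirrors the output conditional: if "category" in [tags]
def passes_output?(event)
  (event["tags"] || []).include?("category")
end

rows = [
  { "catid" => "foo", "col2" => "1", "cat" => "a", "tags" => ["category"] },
  { "catid" => "foo", "col2" => "2", "cat" => "b", "tags" => ["category"] },
  { "catid" => "bar", "col2" => "1", "cat" => "a", "tags" => ["category"] }
]

without_fix = aggregate(rows, copy_tags: false)
with_fix    = aggregate(rows, copy_tags: true)

puts without_fix.count { |e| passes_output?(e) } # prints 0
puts with_fix.count { |e| passes_output?(e) }    # prints 2
```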
sachi_mr
(Sachin)
May 17, 2019, 9:25pm
11
Thanks, Badger. Adding the tags to the map did the trick. I was trying to use the configuration below, which I expected to result in the same behavior.
aggregate {
add_tag => [ "category"]
task_id => "%{[catid]}"
}
I have also noticed that tags is now also a field in the document, which should be OK in this case.
Below is a working sample for anybody who has similar issues.
input { generator { count => 1 lines => [ 'foo,1,a', 'foo1,2,b', 'bar,1,a' ] } }
filter {
mutate { add_tag => [ "category"] }
csv { columns => [ "catid", "col2", "cat" ] }
if "category" in [tags] {
aggregate {
task_id => "%{[catid]}"
code => '
map["catid"] = event.get("catid")
map["tags"] = event.get("tags")
map["category"] ||= []
map["category"] << {"catid" => event.get("catid"), "col2" => event.get("col2"), "cat" => event.get("cat")}
map["category_subcategory"] ||= []
map["category_subcategory"] << {"name" => "category"}
event.cancel()
'
push_previous_map_as_event => true
timeout => 3
}
}
}
output {
if "category" in [tags] {
stdout { codec => rubydebug }
}
}
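Note that `map["tags"] = event.get("tags")` overwrites the map's tags on every event, so the aggregated event ends up with the tags of the last event seen for that task_id. If events sharing a task_id could carry different tags, a union would keep them all. A minimal sketch, assuming only the `event.get` call used in the sample above; `FakeEvent` and `merge_tags` are hypothetical names that exist only so the snippet runs outside Logstash.

```ruby
# Stand-in for the part of the Logstash event API used here (get only);
# hypothetical, so the snippet can run outside Logstash.
class FakeEvent
  def initialize(data)
    @data = data
  end

  def get(field)
    @data[field]
  end
end

# The body of this method could replace the map["tags"] assignment
# inside the aggregate filter's code option.
def merge_tags(map, event)
  # Array#| is a set union: it keeps order and drops duplicates.
  map["tags"] = (map["tags"] || []) | (event.get("tags") || [])
end

map = {}
merge_tags(map, FakeEvent.new({ "tags" => ["category"] }))
merge_tags(map, FakeEvent.new({ "tags" => ["category", "audited"] }))
merge_tags(map, FakeEvent.new({})) # an event with no tags at all
p map["tags"] # prints ["category", "audited"]
```

The `|| []` guards cover events that have no tags field, for which `event.get("tags")` returns nil.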
system
(system)
Closed
June 14, 2019, 9:25pm
12
This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.