skjoos
January 27, 2020, 3:51pm
1
Hello!
I'm trying to aggregate some data from a MS SQL database by using Logstash.
Below is a view of the database.
The jbdc field and my aggregation filter look like this:
jdbc {
....
statement =>
"
select
d.Id,
u.FirstName
from Document d
full join Approvers a
on d.Id = a.DocumentId
full join [User] u
on a.UserId = u.Id
"
}
filter {
aggregate {
task_id => "%{d.Id}"
code => "
map['document_id'] || = event.get('d.Id')
map['approvers'] ||= []
map['approvers'] << {'first_name' => event.get('u.FirstName')}
event.cancel()
"
push_previous_map_as_event => true
timeout => 3
}
}
If I execute the statment inside the jbdc field above in the database i would get this data:
| Id | FirstName |
| 1 | Nils |
| 2 | Rudolf |
| 2 | Olle |
| 3 | NULL |
And the way I would like to aggregate the data look something like this:
[
{
"document_id": "1",
"approvers": [
{ "first_name": "Nils"}
]
},
{
"document_id": "2",
"approvers": [
{ "first_name": "Rudolf"},
{ "first_name": "Olle"}
]
},
{
"document_id": "3",
"approvers": []
},
]
Any clue on how to fix it? Am I missing something?
Thank you!
Badger
January 27, 2020, 4:23pm
2
skjoos:
"%{ d.Id}"
Why do you have that space inside the {} ?
skjoos
January 28, 2020, 9:11am
3
@Badger Just a typo for this example, fixed now.
Still got the same error.
ITIC
(Jordi)
January 28, 2020, 10:15am
4
Hi
What do you get if you run your config? Can you show us the actual output you are getting from stdout{}
?
Are you getting any error messsages in your logs? Or is it just that the output you get is not what you need?
Have you made sure your pipeline has only one worker? This is a requirement for the aggregate{}
filter.
Hope this helps.
skjoos
January 28, 2020, 1:33pm
5
Hi
I get no errors when running the config and the config runs with -w 1 .
It feels like the filter isn't doing anything at all. Does Logstash recognize fields with a dot like Document.Id / d.Id?
One thing I'm really unsure about is the task_id , what id should I use when using joins?
The result is below:
1 hit
{
"_index": "randomindex",
"_type": "_doc",
"_id": "OFpE7G8BDQZMTmIc_gTh",
"_score": 1,
"_source": {
"@version": "1",
"first_name": "Nils",
"document_id": 1,
"@timestamp": "2020-01-28T13:09:02.005Z"
},
"fields": {
"@timestamp": [
"2020-01-28T13:09:02.005Z"
]
}
}
2 hit
{
"_index": "randomindex",
"_type": "_doc",
"_id": "OVpE7G8BDQZMTmIc_gTh",
"_score": 1,
"_source": {
"@version": "1",
"first_name": "Olle",
"document_id": 2,
"@timestamp": "2020-01-28T13:09:01.994Z"
},
"fields": {
"@timestamp": [
"2020-01-28T13:09:01.994Z"
]
}
}
3 hit
{
"_index": "randomindex",
"_type": "_doc",
"_id": "N1pE7G8BDQZMTmIc_gTh",
"_score": 1,
"_source": {
"@version": "1",
"first_name": null,
"document_id": 3,
"@timestamp": "2020-01-28T13:09:02.006Z"
},
"fields": {
"@timestamp": [
"2020-01-28T13:09:02.006Z"
]
}
}
4 hit
{
"_index": "randomindex",
"_type": "_doc",
"_id": "OlpE7G8BDQZMTmIc_wRi",
"_version": 1,
"_score": 0,
"_source": {
"@version": "1",
"first_name": "Rudolf",
"document_id": 2,
"@timestamp": "2020-01-28T13:09:02.005Z"
},
"fields": {
"@timestamp": [
"2020-01-28T13:09:02.005Z"
]
}
}
ITIC
(Jordi)
January 28, 2020, 2:10pm
6
Hi
I think you should use document_id
as task_id
in your aggregate{}
filter (task_id => "%{document_id}").
Try commenting out the code
section for now, or leaving it empty (code => ""
).
Your timeout
is only 3 seconds. Try increasing it.
Since you do not have beginning or end events, you should probably use timeout_task_id_field => "task_id"
and set a value to inactivity_timeout
.
Hope this helps
Badger
January 28, 2020, 2:59pm
7
In addition to '-w 1' you will need to disable pipeline.java_execution .
system
(system)
Closed
February 25, 2020, 2:59pm
8
This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.