Create index by aggregating existing index

I created an index whose documents look like this:
"hits" : [
{
"_index" : "family",
"_type" : "doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"fkm_family" : "1",
"pkm_article" : "1001"
}
},
{
"_index" : "family",
"_type" : "doc",
"_id" : "2",
"_score" : 1.0,
"_source" : {
"fkm_family" : "1",
"pkm_article" : "1002"
}
},
{
"_index" : "family",
"_type" : "doc",
"_id" : "3",
"_score" : 1.0,
"_source" : {
"fkm_family" : "1",
"pkm_article" : "1003"
}
},
{
"_index" : "family",
"_type" : "doc",
"_id" : "4",
"_score" : 1.0,
"_source" : {
"fkm_family" : "2",
"pkm_article" : "1004"
}
},
{
"_index" : "family",
"_type" : "doc",
"_id" : "5",
"_score" : 1.0,
"_source" : {
"fkm_family" : "2",
"pkm_article" : "1005"
}
},
{
"_index" : "family",
"_type" : "doc",
"_id" : "6",
"_score" : 1.0,
"_source" : {
"fkm_family" : "2",
"pkm_article" : "1006"
}
},
{
"_index" : "family",
"_type" : "doc",
"_id" : "7",
"_score" : 1.0,
"_source" : {
"fkm_family" : "2",
"pkm_article" : "1007"
}
}
]
I would like to create a new index which shows data like:
"_index" : "article_data",
"_type" : "_doc",
"_id" : "2",
"_score" : 1.0,
"_source" : {
"fkm_family" : "2",
"@timestamp" : "2019-11-07T21:28:37.721Z",
"articles" : [
{
"pkm_article" : "1006"
},
{
"pkm_article" : "1007"
},
{
"pkm_article" : "1005"
}
],
"@version" : "1",
"tags" : [
"_aggregatefinalflush"
]
}....
and so on...
Basically, a sort of group by (aggregate) on the "fkm_family" field.
I am using the following filter:
filter {
  aggregate {
    task_id => "%{fkm_family}"
    code => "
      map['fkm_family'] = event.get('fkm_family')
      map['articles'] ||= []
      map['articles'] << {
        'pkm_article' => event.get('pkm_article')
      }
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
  }
}
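With the sample data above, the aggregated event I expect for "fkm_family" : "1" would look roughly like this (the @timestamp, @version and tags fields are whatever Logstash adds on its own):

"fkm_family" : "1",
"articles" : [
  {
    "pkm_article" : "1001"
  },
  {
    "pkm_article" : "1002"
  },
  {
    "pkm_article" : "1003"
  }
]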
It should show 3 elements under "fkm_family" : "1" and 4 elements under "fkm_family" : "2".
But when I run it, it returns a different result every time.
Sometimes it returns 3 items under "fkm_family" : "2"; when I rerun it, it returns 1 element under "fkm_family" : "2".
What am I missing?

Are you using pipeline.workers 1?

I am new to this, so sorry for the stupid question.
How can I check this?

It is not a stupid question.

By default logstash will use about as many worker threads as there are CPUs. For an aggregate, that means different events get aggregated by different threads, so when you re-run a file it may get aggregated in a different way.

If you are on a multi-CPU system and did not take action to prevent it, then you are using multiple threads. You can either use '-w 1' or '--pipeline.workers 1' on the command line, or, if you are using pipelines.yml, then you can set pipeline.workers for a particular pipeline, or, I expect, you can set the default number of pipeline workers in logstash.yml.
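For example, a pipelines.yml entry roughly like this (the pipeline id and config path are just placeholders) pins one pipeline to a single worker:

- pipeline.id: my-aggregate-pipeline
  path.config: "/path/to/your-config.conf"
  pipeline.workers: 1

Alternatively, setting pipeline.workers: 1 in logstash.yml changes the default for every pipeline.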

Thanks.
This worked for this example. I am now testing with a larger, almost-real example.

I am using the same logic on another index, and for some reason it doesn't aggregate all of the documents.
Unfortunately the sample is too big to post here.
Are there any general tips I can use as a checklist?
Could it be because the original index has some issue, such as not all documents having exactly the same fields? Although they all do have the field used by the aggregation.

I searched a little bit more and found that the data in the output index is not all coming from the index defined as input; some of it is coming from another index with a similar name (documents22)!!
Below is the logstash configuration file: DocumentsAggr.conf

input {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    user => "abc"
    password => "efg"
    index => "documents2"
  }
}

filter {
  aggregate {
    task_id => "%{l_num2}"
    code => "
      map['documents'] ||= []
      map['documents'] << {
        'content' => event.get('content'),
        'h_id' => event.get('h_id')
      }
      map['l_num2'] = event.get('l_num2')
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    ssl_certificate_verification => "false"
    user => "abc"
    password => "efg"
    index => "documents4"
    document_id => "%{l_num2}"
    doc_as_upsert => "true"
    action => "update"
  }
}

Here is how I call it:
./logstash -f /logstash/bin/LogstashConfs/DocumentsAggr.conf -w 1
very scary!!!!
FYI, the input index was originally created by the Nutch crawler, if that is relevant.
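For reference, one way to confirm which index each event really comes from is to enable docinfo on the elasticsearch input and copy the source index name into the event. This is just a debugging sketch, not part of my config above, and the metadata field layout can differ between plugin versions:

input {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "documents2"
    docinfo => true   # stores _index, _type and _id under @metadata
  }
}

filter {
  mutate {
    # copy the originating index name into the event so it can be inspected
    add_field => { "source_index" => "%{[@metadata][_index]}" }
  }
}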
