Merging data column-wise from two different indexes

Hi,

I am trying to merge two indexes column-wise based on a common key present in both indices. I have tried Logstash Ruby code and transforms, but nothing seems to work: I get individual records whose columns come from either index1 or index2, but never combined. Please help me understand how I can achieve this.

Index1:
{
"_index": "order_details",
"_id": "xxxxxx",
"_version": 1,
"_score": 0,
"_source": {
"amount": 1200,
"shop": "shop_12345",
"@timestamp": "2024-07-17T15:55:46.258578035Z",
"key": "12345678",
"@version": "1",
"source": "AMAZON"
}
},
{
"_index": "order_details",
"_id": "xxxxxx",
"_version": 1,
"_score": 0,
"_source": {
"amount": 500,
"shop": "shop_12345",
"@timestamp": "2024-07-17T15:55:46.258578035Z",
"key": "12345679",
"@version": "1",
"source": "AMAZON"
}
},
{
"_index": "order_details",
"_id": "xxxxxx",
"_version": 1,
"_score": 0,
"_source": {
"amount": 120,
"shop": "shop_12345",
"@timestamp": "2024-07-17T15:55:46.258578035Z",
"key": "12345680",
"@version": "1",
"source": "AMAZON"
}
}

Index2:
{
"_index": "overall_shop",
"_id": "XXXXXXXX",
"_version": 1,
"_score": 0,
"_source": {
"source": "FLIPKART",
"Order Net Value": "2326",
"shop": "shop_12345",
}
}

Since I have the same field names (such as shop and source) in both indexes, they should be renamed to shop1 and source1 when coming from index1, and shop2 and source2 when coming from index2.
Merged index:
{
"_index": "merged_details",
"_id": "xxxxxx",
"_version": 1,
"_score": 0,
"_source": {
"amount": 1200,
"shop1": "shop_12345",
"@timestamp": "2024-07-17T15:55:46.258578035Z",
"key": "12345678",
"@version": "1",
"source1": "AMAZON",
"source2": "FLIPKART",
"Order Net Value": "2326",
"shop2": "shop_12345"
}
},
{
"_index": "merged_details",
"_id": "xxxxxx",
"_version": 1,
"_score": 0,
"_source": {
"amount": 500,
"shop1": "shop_12345",
"@timestamp": "2024-07-17T15:55:46.258578035Z",
"key": "12345679",
"@version": "1",
"source1": "AMAZON",
"source2": "FLIPKART",
"Order Net Value": "2326",
"shop2": "shop_12345"
}
},
{
"_index": "merged_details",
"_id": "xxxxxx",
"_version": 1,
"_score": 0,
"_source": {
"amount": 120,
"shop1": "shop_12345",
"@timestamp": "2024-07-17T15:55:46.258578035Z",
"key": "12345680",
"@version": "1",
"source1": "AMAZON",
"source2": "FLIPKART",
"Order Net Value": "2326",
"shop2": "shop_12345"
}
}

In case it helps, I wrote some ideas in this discussion: Need Help with Merging Data from Two Elasticsearch Indices - #3 by prashant1

Thank you @dadoonet, I will try the elasticsearch filter plugin as you suggested. So far I have tried Logstash with the aggregate filter in many ways, but I end up getting individual records from each index separately instead of a single record with merged fields from both indexes.

input {
  elasticsearch {
    cloud_id => "xxxxxxxxxxxxxxxxxxxx"
    proxy => "xxxxxxxxxxxxxxx"
    index => "index1"
    query => '{"query": {"match_phrase": {"shop": "12345"}},"_source": ["Price","shop", "source"]}'
    ssl => true
    user => "xxxxxxxxxx"
    password => "xxxxxxxxxxxxx"
  }
  elasticsearch {
    cloud_id => "xxxxxxxxxxxxxxx"
    proxy => "xxxxxxxxxxxxxxxx"
    index => "index2"
    query => '{"query": {"term": {"shop": "12345"}},"_source": ["amount","shop"]}'
    ssl => true
    user => "xxxxxxxxxxxxx"
    password => "xxxxxxxxxxxxxxxxxx"
  }
}
 
 
filter {
  mutate {
    add_field => {
      "common_key" => "%{shop}"
      "Secondary" => "%{source}"
      "price" => "%{price}"
      "amount" => "%{amount}"
    }
  }
 
  # Aggregate data using the common key
  aggregate {
    task_id => "%{common_key}"
    code => "
      map['shop'] ||= event.get('common_key');
      map['source'] ||= event.get('source');
      map['amount'] ||= event.get('amount');
      map['price'] ||= event.get('price');
    "
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "common_key"
    timeout => 60  # Adjust timeout as per your requirement
  }
}
 
output {
   stdout {
        codec => rubydebug
   }
    elasticsearch {
        cloud_id => "xxxxxxxxxxxx"
        proxy => "xxxxxxxxxxxx"
        index => "index3"
        ssl => true
        user => "xxxxxxxxxxxx"
        password => "xxxxxxxxxxx"
        action => "create"
    }
}

Output I am getting:

Doc1:
price    shop     source
-----    -----    ------
50       12345    amazon

Doc2:
shop     amount
-----    ------
12345    400

Yeah. That won't work.
Use an input plus a filter instead of 2 inputs.
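
A single elasticsearch input plus an elasticsearch filter lookup could look roughly like this (an untested sketch; the cloud id, hosts, credentials, and exact field names are placeholders you would need to adjust):

```
input {
  elasticsearch {
    cloud_id => "xxxxxxxxxxxx"
    index => "index1"
    query => '{"query": {"match_phrase": {"shop": "12345"}}}'
    ssl => true
    user => "xxxxxxxxxxxx"
    password => "xxxxxxxxxxxx"
  }
}

filter {
  # Rename the index1 copies first so the lookup fields don't collide
  mutate {
    rename => { "shop" => "shop1" "source" => "source1" }
  }

  # Look up the matching index2 document using the shared shop value
  elasticsearch {
    hosts => ["https://xxxxxxxxxxxx:9243"]
    index => "index2"
    query => "shop:%{shop1}"
    # Copy fields from the found document into the current event
    fields => {
      "shop" => "shop2"
      "source" => "source2"
      "Order Net Value" => "[Order Net Value]"
    }
    user => "xxxxxxxxxxxx"
    password => "xxxxxxxxxxxx"
  }
}
```

Each event read from index1 then carries both its own fields (as shop1/source1) and the looked-up index2 fields (as shop2/source2), which is what the merged_details documents need.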

Hi @dadoonet, thank you so much. The solution worked by using an elasticsearch input together with an elasticsearch filter for the lookup in the Logstash filter section.
I now need to calculate the sum of the price for each shop and add it to index3 as a field. Could you please help me achieve this in the same Logstash pipeline?

source1    price    total_price    shop_id1    source2     shop_id2    amount
AMAZON     234      337            12345       FLIPKART    12345       400
AMAZON     44       337            12345       FLIPKART    12345       400
AMAZON     6        337            12345       FLIPKART    12345       400
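
One idea I am considering is another aggregate filter keyed on the shop id, summing price into total_price (an untested sketch; field names are taken from the table above):

```
filter {
  aggregate {
    task_id => "%{shop_id1}"
    code => "
      # Accumulate the per-shop sum across events
      map['total_price'] ||= 0
      map['total_price'] += event.get('price').to_f
      event.set('total_price', map['total_price'])
    "
  }
}
```

Note this sets a running total on each event as it passes through; stamping the final total on every document would need something like push_map_as_event_on_timeout and a second pass, which is part of what I am asking about.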

As this one is solved, could you open a new discussion in Logstash? I think it's the best place for it.

Thank you @dadoonet, I have created a new topic for this: Calculating the Sum of a field by grouping