Logstash Pipeline for Nested data from sql

Hi,
I am trying to insert nested data from sqldb to Elasticsearch index.

I create a query with joins and insert data into Elasticsearch index.
I used aggregate, but it is not giving correct result.

Mapping

PUT testing1111
{
    "mappings" : {
      "properties" : {
        "my_task_id" : {
          "type" : "long"
        },
       "my_details" : {
          "type" : "text"
        },
		"posts" : {
          "type" : "nested",
		  "properties": {
          "s_id": {
            "type": "long"
          },
		  "status": {
            "type": "text"
          }
        }}
      }
   }
}

aggregate Filter

filter {
    aggregate {
        task_id => "%{my_task_id}"
        code => "
            map['my_task_id'] = event.get(my_task_id)
			map['my_details'] = event.get(my_details)
			map['posts'] ||= []
            map['posts'] << {

                'status' => event.get('status'),
                's_id' => event.get(s_id)
            }
        event.cancel()"
        push_previous_map_as_event => true
        timeout => 80
    }
}

Data is

elasticsearch_nested_error

Target to insert data into this format into index

{ "my_task_id": 2,
  "my_details" : "solve problem two",
  "posts" : [
  {"status" : 1,
  "s_id" : 3
  },
  {"status" : 1,
  "s_id" : 7
  },
  {"status" : 2,
  "s_id" : 99
  },
  {"status" : 2,
  "s_id" : 43
  }
  ]
}


Data coming form sql. I make a jdbc connection, my_task_id and my_details field coming from table A and status, s_id coming from table B

I also used mutate filter but did not achieve targeted result.

Thanks

What result does it produce, and what do you not like about it?

Hi,
I find the error.
I miss single quotes on my_task_id ,my_details and s_id.

Correct filter is Here

filter {
    aggregate {
        task_id => "%{my_task_id}"
        code => "
            map['my_task_id'] = event.get('my_task_id')
			map['my_details'] = event.get('my_details')
			map['posts'] ||= []
            map['posts'] << {

                'status' => event.get('status'),
                's_id' => event.get('s_id')
            }
        event.cancel()"
        push_previous_map_as_event => true
        timeout => 80
    }
}

Now it's Working.

Human Error :slight_smile:

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.