Map MySQL columns into nested fields Elastic using Logstash

Hello,

I am struggling getting done some filtering to get in Elastic the information in the format I want. First, I have a MySQL table that I process in the input (no problem in that part). I want some of the columns to be outputed under a common nested field. E.g.:

author: Authorname
performer: Performername
(these are two columns in MySQL)

And I want this to be in Elasticsearch:

    "artists": [{
      "artist_name": "Authorname",
      "artist_role": "AUTHOR"
    },
    {
      "artist_name": "Performername",
      "artist_role": "PERFORMER"
    }]

Similarly, I have two columns with "original_title" and "alternative_title" which I want to be processed as for the case above, resulting into:

    "titles": [{
      "title_name": "The original title name",
      "title_type": "ORIGINAL"
    },
    {
      "title_name": "An alternative title name",
      "title_type": "ALTERNATIVE"
    }]

I have been playing a lot with aggregatable and mutate but I cannot get a solution. I sometimes partially get one of the two cases done, but the other one does not appear in Elastic, and the other fields that are not going to be nested are not processed at all. I guess I am not understanding how aggregatable work in terms of timeout and data processing. Take into account that I will have several rows, and want that format to be applied in each row translated to each corresponding Elasticsearch element.

I leave here my current filter status so that someone can bring light into it:

filter {
    aggregate {
        task_id => "%{id}" ### I do not know exactly what id to use here
        code => "
            map['artists'] ||= []
              map['artists'] << {'artist_name' => event.get('author'), 'ipi_role' => 'AUTHOR'}
              map['artists'] << {'artist_name' => event.get('performer'), 'ipi_role' => 'PERFORMER'}
              event.cancel()
        "
       push_previous_map_as_event => true
       timeout => 3
    }
    aggregate {
        task_id => "%{id}"  ### I do not know exactly what id to use here
        code => "
            map['titles'] ||= []
              map['titles'] << {'title_name' => event.get('original_title'), 'title_type' => 'ORIGINAL'}
              map['titles'] << {'title_name' => event.get('alternative_title'), 'title_type' => 'ALTERNATIVE'}
              event.cancel()
        "
       push_previous_map_as_event => true
       timeout => 3
    }    
    mutate {
      copy => { "id" => "[@metadata][_id]"}
      rename => {
        "producer" => "[producer_name][name]" 
      }
      remove_field => ["id", "@version", "unix_ts_in_secs"]
  }  
}

UPDATE: If I change the Ids of the two aggregate:

task_id => "%{author}"
...
task_id => "%{original_title}"

I get this in Elastic, which is ok for the part that is being processed, but it is missing the other nested fields, the rest of the regular fields, and the id is also messed up.

{
  "_index": "test_logstash",
  "_type": "_doc",
  "_id": "%{[@metadata][_id]}",
  "_version": 1,
  "_score": 0,
  "_source": {
    "@timestamp": "2022-05-16T12:58:00.090646200Z",
    "artists": [
      {
        "artist_name": "Authorname",
        "artist_role": "AUTHOR"
      },
      {
        "artist_name": "Performername",
        "artist_role": "PERFORMER"
      }
    ]
  }
}

In summary, the key point is that I do not want to group rows with same, e.g., title, but just row by row re-arrange some columns into this nested-like structure.

UPDATE 2:

I have slightly moved forward, being able to get both nested fields processed by using this code:

    aggregate {
        task_id => "%{id}"
        code => "
            map['artists'] ||= []
              map['artists'] << {'artist_name' => event.get('author'), 'artist_role' => 'AUTHOR'}
              map['artists'] << {'ipi_name' => event.get('performer'), 'artist_role' => 'PERFORMER'}  
            map['titles'] ||= []
              map['titles'] << {'title_name' => event.get('original_title'), 'title_type' => 'ORIGINAL'}
              map['titles'] << {'title_name' => event.get('alternative_title'), 'title_type' => 'ALTERNATIVE'}
              event.cancel()
        "
       push_previous_map_as_event => true
       timeout => 3
    }

    mutate {
      copy => { "id" => "[@metadata][_id]"}
      rename => {
        "series_name" => "[series_name][name]"        
      }
      remove_field => ["id", "@version", "unix_ts_in_secs"]
    }

However, still the rest of the fields (columns) are not processed, displaying a situation similar to the elastic output displayed above.

Thank you very much in advance.

Best regards,

Alejandro.-

Hi,

What are the rows you need to create one Elasticsearch document?
Do you need multiple rows for single Elasticsearch document? Or you will create single Elasticsearch document from single row.

Please share the rows (or single row) as csv or something.

Hi Tom,

Thanks for the reply.

I need a single Elasticsearch document from a single row

The information I have in my MySQL table is as follows:

And I need, from such example, two Elasticsearch documents that look like:

{
  "_index": "test_logstash",
  "_type": "_doc",
  "_id": "1",
  "_version": 1,
  "_score": 0,
  "_source": {
    "@timestamp": "2022-05-16T12:58:00.090646200Z",
    "artists": [
      {
        "artist_name": "Authorname",
        "artist_role": "AUTHOR"
      },
      {
        "ipi_name": "Performername",
        "artist_role": "PERFORMER"
      }
    ],
    "titles": [
      {
        "original_title": "The original title name",
        "title_type": "ORIGINAL"
      },
      {
        "ipi_name": "An alternative title name",
        "ipi_role": "ALTERNATIVE"
      }
    ],
    "ext_code" : "1234",
    "year" : "1997",
    "series_name" : "Foo",
    "duration" : "142"
  }
}

For the first one.

And:

{
  "_index": "test_logstash",
  "_type": "_doc",
  "_id": "2",
  "_version": 1,
  "_score": 0,
  "_source": {
    "@timestamp": "2022-05-16T12:58:00.090646200Z",
    "artists": [
      {
        "artist_name": "Second Author",
        "artist_role": "AUTHOR"
      },
      {
        "ipi_name": "Second performer",
        "artist_role": "PERFORMER"
      }
    ],
    "titles": [
      {
        "original_title": "Second original title",
        "title_type": "ORIGINAL"
      },
      {
        "ipi_name": "Its second alternative title",
        "ipi_role": "ALTERNATIVE"
      }
    ],
    "ext_code" : "2921",
    "year" : "2009",
    "series_name" : "Bar",
    "duration" : "842"
  }
}

For the second one.


UPDATE:

Another alternative I am trying is the following, but it only creates the first row, and not the second one:

filter {
    mutate {
      copy => { "id" => "[@metadata][_id]"}
    }
    aggregate {
        task_id => "%{id}"
        code => "
            map['id'] = event.get('id')
            map['ext_code'] = event.get('ext_code')
            map['year'] = event.get('year')
            map['duration'] = event.get('duration')
            map['series_name'] ||= []
            map['series_name'] << {'name' => event.get('series_name')}
            map['artists'] ||= []
              map['artists'] << {'artist_name' => event.get('author'), 'artist_role' => 'AUTHOR'}
              map['artists'] << {'artist_name' => event.get('performer'), 'artist_role' => 'PERFORMER'}
            map['titles'] ||= []
              map['titles'] << {'title_name' => event.get('original_title'), 'title_type' => 'ORIGINAL'}
              map['titles'] << {'title_name' => event.get('alternative_title'), 'title_type' => 'ALTERNATIVE'}
              event.cancel()
        "
       push_previous_map_as_event => true
    }
}

Which creates only this one document in Elastic:

{
  "_index": "test_logstash",
  "_type": "_doc",
  "_id": "1",
  "_version": 1,
  "_score": 0,
  "_source": {
     "@version": "1",
    "ext_code": 1234,
    "artists": [
      {
        "artist_role": "AUTHOR",
        "artist_name": "Authorname"
      },
      {
        "artist_role": "PERFORMER",
        "artist_name": "Performername"
      }
    ],
    "year": 1997,
    "id": 1,
    "titles": [
      {
        "title_name": "The original title name",
        "title_type": "ORIGINAL"
      },
      {
        "title_name": "An alternative title name",
        "title_type": "ALTERNATIVE"
      }
    ],
    "@timestamp": "2022-05-17T10:27:09.797732800Z",
    "series_name": [
      {
        "name": "Foo"
      }
    ],
    "duration": 142
  }
}

Best regards,

Alejandro.-

In such case, you don't need aggregate filter plugin. JDBC input plugin creates one event per one row. Aggregate filter plugin works for aggregating multiple events to single event.

Use simple ruby filter to arrage the event. Something like:

event.set("artists", [{"artist_name"=>event.get("author"), "artist_role"=>"AUTHOR"
},{"artist_name"=>event.get("performer"),"artist_role"=>"PERFORMER"}])

Thank you so much. This is exactly what I needed!

Best,

Alejandro.-

1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.