Adding new values to a nested field - Logstash to Elasticsearch

Hello Elasticusers,

I have a small issue: one of my fields in Elasticsearch is a nested field, and I want to append new values to it (as an array) using Logstash. However, if the field is empty, Logstash doesn't create an array, and each new value replaces the previous one.

For example, if my "actor" field (id, first_name, last_name) already contains an array, my Logstash script (which you can find below) adds the new values as expected:

"actor": [
   {
      "id": "1",
      "first_name": "BLOOM",
      "last_name": "ORLANDO"
   },
   {
      "id": 1,
      "first_name": "PENÉLOPE",
      "last_name": "CRUZ"
   }
]

However, if my "actor" field is empty, Logstash doesn't create an array and adds the new value like this:

"actor": {
   "id": "1",
   "first_name": "BLOOM",
   "last_name": "ORLANDO"
}

In this last case, you can see that my "actor" field is not an array, so the next value will replace the previous one. Do you have any idea how I can fix this? I would like to force Logstash to add the first value as an array when the field is empty.
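To make the behaviour I am after explicit, here is a minimal sketch in plain Python (not Logstash or Groovy). The function name append_actor is hypothetical and only for illustration; it assumes each actor is a dict with an "id" key and is not meant as a working fix for my configuration.

```python
def append_actor(source, new_actor):
    """Merge new_actor into source["actor"], always keeping a list.

    Hypothetical helper: it models the desired result only and is not
    part of Logstash or Elasticsearch.
    """
    actors = source.get("actor")
    if not actors:
        # Field missing or empty: start a fresh list.
        actors = []
    elif isinstance(actors, dict):
        # Field holds a single object: wrap it so it becomes a list.
        actors = [actors]
    # Append only if no existing actor has the same id.
    if all(a["id"] != new_actor["id"] for a in actors):
        actors.append(new_actor)
    source["actor"] = actors
    return source
```

With this logic, the first actor added to an empty document is stored as a one-element array, and later actors are appended instead of overwriting it.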

For completeness, here are my configurations:

My nested field is "actor", and this is my Elasticsearch mapping for this field:

"actor": {
  "type": "nested",
  "properties": {
    "id": {
      "type": "long"
    },
    "first_name": {
      "index": "not_analyzed",
      "type": "string"
    },
    "last_name": {
      "index": "not_analyzed",
      "type": "string"
    }
  }
}

This is my Logstash configuration file:

input {
	jdbc {
		jdbc_driver_library => "path_to_my_connector"
		jdbc_driver_class => "com.mysql.jdbc.Driver"
		jdbc_connection_string => "connection_to_my_db"
		jdbc_user => "user"
		jdbc_password => "password"
		statement => "
		  SELECT
			film.film_id as id,
			film.title,
			film.description,
			film.release_year,
			film.length,
			film.special_features,
			actor.actor_id as actor_id,
			actor.first_name as actor_first_name,
			actor.last_name as actor_last_name
		  FROM
			actor,
			film,
			film_actor
		  WHERE
			film_actor.film_id = film.film_id
		  AND
			actor.actor_id = film_actor.actor_id
		  AND
			film.film_id = 1;"
	}
}
filter {
	mutate {
		remove_field => ["host","@timestamp","@version","path","message"]
		rename =>{
			"actor_id" => "[actor][id]"
			"actor_first_name" => "[actor][first_name]"
			"actor_last_name" => "[actor][last_name]"
		}
	}
}
output { 
	elasticsearch {
		hosts => ["elastic_host"]
		index => "films"
		document_type => "details"
		document_id => "%{id}"
		action => "update"
		doc_as_upsert => true
		script => "
		    if (ctx._source.actor == null || ctx._source.actor.empty) {
			   ctx._source += [event.actor]
		    }
		    else if (! ctx._source.actor.id.contains(event.actor.id)) {
			   ctx._source.actor += event.actor
		    }"
	}
}

Thank you in advance for your help.