Import data into a nested document using Logstash

Hello,

I have created an index with the following mapping:

PUT /article_data
{
  "mappings" : {
    "properties" : {
      "articles" : {
        "type": "nested",
        "properties" : {
          "pkm_article" : {
            "type" : "long"
          }
        }
      },
      "fkm_family" : {
        "type" : "long"
      },
      "s_family" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      }
    }
  }
}

I would like to import data into this index via Logstash from a MySQL database.

This is the data:

| fkm_family | s_family | pkm_article |
| ---------- | -------- | ----------- |
| 1          | Jelly    | 1000        |
| 1          | Jelly    | 1001        |
| 1          | Jelly    | 1002        |
| 1          | Jelly    | 1003        |
| 1          | Jelly    | 1004        |
| 2          | Nexus    | 1005        |
| 2          | Nexus    | 1006        |
| 2          | Nexus    | 1007        |
| 2          | Nexus    | 1008        |
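
The goal is to roll these rows up into one document per family, with the articles nested. Based on the mapping above, I expect the indexed documents to look roughly like this (illustrative, not actual output from my setup):

{
  "fkm_family": 1,
  "s_family": "Jelly",
  "articles": [
    { "pkm_article": 1000 },
    { "pkm_article": 1001 },
    { "pkm_article": 1002 },
    { "pkm_article": 1003 },
    { "pkm_article": 1004 }
  ]
}

{
  "fkm_family": 2,
  "s_family": "Nexus",
  "articles": [
    { "pkm_article": 1005 },
    { "pkm_article": 1006 },
    { "pkm_article": 1007 },
    { "pkm_article": 1008 }
  ]
}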

And this is my Logstash config file:

input {
  jdbc {
    jdbc_driver_library => "/opt/logstash/mysql-connector-java-5.1.44-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://db:3306/article_data"
    jdbc_user => "root"
    jdbc_password => "password"
    statement => "SELECT fkm_family, s_family, pkm_article FROM `article_data`"
    type => "article_data"
  }
}
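
Each result row from the jdbc input becomes its own Logstash event, with the selected columns as top-level fields, which is why the aggregate filter below is needed. A single row event would look roughly like this (a sketch; Logstash also adds @timestamp and @version):

{
  "fkm_family": 1,
  "s_family": "Jelly",
  "pkm_article": 1000,
  "type": "article_data"
}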

filter {
  aggregate {
    # group rows by family: one aggregated map per fkm_family
    task_id => "%{fkm_family}"
    # collect the family fields once and append each article;
    # event.cancel() drops the per-row event so only the aggregated map gets indexed
    code => "
      map['fkm_family'] = event.get('fkm_family')
      map['s_family'] = event.get('s_family')
      map['articles'] ||= []
      map['articles'] << {'pkm_article' => event.get('pkm_article')}
      event.cancel()
    "
    # push the accumulated map as a new event when a new fkm_family arrives
    # (or after 3 seconds of inactivity)
    push_previous_map_as_event => true
    timeout => 3
  }
}

output {
  if [type] == "article_data" {
    elasticsearch {
      index => "article_data"
      hosts => "elasticsearch:9200"
      document_id => "%{fkm_family}"
    }
  }
}
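
For reference, once the import works I would expect to be able to query the nested articles with something like this (a sketch of a standard nested query, not something I have been able to run yet):

GET /article_data/_search
{
  "query": {
    "nested": {
      "path": "articles",
      "query": {
        "term": { "articles.pkm_article": 1000 }
      }
    }
  }
}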

The problem is that no data is imported.
The index stays empty, and the Elasticsearch log shows the following errors:

[2019-07-30T20:43:15,096][DEBUG][o.e.a.b.TransportShardBulkAction] [elk] [article_data][0] failed to execute bulk item (index) index {[article_data][article_data][18336], source[n/a, actual length: [2.2kb], max length: 2kb]}
java.lang.IllegalArgumentException: object mapping [articles] can't be changed from nested to non-nested
	at org.elasticsearch.index.mapper.ObjectMapper.doMerge(ObjectMapper.java:450) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.index.mapper.ObjectMapper.merge(ObjectMapper.java:443) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.index.mapper.ObjectMapper.merge(ObjectMapper.java:47) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.index.mapper.ObjectMapper.doMerge(ObjectMapper.java:472) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.index.mapper.RootObjectMapper.doMerge(RootObjectMapper.java:276) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.index.mapper.ObjectMapper.merge(ObjectMapper.java:443) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.index.mapper.RootObjectMapper.merge(RootObjectMapper.java:271) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.index.mapper.Mapping.merge(Mapping.java:91) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.index.mapper.DocumentMapper.merge(DocumentMapper.java:321) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.applyRequest(MetaDataMappingService.java:275) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.execute(MetaDataMappingService.java:238) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.cluster.service.MasterService.executeTasks(MasterService.java:687) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.cluster.service.MasterService.calculateTaskOutputs(MasterService.java:310) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.cluster.service.MasterService.runTasks(MasterService.java:210) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.cluster.service.MasterService$Batcher.run(MasterService.java:142) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.cluster.service.TaskBatcher.runIfNotProcessed(TaskBatcher.java:150) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.cluster.service.TaskBatcher$BatchedTask.run(TaskBatcher.java:188) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:681) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:252) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:215) ~[elasticsearch-7.0.1.jar:7.0.1]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_191]

Can someone tell me what I'm doing wrong?
