Hello,
I have created an index with the following mappings.
PUT /article_data
{
"mappings" : {
"properties" : {
"articles" : {
"type": "nested",
"properties" : {
"pkm_article" : {
"type" : "long"
}
}
},
"fkm_family" : {
"type" : "long"
},
"s_family" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
}
}
I would like to import data into this index via Logstash from a mySQL db.
This is the Data:
| fkm_family | s_family | pkm_article |
| ---------- | -------- | ----------- |
| 1 | Jelly | 1000 |
| 1 | Jelly | 1001 |
| 1 | Jelly | 1002 |
| 1 | Jelly | 1003 |
| 1 | Jelly | 1004 |
| 2 | Nexus | 1005 |
| 2 | Nexus | 1006 |
| 2 | Nexus | 1007 |
| 2 | Nexus | 1008 |
And this my Logstash config file
input {
jdbc {
jdbc_driver_library => "/opt/logstash/mysql-connector-java-5.1.44-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://db:3306/article_data"
jdbc_user => "root"
jdbc_password => "password"
statement => "SELECT fkm_family, s_family, pkm_article FROM `article_data`"
type=>"article_data"
}
}
filter {
aggregate {
task_id => "%{fkm_family}"
code => "
map['fkm_family'] = event.get('fkm_family')
map['s_family'] = event.get('s_family')
map['articles'] ||= []
map['articles'] << {'pkm_article' => event.get('pkm_article')}
event.cancel()
"
push_previous_map_as_event => true
timeout => 3
}
}
output {
if [type] == "article_data" {
elasticsearch {
index => "article_data"
hosts => "elasticsearch:9200"
document_id => "%{fkm_family}"
}
}
}
The problem is that no data is imported.
The index is empty and in the logfile of Elasticsearch the following errors are output:
[2019-07-30T20:43:15,096][DEBUG][o.e.a.b.TransportShardBulkAction] [elk] [article_data][0] failed to execute bulk item (index) index {[article_data][article_data][18336], source[n/a, actual length: [2.2kb], max length: 2kb]}
java.lang.IllegalArgumentException: object mapping [articles] can't be changed from nested to non-nested
at org.elasticsearch.index.mapper.ObjectMapper.doMerge(ObjectMapper.java:450) ~[elasticsearch-7.0.1.jar:7.0.1]
at org.elasticsearch.index.mapper.ObjectMapper.merge(ObjectMapper.java:443) ~[elasticsearch-7.0.1.jar:7.0.1]
at org.elasticsearch.index.mapper.ObjectMapper.merge(ObjectMapper.java:47) ~[elasticsearch-7.0.1.jar:7.0.1]
at org.elasticsearch.index.mapper.ObjectMapper.doMerge(ObjectMapper.java:472) ~[elasticsearch-7.0.1.jar:7.0.1]
at org.elasticsearch.index.mapper.RootObjectMapper.doMerge(RootObjectMapper.java:276) ~[elasticsearch-7.0.1.jar:7.0.1]
at org.elasticsearch.index.mapper.ObjectMapper.merge(ObjectMapper.java:443) ~[elasticsearch-7.0.1.jar:7.0.1]
at org.elasticsearch.index.mapper.RootObjectMapper.merge(RootObjectMapper.java:271) ~[elasticsearch-7.0.1.jar:7.0.1]
at org.elasticsearch.index.mapper.Mapping.merge(Mapping.java:91) ~[elasticsearch-7.0.1.jar:7.0.1]
at org.elasticsearch.index.mapper.DocumentMapper.merge(DocumentMapper.java:321) ~[elasticsearch-7.0.1.jar:7.0.1]
at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.applyRequest(MetaDataMappingService.java:275) ~[elasticsearch-7.0.1.jar:7.0.1]
at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.execute(MetaDataMappingService.java:238) ~[elasticsearch-7.0.1.jar:7.0.1]
at org.elasticsearch.cluster.service.MasterService.executeTasks(MasterService.java:687) ~[elasticsearch-7.0.1.jar:7.0.1]
at org.elasticsearch.cluster.service.MasterService.calculateTaskOutputs(MasterService.java:310) ~[elasticsearch-7.0.1.jar:7.0.1]
at org.elasticsearch.cluster.service.MasterService.runTasks(MasterService.java:210) ~[elasticsearch-7.0.1.jar:7.0.1]
at org.elasticsearch.cluster.service.MasterService$Batcher.run(MasterService.java:142) ~[elasticsearch-7.0.1.jar:7.0.1]
at org.elasticsearch.cluster.service.TaskBatcher.runIfNotProcessed(TaskBatcher.java:150) ~[elasticsearch-7.0.1.jar:7.0.1]
at org.elasticsearch.cluster.service.TaskBatcher$BatchedTask.run(TaskBatcher.java:188) ~[elasticsearch-7.0.1.jar:7.0.1]
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:681) ~[elasticsearch-7.0.1.jar:7.0.1]
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:252) ~[elasticsearch-7.0.1.jar:7.0.1]
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:215) ~[elasticsearch-7.0.1.jar:7.0.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_191]
Can someone tell me what I'm doing wrong?