Create nested JSON with aggregate filter plugin

Hi,

I am trying to import data from a MySQL database into Elasticsearch with Logstash, and I would like to create a nested JSON document.
The data in the database looks like this:

| fkm_family | s_family | pkm_article |
| ---------- | -------- | ----------- |
| 1          | Jelly    | 1000        |
| 1          | Jelly    | 1001        |
| 1          | Jelly    | 1002        |
| 1          | Jelly    | 1003        |
| 1          | Jelly    | 1004        |
| 2          | Nexus    | 1005        |
| 2          | Nexus    | 1006        |
| 2          | Nexus    | 1007        |
| 2          | Nexus    | 1008        |

And I would like to aggregate the data in this way:

[
  {
    "fkm_family": 1,
    "s_family": "Jelly",
    "articles": [
      {
        "pkm_article": 1000
      },
      {
        "pkm_article": 1001
      },
      {
        "pkm_article": 1002
      },
      {
        "pkm_article": 1003
      },
      {
        "pkm_article": 1004
      }
    ]
  },
  {
    "fkm_family": 2,
    "s_family": "Nexus",
    "articles": [
      {
        "pkm_article": 1005
      },
      {
        "pkm_article": 1006
      },
      {
        "pkm_article": 1007
      },
      {
        "pkm_article": 1008
      }
    ]
  }
]

The problem is that no data is imported.
The index is empty and Logstash does not return an error.
Here is the config file:

input {
  jdbc {
    jdbc_driver_library => "/opt/logstash/mysql-connector-java-5.1.44-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://db:3306/article_data"
    jdbc_user => "root"
    jdbc_password => "password"
    statement => "SELECT fkm_family, s_family, pkm_article FROM `article_data`"
    type => "article_data"
  }
}

filter {
  aggregate {
    task_id => "%{fkm_family}"
    code => "
      map['fkm_family'] = event.get('fkm_family')
      map['s_family'] = event.get('s_family')
      map['articles'] ||= []
      map['articles'] << {'pkm_article' => event.get('pkm_article')}
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
  }
}

output {
  if [type] == "article_data" {
    elasticsearch {
      index => "article_data"
      hosts => "elasticsearch:9200"
      document_id => "%{fkm_family}"
    }
  }
}

Can someone tell me what I'm doing wrong?

There is nothing wrong with the aggregate. If I use a csv input then I get two events, the second being

"fkm_family" => "2",
  "s_family" => "Nexus",
  "articles" => [
    [0] {
        "pkm_article" => "1005"
    },
    [1] {
        "pkm_article" => "1006"
    },
    [2] {
        "pkm_article" => "1007"
    },
    [3] {
        "pkm_article" => "1008"
    }
],

so I would look at the jdbc input.
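To look at what actually comes out of the pipeline, one option (a sketch; it only adds a console output next to the existing elasticsearch one) is the stdout output with the rubydebug codec:

output {
  # Print every event to the console so the events produced from the
  # jdbc input can be inspected directly, whether or not they match
  # the conditional below.
  stdout { codec => rubydebug }

  if [type] == "article_data" {
    elasticsearch {
      index => "article_data"
      hosts => "elasticsearch:9200"
      document_id => "%{fkm_family}"
    }
  }
}

Events that never reach the elasticsearch output will still show up on the console, which narrows down where they get lost.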

If I remove the aggregate filter, everything works fine.
Do you still think the problem is the jdbc input?

The only fields on the event will be the ones you added to the map. It will not have a type field unless you add

map['type'] = event.get('type')
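Applied to the filter in your config, that would look something like this (a sketch; everything except the added line is unchanged):

filter {
  aggregate {
    task_id => "%{fkm_family}"
    code => "
      # carry the type field over onto the aggregated event so the
      # conditional in the output section still matches
      map['type'] = event.get('type')
      map['fkm_family'] = event.get('fkm_family')
      map['s_family'] = event.get('s_family')
      map['articles'] ||= []
      map['articles'] << {'pkm_article' => event.get('pkm_article')}
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
  }
}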

Yeeeees, that's the solution. Thank you very much!

I have another problem now 🙁
If I define "articles" as a nested field and then import the data, the index is completely empty.
How do I need to change the config file to get the data imported? Or does this not work with nested fields?

What do you mean by 'nested field'?

I would like to create the index with the following mappings.

PUT /article_data
{
  "mappings" : {
    "properties" : {
      "articles" : {
        "type": "nested",
        "properties" : {
          "pkm_article" : {
            "type" : "long"
          }
        }
      },
      "fkm_family" : {
        "type" : "long"
      },
      "s_family" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      }
    }
  }
}

But if I do this, the index is empty after the data import.

I suggest you check both the logstash logs and the elasticsearch logs. You may be getting mapping errors when ingesting.
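On the elasticsearch side it can also help to compare the mapping the index actually ended up with against the one you defined (using the index name from the config above):

GET /article_data/_mapping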

Oh yes, there are errors in the elasticsearch log file.

[2019-07-30T20:43:15,096][DEBUG][o.e.a.b.TransportShardBulkAction] [elk] [article_data][0] failed to execute bulk item (index) index {[article_data][article_data][18336], source[n/a, actual length: [2.2kb], max length: 2kb]}
java.lang.IllegalArgumentException: object mapping [articles] can't be changed from nested to non-nested
	at org.elasticsearch.index.mapper.ObjectMapper.doMerge(ObjectMapper.java:450) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.index.mapper.ObjectMapper.merge(ObjectMapper.java:443) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.index.mapper.ObjectMapper.merge(ObjectMapper.java:47) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.index.mapper.ObjectMapper.doMerge(ObjectMapper.java:472) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.index.mapper.RootObjectMapper.doMerge(RootObjectMapper.java:276) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.index.mapper.ObjectMapper.merge(ObjectMapper.java:443) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.index.mapper.RootObjectMapper.merge(RootObjectMapper.java:271) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.index.mapper.Mapping.merge(Mapping.java:91) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.index.mapper.DocumentMapper.merge(DocumentMapper.java:321) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.applyRequest(MetaDataMappingService.java:275) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.cluster.metadata.MetaDataMappingService$PutMappingExecutor.execute(MetaDataMappingService.java:238) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.cluster.service.MasterService.executeTasks(MasterService.java:687) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.cluster.service.MasterService.calculateTaskOutputs(MasterService.java:310) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.cluster.service.MasterService.runTasks(MasterService.java:210) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.cluster.service.MasterService$Batcher.run(MasterService.java:142) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.cluster.service.TaskBatcher.runIfNotProcessed(TaskBatcher.java:150) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.cluster.service.TaskBatcher$BatchedTask.run(TaskBatcher.java:188) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:681) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:252) ~[elasticsearch-7.0.1.jar:7.0.1]
	at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:215) ~[elasticsearch-7.0.1.jar:7.0.1]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_191]

I am confused and have no idea where the error is in my script.

This is really turning into an elasticsearch question. The aggregate produces events in the desired format. You have a mapping, and elasticsearch fails to map the events. I would ask a new question in the elasticsearch forum showing the event format, mapping, and error message.

Okay, thanks. I have now opened a topic there.
Do you have any idea what the problem is?

No. I don't know a lot about elasticsearch.
