Indexing a flattened field with unique keys is very slow

I have a Logstash ingest process that ingests records at 4k events per second. Recently we added a field that is an object consisting of unique keys, which I store as the flattened data type. But ingest throughput starts at 3k events per second and within an hour drops to 60 events per second. I can't get more than 3 million of the 200 million records ingested. An example of the field looks like this:

{
  "que": [
    13
  ],
  "cinq.": [
    30
  ],
  "qui": [
    18
  ],
  "toutes": [
    29
  ],
  "beaucoup": [
    9
  ],
  "du": [
    22
  ],
  "des": [
    10
  ],
  "lettres": [
    12
  ],
  "Simon": [
    31
  ],
  "(et": [
    17
  ],
  "m'avez": [
    15
  ],
  "parvenues)": [
    25
  ],
  "sont": [
    21
  ],
  "me": [
    20
  ],
  "cheres": [
    3
  ],
  "nombreuses": [
    11
  ],
  "Je": [
    6
  ],
  "tantes": [
    4
  ],
  "vous": [
    7,
    14,
    27
  ],
  "Dimanche": [
    0
  ],
  "pas": [
    24
  ],
  "reste": [
    23
  ],
  "envoyees": [
    16
  ],
  "et": [
    26
  ],
  "embrasse": [
    28
  ],
  "ne": [
    19
  ],
  "remercie": [
    8
  ],
  "Mes": [
    2
  ]
}

The object has different keys and numbers for just about every record.

Since this field does not need to be filtered or sorted, I set it as:

"abstract_inverted_index": {
          "type": "flattened",
          "index": false,
          "doc_values": false
        }
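
For anyone who wants to poke at this, here is a minimal way to reproduce the mapping on a throwaway index, using the same connection environment variables as my pipeline (the index name "flattened-test" is just an example, and this assumes ES_HOST_PROD is a full URL and the variables are exported in the shell):

# create a scratch index with the same flattened mapping
curl -s -u "$ES_USER_PROD:$ES_PASSWORD_PROD" -X PUT "$ES_HOST_PROD/flattened-test" \
  -H 'Content-Type: application/json' -d '
{
  "mappings": {
    "properties": {
      "abstract_inverted_index": {
        "type": "flattened",
        "index": false,
        "doc_values": false
      }
    }
  }
}'

# index one document with the field to confirm the mapping is accepted
curl -s -u "$ES_USER_PROD:$ES_PASSWORD_PROD" -X PUT "$ES_HOST_PROD/flattened-test/_doc/1" \
  -H 'Content-Type: application/json' -d '
{
  "abstract_inverted_index": { "que": [13], "vous": [7, 14, 27] }
}'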

Any idea what could be making the ingest slow down over time? I noticed per-event latency gradually climbs from 5ms to over 250ms fairly quickly.

Here is a screenshot showing how everything slows down over time. The Elasticsearch cluster receiving the events is barely working: CPU and heap usage are both low.
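
For what it's worth, the same throughput and latency numbers can also be read straight from the Logstash monitoring API on port 9600 (I run this from inside the container, since the port is not published in my compose file):

# event counts and cumulative processing time for the whole instance
curl -s "http://localhost:9600/_node/stats/events?pretty"

# per-pipeline and per-plugin breakdown; duration_in_millis divided by events out
# gives a rough average processing time per event
curl -s "http://localhost:9600/_node/stats/pipelines?pretty"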

How big is the index, how big are the shards, and how many shards are there?

Hi Alex. The data set is 205 million records total, divided across 38 different indexes based on year, and each index has 1 primary shard and 3 replicas. When the full dataset is loaded there is about 20 to 40 GB in each shard.
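
For reference, the per-index and per-shard sizes can be pulled with the cat APIs; all of my indexes match works-v6-* (same environment variables as the pipeline):

# document counts and store size per index
curl -s -u "$ES_USER_PROD:$ES_PASSWORD_PROD" \
  "$ES_HOST_PROD/_cat/indices/works-v6-*?v&h=index,pri,rep,docs.count,store.size&s=index"

# size of each primary and replica shard
curl -s -u "$ES_USER_PROD:$ES_PASSWORD_PROD" \
  "$ES_HOST_PROD/_cat/shards/works-v6-*?v&h=index,shard,prirep,store&s=index"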

I thought maybe memory was the issue, so I increased the Logstash heap in Docker to 8 GB of the 16 GB available on the machine. That got me further, but the ingest eventually slowed down again. Do you think I should give Logstash more memory? More than 8 GB?

Hi all. I am still having this problem and no longer believe it is related to the flattened field itself. It appears Logstash continually consumes heap space. Any idea what may be going wrong?
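
To check whether it really is the Logstash heap filling up (rather than anything on the Elasticsearch side), the JVM numbers are exposed on the same monitoring API; heap_used_percent and the old-generation GC counts are the ones worth watching over time:

# run inside the logstash container (port 9600 is not published in my compose file)
curl -s "http://localhost:9600/_node/stats/jvm?pretty"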

Can you share your full pipeline configuration to help us understand what Logstash is trying to do?

@leandrojmp Sure! Here is my Logstash pipeline file:

input {
    jdbc {
        jdbc_driver_library => "/usr/share/jars/redshift-jdbc42-2.1.0.1.jar"
        jdbc_driver_class => "com.amazon.redshift.jdbc42.Driver"
        jdbc_connection_string => "${JDBC_URL_PROD}"
        jdbc_user => "${JDBC_USER_PROD}"
        jdbc_password => "${JDBC_PASSWORD_PROD}"
        jdbc_fetch_size => 10
        last_run_metadata_path => "/usr/share/sql_last_value.yml"
        schedule => "* * * * *"
        statement => "SELECT updated, json_save from mid.json_works WHERE updated > :sql_last_value order by updated;"
        use_column_value => true
        tracking_column => updated
        tracking_column_type => "timestamp"
    }
}

filter {
      json {
        source => "json_save"
      }
      if [publication_year] {
        mutate { convert => { "publication_year" => "integer" } }

        if [publication_year] < 1960 {
          mutate { add_field => { "[@metadata][index_suffix]" => "1959-or-less" } }
        } else if [publication_year] > 1959 and [publication_year] < 1970 {
          mutate { add_field => { "[@metadata][index_suffix]" => "1960s" } }
        } else if [publication_year] > 1969 and [publication_year] < 1980 {
          mutate { add_field => { "[@metadata][index_suffix]" => "1970s" } }
        } else if [publication_year] > 1979 and [publication_year] < 1990 {
          mutate { add_field => { "[@metadata][index_suffix]" => "1980s" } }
        } else if [publication_year] > 1989 and [publication_year] < 1995 {
          mutate { add_field => { "[@metadata][index_suffix]" => "1990-to-1994" } }
        } else if [publication_year] > 1994 and [publication_year] < 2000 {
          mutate { add_field => { "[@metadata][index_suffix]" => "1995-to-1999" } }
        } else {
          mutate { add_field => { "[@metadata][index_suffix]" => "%{publication_year}" } }
        }
      } else {
        mutate { add_field => { "[@metadata][index_suffix]" => "invalid-data" } }
      }

      mutate {
        remove_field => ["json_save", "version"]
      }
}

output {
    elasticsearch {
        hosts => ["${ES_HOST_PROD}"]
        index => "works-v6-%{[@metadata][index_suffix]}"
        user => "${ES_USER_PROD}"
        password => "${ES_PASSWORD_PROD}"
        document_id => "%{id}"
    }
}

Here is my docker-compose file:

version: "3"
services:
  logstash:
    image: docker.elastic.co/logstash/logstash:7.16.3
    environment:
      - XPACK_MONITORING_ENABLED=false
      - NODE_NAME=works
      - PIPELINE_BATCH_SIZE=75
      - PIPELINE_WORKERS=6
      - QUEUE_TYPE=persisted
      - QUEUE_DRAIN=true
      - QUEUE_MAX_BYTES=10gb
      - "LS_JAVA_OPTS=-Xmx8g -Xms8g"
    env_file:
      - .env
    volumes:
      - ./jars/:/usr/share/jars/
      - ./pipeline/:/usr/share/logstash/pipeline/
      - ./sql_last_value.yml:/usr/share/sql_last_value.yml
    command: logstash
  metricbeat:
    image: docker.elastic.co/beats/metricbeat:7.16.3
    depends_on:
      - logstash
    env_file:
      - .env
    volumes:
      - ./metricbeat.yml:/usr/share/metricbeat/metricbeat.yml

Of note, my Elasticsearch cluster has low CPU (<15%) and low heap usage while indexing, so I know the cluster is not the issue.
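
For reference, the node-level CPU and heap numbers are easy to confirm with the cat nodes API (same environment variables as the pipeline):

curl -s -u "$ES_USER_PROD:$ES_PASSWORD_PROD" \
  "$ES_HOST_PROD/_cat/nodes?v&h=name,cpu,heap.percent,ram.percent"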

Well, there's your problem. See this thread and the PRs linked from the PR it links to: Logstash uses a hash to remember every field key it has ever seen, so a field whose keys are unique in nearly every event makes that hash grow without bound.

TL;DR: mitigated in the upcoming releases (7.17 and 8.x).


Oh thank you. At least I know I'm not crazy and can stop trying to get this to work. Holy cow.

Do you know of a way I can use a development version of 7.17 that has this fix?

The fix was merged into the branch earlier this afternoon, so unless you want to build it from source I think you have to wait.
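
Once a 7.17 image is published, picking it up should just be a matter of bumping the tag in your compose file and recreating the container, something like:

# exact 7.17.x version depends on what actually gets released
docker pull docker.elastic.co/logstash/logstash:7.17.0
# update the image tag in docker-compose.yml to match, then
docker-compose up -d logstash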

Thanks again for your help!
