ES document count != wc -l output of file after sending data with Logstash?

Hi friends! I've got what I thought was a simple configuration -- the input is a file that passes through 3 filters -- csv to parse the file, mutate to do a character replace that makes the timestamp automatically detected as a date field, and the removal of the original message. The output of this is sent to Elasticsearch.

When it completes, however, the document count shown for the index is lower than the number of lines in the csv file.

The logstash-plain.log file doesn't show any errors.

I've done a test where the output is a file instead of to Elasticsearch, and the line counts match between the input and output files.

I believe that narrows down the problem to Elasticsearch dropping documents (schema conflict?), although I'm not certain how to discover that this is going on. Are there logging settings I can enable that would reveal dropped events, or a configuration I can set for the Elasticsearch output to send dropped events elsewhere? I tried --log.level debug, but I don't know what to filter if something wrong did happen.

If Elasticsearch was rejecting events you would be seeing

[WARN ][logstash.outputs.elasticsearch][...] Could not index event to Elasticsearch.

in the logstash logs. You do not need to enable DEBUG logging. If events are being rejected you can set up a DLQ to receive them.

Are you setting the document_id option on the output? If so, is it possible you are overwriting documents?

@Badger Thanks for the quick reply! For one of my tests I did try fingerprinting "message" and setting the id to the fingerprint. If there was an overwrite, I should expect the "Deleted documents" count to be non-zero, right?

I'm going to enable DLQ and see what happens -- I've removed the fingerprinting stuff in the meantime.

Related to these tests I've been doing -- even though I've got "start_position => 'beginning'" set for the file input, it looks like logstash might still be using the sincedb file. For instance, I tried sending the file to a different index name, but logstash didn't send any data. Is this by design? Since then, I've been deleting that sincedb file and deleting the index before each test to make sure everything's clean.

For testing purposes I would recommend that you set sincedb_path to /dev/null; this makes logstash not store the last position read between runs.

logstash always uses an in-memory sincedb, that will be persisted across pipeline or process restarts unless sincedb_path is set to "NUL" (on Windows) or "/dev/null" (on UNIX).

Thanks all!

Okay, so now I'm really perplexed. I stopped logstash, killed the sincedb file, and deleted the index in ES.

I fired logstash back up again and let it run until the document count stopped incrementing. Oddly this time, the count is higher than the number of lines in the file. The total document count shows 97811172.

wc -l for the input file shows 97807422.

So, somehow, ES has an extra 3750 documents.

I did notice as logstash was running that I would sometimes get these "NoConnectionAvailableError, :will_retry_in_seconds=>4" messages in the logstash logs before the connection would re-establish itself. This happened a number of times. How atomic is ES when receiving blocks from logstash when there are communication errors?

Also, the dead_letter_queue data folder didn't have any data -- just a temp file with "1" in it, so that's good. (I couldn't find any "Could not index" messages in the logs either)

I'm going to have logstash generate sequential document id's and see what happens.

Hmm -- this results in only a single document getting indexed -- I assume the variable isn't persisting like I think it should?

input {
  file {
    path => "source.csv"
    start_position => "beginning"
  }
}

filter {
  csv {
    columns => [ "email", "user_id", "timestamp" ]
    remove_field => [ "log", "event", "host" ]
    separator => ","
  }

  mutate {
    gsub => [ "timestamp", " ", "T" ]
  }

  mutate {
    remove_field => [ "message" ]
  }

  ruby {
    init => "@counter = 0"
    code => "
      @counter += 1
      event.set('[@metadata][count]', @counter)
    "
  }
}
output {
  elasticsearch {
    hosts => ["..."]
    index => "incremental_ids"
    document_id => "%{[@metadata][count]}"
  }
}

I would expect that ruby filter to work to generate ids (although you may need pipeline.workers 1). What is the _id on the document that is indexed?

I was using Kibana's view of the indices -- I think it just didn't refresh between the checks I was doing since when I came back later after aborting logstash, the numbers increased.

I re-ran and got a bunch of deleted documents along with new documents, so I think you're right about lowering the workers to 1. Re-running now, and I'm seeing the document count increase with no deleted documents. Crossing fingers!

It's still running -- interestingly I have a number of deleted documents. Waiting to see if the document count at least matches the source file, but man, using logstash doesn't seem to be a reliable way to transfer data into ES? Is this a known issue? I know that originally ES was meant to be used more like a search engine where speed trumps accuracy (it's accepted that shards might fail during queries).

Previously I've written a python script to do something similar to this (also using bulk) that didn't suffer from this duplicate doc issue. Would filebeat fare better?

Logstash is pretty reliable; it must be something particular to your data that is causing this issue, because first you had fewer documents in Elasticsearch and then you had more, so something is not working as it should.

Does this happen with every csv file? How is this csv file created? Can you provide a sample of the file where this issue happens?

Can you share the logstash pipeline you were using when you had fewer documents in Elasticsearch and the one when you had more documents?

What did you have in the debug logs?

The "less" version was when I was using fingerprinting. My guess is there are some duplicates in the csv file (the fingerprinting was based off of the entire message) which resulted in the end count being lower than the file.

The debug logs were far too verbose to be helpful unless I knew what to grep for -- for instance it included each line of the csv file as it processed them.

The csv file contains three fields -- an email address, an id, and a timestamp. There are 97,807,422 entries in it.

Assuming the id generation happening in that logstash ruby block is reliable, I can't see how there would be any deleted documents in the index other than logstash attempting to re-send something that ES already received. I double checked the logs and confirmed only one worker is operating.

Right now the count is at 90,382,089 total documents, with 5,138 deleted documents.

This was the version of the pipeline before adding the counter (The one that resulted in more documents):

input {
  file {
    path => "source.csv"
    start_position => "beginning"
  }
}

filter {
  csv {
    columns => [ "email", "user_id", "timestamp" ]
    remove_field => [ "log", "event", "host" ]
    separator => ","
  }

  mutate {
    gsub => [ "timestamp", " ", "T" ]
  }

  mutate {
    remove_field => [ "message" ]
  }
}
output {
  elasticsearch {
    hosts => ["..."]
    index => "test1"
  }
}

I am not sure what you mean by the count of "Deleted documents". I do not think anything in the logstash configurations you posted would delete documents. Is this a number from some statistics/counters page in Elasticsearch or kibana?

If you were using a fingerprint of the message field and have duplicates in your csv file, then it would make sense to have fewer documents than you have lines in the file.

But without the fingerprint, it makes no sense to have more documents than the number of lines in the file. The file input is based on the newline character; if I'm not wrong, it reads the file and emits an event for every newline character, so if you have 1000 lines in your file, and every line ends with a newline character, then you should have 1000 events passing through logstash and going to the output.

Since you are running tests to find the cause, I would suggest that before everything else you check whether you have duplicates in your csv, but depending on the file size and the machine specs it can be CPU expensive.

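You could try with sort and uniq, like sort file | uniq | wc -l, or with cat and sort, like cat file | sort -u | wc -l. Note that uniq on its own only collapses adjacent duplicate lines, so the input has to be sorted first.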
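If the shell tools are awkward to use on the machine in question, the same check can be sketched in Python. This is only a minimal illustration, not something from the pipeline above: the function name count_lines_and_unique and the sample rows are made up, and hashing each line with SHA-256 is just a stand-in for what fingerprinting the whole message would do.

```python
import hashlib

def count_lines_and_unique(lines):
    """Return (total_lines, unique_lines) for an iterable of text lines."""
    seen = set()
    total = 0
    for line in lines:
        total += 1
        # Store a fixed-size digest instead of the full line to limit memory use.
        seen.add(hashlib.sha256(line.rstrip("\n").encode("utf-8")).digest())
    return total, len(seen)

# Made-up sample rows in the email,user_id,timestamp layout from this thread.
sample = [
    "a@example.com,1,2022-05-01 10:00:00\n",
    "b@example.com,2,2022-05-01 10:00:01\n",
    "a@example.com,1,2022-05-01 10:00:00\n",  # exact duplicate of the first row
]
total, unique = count_lines_and_unique(sample)
print(total, unique, total - unique)  # 3 2 1
```

For the real file you would pass an open file handle instead of the sample list. Keep in mind that a set with tens of millions of digests needs a lot of RAM, so sort -u is probably the more practical route for a file this size.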

If you do not have duplicates, maybe try to index the same data in two different indices, one with the fingerprint of the message field and another without it.

Your output would be something like this:

output {
    elasticsearch {
        hosts => ["hosts"]
        index => "test-with-fingerprint"
        document_id => "%{fingerprint_field}"
    }
    elasticsearch {
        hosts => ["hosts"]
        index => "test-without-fingerprint"
    }
}

You can use more workers in this case, as running it with one worker could take a really long time.

After that check the number of documents in the index using GET _cat/indices.

In Kibana, under Index Management, I get a list of indices. There are a number of columns for each -- one is the "Total Document" count, and next to that is "Deleted documents".

If I use _cat/indices instead:

green open incremental_ids F9NGcrnBTvGfCZNMxVRAQw 5 1 90382089 5138  14.9gb  7.5gb

That 5138 is deleted?

This is a brand new index that I'm exclusively using just for this logstash import test. I agree that nothing in that pipeline should be deleting documents. I'm testing to see if logstash is sending the same document more than once. A re-send would use the same id since the id's are coming from logstash instead of getting generated by ES. ES sees the same id and overwrites the "old" document with the one with the same ID -- I think that process is what is incrementing the "deleted documents" counter.

Did I mention that for a previous test for the output I chose a file instead of Elasticsearch? The file had the exact same number of lines as the source.

Yeah, the 5138 is the number of deleted docs.
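To make the columns less ambiguous, here is a small Python sketch that labels a line of _cat/indices output. It assumes the default column order (health, status, index, uuid, pri, rep, docs.count, docs.deleted, store.size, pri.store.size); the helper name parse_cat_indices_line is made up for illustration. Running GET _cat/indices?v would print the header row and show the same thing directly.

```python
# Assumed default column order of `GET _cat/indices` output.
CAT_INDICES_COLUMNS = [
    "health", "status", "index", "uuid", "pri", "rep",
    "docs.count", "docs.deleted", "store.size", "pri.store.size",
]

def parse_cat_indices_line(line):
    """Map one whitespace-separated _cat/indices row onto column names."""
    values = line.split()
    if len(values) != len(CAT_INDICES_COLUMNS):
        raise ValueError(f"expected {len(CAT_INDICES_COLUMNS)} columns, got {len(values)}")
    return dict(zip(CAT_INDICES_COLUMNS, values))

row = parse_cat_indices_line(
    "green open incremental_ids F9NGcrnBTvGfCZNMxVRAQw 5 1 90382089 5138  14.9gb  7.5gb"
)
print(row["docs.count"], row["docs.deleted"])  # 90382089 5138
```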

I'm not sure, but I think that Logstash has an at least once delivery guarantee to Elasticsearch, so in some cases it could duplicate your data when retrying to send a bulk request or for some other reason.

Since you shared that you had connection errors to Elasticsearch in your logstash log, maybe this could be the reason of these deleted documents.

With this in mind, having more documents than lines in the file would make sense, those extra documents would be created when logstash retried some requests.

This would explain both cases. When you had fewer events in Elasticsearch than lines in the csv file, you were using a fingerprint of the message as the _id for the document, so any duplicate lines would have overwritten each other. You need to check your csv file for duplicates to confirm that.
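The effect of a retried bulk request can be illustrated with a toy model in Python. This is not how Elasticsearch or Logstash are implemented; ToyIndex and send_with_one_retry are hypothetical names, and the sketch assumes the first attempt was fully applied on the server before the read timed out, so the retry sends every document a second time.

```python
import itertools

class ToyIndex:
    """Toy stand-in for an index: tracks live and overwritten documents."""

    def __init__(self):
        self.docs = {}
        self.deleted = 0
        self._auto_ids = itertools.count()

    def index(self, doc, doc_id=None):
        if doc_id is None:
            # No id supplied: a fresh one is generated, so a resend adds a new document.
            doc_id = f"auto-{next(self._auto_ids)}"
        if doc_id in self.docs:
            # Same id again: the old copy is replaced and counted as deleted.
            self.deleted += 1
        self.docs[doc_id] = doc

def send_with_one_retry(index, batch, use_ids):
    """Send a batch twice, as if the first response was lost to a read timeout."""
    for _attempt in range(2):
        for line_no, doc in batch:
            index.index(doc, doc_id=str(line_no) if use_ids else None)

batch = [(1, "row one"), (2, "row two"), (3, "row three")]

auto = ToyIndex()
send_with_one_retry(auto, batch, use_ids=False)
print(len(auto.docs), auto.deleted)    # 6 0 -> extra documents

fixed = ToyIndex()
send_with_one_retry(fixed, batch, use_ids=True)
print(len(fixed.docs), fixed.deleted)  # 3 3 -> correct count, retries show up as deleted
```

Without ids the resend inflates the document count; with ids supplied by the sender the count stays correct and the resends only show up in the deleted counter, which matches the pattern seen in the two tests.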

That must be the case -- the final count shows a complete set of documents, plus what would have been those dups as deleted instead:

green open incremental_ids F9NGcrnBTvGfCZNMxVRAQw 5 1 97807422 5888    16gb    8gb

I wonder if there's a way to change logstash's behavior here. Or a means to create a counter that is global to all workers?

Or maybe I need to bump up the settings in ES to prevent this? Like maybe I'm hitting some kind of connection limit with the service?

[2022-05-10T21:31:04,577][WARN ][logstash.outputs.elasticsearch][main][c11bf306d3c2a6449765e0ff96ca2bcb8115cb994e8d52d9e21e20aace3f55e2] Marking url as dead. Last error: [LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError] ElasticSearch Unreachable: [https://host:443/][Manticore::SocketTimeout] Read timed out {:url=>https://host:443/, :error_message=>"ElasticSearch Unreachable: [https://host:443/][Manticore::SocketTimeout] Read timed out", :error_class=>"LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError"}                                                                                 
[2022-05-10T21:31:04,627][ERROR][logstash.outputs.elasticsearch][main][c11bf306d3c2a6449765e0ff96ca2bcb8115cb994e8d52d9e21e20aace3f55e2] Attempted to send a bulk request but ElasticSearch appears to be unreachable or down {:message=>"ElasticSearch Unreachable: [https://host:443/][Manticore::SocketTimeout] Read timed out", :exception=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :will_retry_in_seconds=>2}
[2022-05-10T21:31:06,687][ERROR][logstash.outputs.elasticsearch][main][c11bf306d3c2a6449765e0ff96ca2bcb8115cb994e8d52d9e21e20aace3f55e2] Attempted to send a bulk request but there are no living connections in the pool (perhaps ElasticSearch is unreachable or down?) {:message=>"No Available connections", :exception=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::NoConnectionAvailableError, :will_retry_in_seconds=>4}
[2022-05-10T21:31:08,950][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ElasticSearch instance {:url=>"https://host:443/"}

I re-did this data input using filebeat:

filebeat.inputs:
- type: log
  enabled: true
  paths:
  - source.csv
  processors:
    - decode_csv_fields:
        fields:
          message: decoded
        separator: ","
    - extract_array:
        field: decoded
        mappings:
          email: 0
          user_id: 1
          timestamp: 2
    - drop_fields:
        fields: ["message", "decoded", "agent.ephmeral_id", "agent.hostname", "agent.id", "agent.name", "agent.name", "agent.type", "agent.version", "ecs.version", "host.name", "input.type", "log.file.path", "agent", "ecs"]
  index: filebeat-test

This run took about a day longer than logstash with a single worker (around 3 days, I think), and way, way longer than logstash with its default worker count. Unfortunately, I also ended up with an extra 3400 documents. It also didn't help that it sent fields I told it to drop, like all the agent.* stuff (wasted bandwidth and storage).

I'm curious if there is a quick and reliable method to get data into ES outside of writing something myself in python. It shouldn't be this difficult, right? ELK is supposed to be useful for security these days, right? I.e. reliable for ingesting security events?
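For comparison, if it does come down to a script, the key idea from this thread (ids chosen by the sender, so a resend overwrites instead of duplicating) could look roughly like the Python sketch below. It only builds the action dictionaries; the function name bulk_actions and the sample rows are made up, and the assumption is that actions in this shape (_index, _id, _source) would be handed to something like the bulk helper in the elasticsearch Python client, which is not imported here.

```python
import csv

def bulk_actions(lines, index_name):
    """Yield one bulk action per csv row, using the row number as a stable _id."""
    reader = csv.reader(lines)
    for row_no, row in enumerate(reader, start=1):
        email, user_id, timestamp = row
        yield {
            "_index": index_name,
            # Same row always gets the same id, so a retried batch overwrites itself.
            "_id": str(row_no),
            "_source": {
                "email": email,
                "user_id": user_id,
                # Same space-to-T replacement as the mutate/gsub filter in the pipeline.
                "timestamp": timestamp.replace(" ", "T"),
            },
        }

# Made-up sample rows in the email,user_id,timestamp layout from this thread.
sample = [
    "a@example.com,1,2022-05-01 10:00:00\n",
    "b@example.com,2,2022-05-01 10:00:01\n",
]
actions = list(bulk_actions(sample, "incremental_ids"))
print(actions[0]["_id"], actions[0]["_source"]["timestamp"])  # 1 2022-05-01T10:00:00
```

Because the id depends only on the position in the file, re-running the whole import against the same index should converge on one document per row, at the cost of some deleted-document churn until segments are merged.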