Logstash doesn't dump all my events to Elasticsearch

I have a PostgreSQL table "relou" with more than 2.8M records:

> select count(*) from relou;
>   count  
> ---------
>  2853566

I'm indexing this table into Elasticsearch using Logstash:

input {
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://***"
        jdbc_user => "***"
        jdbc_password => "***"
        jdbc_driver_library => "/path/postgresql-42.1.4.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        statement => "select * from relou"
        jdbc_paging_enabled => "true"
        jdbc_fetch_size => "100000"
    }
}

filter {
    mutate {
      add_field => [ "[location][lat]", "%{lat}" ]
      add_field => [ "[location][lon]", "%{lon}" ]
    }

    mutate {
        convert => [ "[location][lat]", "float" ]
        convert => [ "[location][lon]", "float" ]
    }

    mutate { remove_field => [ "lat", "lon" ] }
}

output {
    elasticsearch {
        hosts => ["***"]
        index => "test"
        document_type => "test"
        document_id => "%{type}%{id_catalog_shop_sku}"
        user => "logstash"
        password => "***"
    }
}

I'm executing the corresponding conf file, everything goes smoothly (no errors), and the log file shows that all 2.8M rows from the "relou" table have been processed:

[2018-04-14T19:45:26,326][INFO ][logstash.inputs.jdbc     ] (3.085797s) SELECT * FROM (select * from relou) AS "t1" LIMIT 100000 OFFSET 2800000

[2018-04-14T19:46:20,974][INFO ][logstash.pipeline        ] Pipeline has terminated {:pipeline_id=>"relou", :thread=>"#<Thread:0x17a53624@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:246 run>"}

[2018-04-14T19:46:21,329][INFO ][logstash.pipeline        ] Pipeline has terminated {:pipeline_id=>".monitoring-logstash", :thread=>"#<Thread:0x428b6a84@/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:246 run>"}

In Kibana, under Monitoring, I checked the "relou" pipeline and it says:

Events Received: 2.8m events
Events Emitted: 2.8m events

But when I do a count on my test Elasticsearch index, I only have 1.8M records:

GET /test/_count

{
  "count": 1805997,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  }
}

If you do a distinct count of this in the database, what number do you get? You may be repeatedly overwriting records that have the same sku.

Yeah, I checked that before, but I don't have any duplicate id_catalog_shop_sku values:

select count(*) from( select id_catalog_shop_sku from relou group by id_catalog_shop_sku) temp;
  count  
---------
 2853566

When I run my conf file, I do get this warning though:

[2018-04-15T05:30:14,163][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>6}

Do you think it is somehow linked to my issue?

No, that warning is routine. What does this produce?

SELECT COUNT(*) FROM (SELECT DISTINCT id_catalog_shop_sku FROM relou) AS temp

Alternatively, do not set the document_id and see how many documents you get then.

crawling_db=> SELECT COUNT(*) FROM (SELECT DISTINCT id_catalog_shop_sku FROM relou) temp;
  count  
---------
 2853566

Same count as a plain count(*), so there are no duplicated id_catalog_shop_sku values.

I tried with the following output (no document_type and no document_id):

output {
    elasticsearch {
        hosts => ["***"]
        index => "***"
        user => "logstash"
        password => "***"
    }
}

And it worked (exact same number of records in PostgreSQL and in Elasticsearch).

But I do need document_id, as I need my records to be replaced when they are edited.
So I changed my output to:

output {
    elasticsearch {
        hosts => ["***"]
        index => "***"
        document_id => "sku%{id_catalog_shop_sku}"
        user => "logstash"
        password => "***"
    }
}

But with this output, at the end of the Logstash run, 1M records are missing (2.8M in PostgreSQL, 1.8M in Elasticsearch). The Kibana monitoring session still tells me that 2.8M events have been received and 2.8M emitted, but a count on my Elasticsearch index says I only have 1.8M records.

So setting document_id has an effect on the number of records in Elasticsearch, but I don't get why.

It feels like Elasticsearch thinks the id_catalog_shop_sku values are duplicates, even though the database does not. Not sure how that could happen.

OK, so if you do that, the documents have an id_catalog_shop_sku field, right? If you build a visualization in Kibana using a data table and split the series on Terms + id_catalog_shop_sku, are there rows with a count greater than one?

It's indeed the case.

Instead of going through Kibana, I just ran the following aggregation:

{
  "size": 0,
  "aggs": {
    "genres": {
      "terms": { "field": "id_catalog_shop_sku" }
    }
  }
}

Corresponding buckets:

  "buckets": [
        {
          "key": "84",
          "doc_count": 913519
        },
        {
          "key": "7",
          "doc_count": 645357
        },
        {
          "key": "4",
          "doc_count": 532189
        },
        {
          "key": "5",
          "doc_count": 373544
        },
        {
          "key": "3",
          "doc_count": 164517
        },
        {
          "key": "8",
          "doc_count": 60446
        },
        {
          "key": "15",
          "doc_count": 49797
        },
        {
          "key": "11",
          "doc_count": 23078
        },
        {
          "key": "13",
          "doc_count": 21507
        },
        {
          "key": "1",
          "doc_count": 18871
        }
      ]

So it says there are 913,519 records having 84 for id_catalog_shop_sku.

I then ran the following query:

{
  "_source": "id_catalog_shop_sku",
  "size": 3,
  "query": {
    "bool": {
      "filter": [
        { "term": { "id_catalog_shop_sku": "84" } }
      ]
    }
  }
}

Results:

 "max_score": 0,
    "hits": [
      {
        "_index": "otop",
        "_type": "doc",
        "_id": "lBabyWIBlyT3ryG2GDos",
        "_score": 0,
        "_source": {
          "id_catalog_shop_sku": "1386-84-79287"
        }
      },
      {
        "_index": "otop",
        "_type": "doc",
        "_id": "mBabyWIBlyT3ryG2GDos",
        "_score": 0,
        "_source": {
          "id_catalog_shop_sku": "1347-84-79287"
        }
      },
      {
        "_index": "otop",
        "_type": "doc",
        "_id": "nBabyWIBlyT3ryG2GDos",
        "_score": 0,
        "_source": {
          "id_catalog_shop_sku": "1428-84-79287"
        }
      }
    ]

None of them have 84 as the exact id_catalog_shop_sku value; they just contain it as part of that field.

I also checked in my PostgreSQL table, and I don't have any id_catalog_shop_sku with 84 as its value:

select count(*) from relou where id_catalog_shop_sku = '84';
 count 
-------
     0

So how come it acts like there are duplicates when I try to set that field as _id, and how come the previous query displays these results even though none of them have 84 as id_catalog_shop_sku?

For the fields of a document, an analyzer is applied to convert the field into a set of words that should be indexed. A simple analyzer would convert 1428-84-79287 into the three words 1428, 84, 79287. You could then look up that document using any one of those words. So getting many results back when searching against id_catalog_shop_sku is expected.
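To illustrate (assuming the default dynamic mapping, where a string column is indexed as an analyzed text field with an unanalyzed keyword sub-field), the same filter run against the keyword sub-field matches only whole values, so it should come back empty:

```json
{
  "_source": "id_catalog_shop_sku",
  "query": {
    "bool": {
      "filter": [
        { "term": { "id_catalog_shop_sku.keyword": "84" } }
      ]
    }
  }
}
```

If that query returns zero hits while the one against the plain field returns many, the extra matches are coming from the analyzer's tokenization, not from actual duplicate values.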

But it would be bizarre for an analyzer to be applied to document_id. It just makes no sense to me.

But then why is it applied to the aggregation as well?
I'm dumping my data again, but this time I'm replacing "-" with "a" within my id_catalog_shop_sku field.

I don't know. It makes no sense that it would be.

Can you run those aggregations against the id_catalog_shop_sku.keyword field (or id_catalog_shop_sku.raw if you are running an older version)?

I have to say: I'm lost.

So I replaced all the "-" with "a" within the id_catalog_shop_sku field:

select id_catalog_shop_sku from relou limit 3;
 id_catalog_shop_sku  
----------------------
 1063a7a8000000072566
 1068a7a8000000072566
 1085a7a8000000072566

Those values are still unique:

select count(*)
from(
	select
		id_catalog_shop_sku,
		count(*) nb
	from relou
	group by id_catalog_shop_sku
) temp
where nb > 1;
 count 
-------
     0
(1 row)

My table still has 2.8M rows:

select count(*) from relou;
  count  
---------
 2853856

I'm dumping this table to Elasticsearch, and now some of my id_catalog_shop_sku values are not unique (they are mapped as keyword; I'm using 6.2). The unique count is now down to 1.8M (the full count is 2.8M), so it basically says I have 1M duplicates in Elasticsearch while my PostgreSQL table doesn't have any:

{
  "size": 0,
  "aggs" : {
    "genres" : {
      "terms" : { "field" : "id_catalog_shop_sku.keyword"}
    },
    "count":{
      "cardinality": {
        "field": "id_catalog_shop_sku.keyword"
      }
    }
  }
} 

Result:

"aggregations": {
    "genres": {
      "doc_count_error_upper_bound": 15,
      "sum_other_doc_count": 2853816,
      "buckets": [
        {
          "key": "1064a7a0753759187323",
          "doc_count": 4
        },
        {
          "key": "1072a7a8886327372714",
          "doc_count": 4
        },
        {
          "key": "1349a84a37732",
          "doc_count": 4
        },
        {
          "key": "1362a84a16570",
          "doc_count": 4
        },
        {
          "key": "1369a84a37732",
          "doc_count": 4
        },
        {
          "key": "1371a84a57320",
          "doc_count": 4
        },
        {
          "key": "1382a84a16570",
          "doc_count": 4
        },
        {
          "key": "1422a84a57320",
          "doc_count": 4
        },
        {
          "key": "1439a84a46299",
          "doc_count": 4
        },
        {
          "key": "1441a84a48630",
          "doc_count": 4
        }
      ]
    },
    "count": {
      "value": 1794420
    }
  }

Something has to be wrong with id_catalog_shop_sku. I'm going to try setting it to a unique integer instead of a string.

Can you pull the records with a key such as 1072a7a8886327372714 and locate them in the database using other fields, then compare the source rows?

Yep, I did. It's as if a bunch of unique entries in PostgreSQL had been duplicated 4 times: in Elasticsearch, all the records sharing the same id_catalog_shop_sku are identical (the 4 records are identical), while I have just one of those records in PostgreSQL.

It also means I'm missing some records in Elasticsearch, because in PostgreSQL I have 2.8M unique id_catalog_shop_sku values while I have just 1.8M unique records in Elasticsearch.

I'm going to create a new field with values 0, 1, 2, ... and so on, so I can figure out which rows are being duplicated/ignored and why...
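One way to do that without touching the table (a sketch, assuming the window functions available in PostgreSQL 8.4+; the rn alias is my own) is to generate the sequence directly in the statement Logstash runs:

```sql
-- Hypothetical: tag every row with a stable row number so duplicated or
-- skipped rows can be traced after the import.
SELECT row_number() OVER (ORDER BY id_catalog_shop_sku) AS rn, r.*
FROM relou r
```

The ORDER BY inside the window function also makes the numbering reproducible across runs.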

Many thanks for your help

I found the issue... I needed to ORDER BY (or index) my rows in PostgreSQL.
Indeed, Logstash runs the following queries:

[2018-04-16T13:14:36,297][INFO ][logstash.inputs.jdbc     ] (34.511709s) SELECT count(*) AS "count" FROM (select * from relou) AS "t1" LIMIT 1
[2018-04-16T13:16:43,319][INFO ][logstash.inputs.jdbc     ] (127.015157s) SELECT * FROM (select * from relou) AS "t1" LIMIT 100000 OFFSET 0
[2018-04-16T13:20:26,502][INFO ][logstash.inputs.jdbc     ] (130.383027s) SELECT * FROM (select * from relou) AS "t1" LIMIT 100000 OFFSET 100000
[... the same paged query repeated for each 100,000-row offset up to 2,700,000 ...]
[2018-04-16T15:28:30,811][INFO ][logstash.inputs.jdbc     ] (159.195139s) SELECT * FROM (select * from relou) AS "t1" LIMIT 100000 OFFSET 2800000

Since I didn't order my results and my table is not indexed, PostgreSQL returned rows in no guaranteed order during the pagination, so the same rows could be selected in several pages while others were never selected at all.

In my Logstash conf file I changed my query to:
select * from relou order by id_catalog_shop_sku

And I now have the same number of records in PostgreSQL and in Elasticsearch, both with unique id_catalog_shop_sku values.
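With the ORDER BY inside the inner statement, each paged query that Logstash generates now returns a deterministic slice, for example:

```sql
SELECT * FROM (select * from relou order by id_catalog_shop_sku) AS "t1"
LIMIT 100000 OFFSET 100000
```

Every offset now starts from the same globally sorted order, so the pages no longer overlap or skip rows.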

Thanks again for taking the time to look into my problem.

