Logstash does not send all data to ES

I have 262K items in the DB, but Logstash sends only 228K to ES.
This is the Logstash config:
input {
jdbc {
jdbc_default_timezone => "UTC"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_driver_library => "/usr/share/logstash/postgresql-42.2.18.jar"

statement => 'SELECT
i."ItemId",
i."PharmacyId",
i."Name_I",
i."Name_II",
i."parentId",
i."Name_II" as "name",
i."DoseForItem",
i."MercatoId",
id."CurrentStock",
i."ManufacturerId",
i."TouchScreenId" ,
i."ClassifyId" ,
i."DefaultUomId" ,
id."PosSelectCounter",
i."CategoryId",
i."PharmaceuticalFormId",
m."Name_II" AS "ManufacturerName_II",
m."Name_I" AS "ManufacturerName_I",
ts."Title" as "TouchScreenTitle",
ic."Name_II" AS "CategoryName_II",
ic."Name_I" AS "CategoryName_I",
ic2."Name_II" AS "ClassifyName_II",
ic2."Name_I" AS "ClassifyName_I",
pf."Name_II" AS "PharmaceuticalFormName_II",
pf."Name_I" AS "PharmaceuticalFormName_I",
(
SELECT json_agg(distinct(it."PharmacyId"::text))::text
FROM "InvTransactions" it
WHERE it."ItemId" = i."ItemId"
) AS "activePharmacies",
(
SELECT u."UomName"::text
FROM "Uoms" u
WHERE u."UomId" = i."DefaultUomId"
) AS "uomName",
(
SELECT u2."UomId"
FROM "ItemUoms" iu
LEFT JOIN "Uoms" u2 ON iu."UomId" = u2."UomId"
WHERE iu."ItemId" = i."ItemId" limit 1
) AS "smallUomId",
(
SELECT u2."UomName"
FROM "ItemUoms" iu
LEFT JOIN "Uoms" u2 ON iu."UomId" = u2."UomId"
WHERE iu."ItemId" = i."ItemId" limit 1
) AS "smallUom",
(
SELECT json_agg(i2."PharmacyId"::text)::text
FROM "Items" i2
WHERE i2."parentId" = i."ItemId"
) AS "ItemChilds",
(
SELECT json_agg(b."BarcodeValue")::text
FROM "ItemBarcodes" ib
LEFT JOIN "Barcodes" b ON ib."BarcodeId" = b."BarcodeId"
WHERE ib."ItemId" = i."ItemId"
) AS "barcode",
(
SELECT json_agg(s."Name_II")::text
FROM "MedicineScientifics" ms
LEFT JOIN "Scientifics" s ON s."ScientificId" = ms."ScientificId"
WHERE ms."ItemId" = i."ItemId"
) AS "ScientificsName_II",
(
SELECT json_agg(s."Name_I")::text
FROM "MedicineScientifics" ms
LEFT JOIN "Scientifics" s ON s."ScientificId" = ms."ScientificId"
WHERE ms."ItemId" = i."ItemId"
) AS "ScientificsName_I",
(
SELECT json_agg(s."ScientificId")::text
FROM "MedicineScientifics" ms
LEFT JOIN "Scientifics" s ON s."ScientificId" = ms."ScientificId"
WHERE ms."ItemId" = i."ItemId"
) AS "scientificIds",
(
SELECT json_agg(ti."Name_I")::text
FROM "TherapeuticIndicationItems" tii
LEFT JOIN "TherapeuticIndications" ti ON tii."TherapeuticIndicationId" = ti."TherapeuticIndicationId"
WHERE tii."ItemId" = i."ItemId"
) AS "therapeuticIndication",
(
SELECT json_agg(ti."TherapeuticIndicationId"::text)::text
FROM "TherapeuticIndicationItems" tii
LEFT JOIN "TherapeuticIndications" ti ON tii."TherapeuticIndicationId" = ti."TherapeuticIndicationId"
WHERE tii."ItemId" = i."ItemId"
) AS "therapeuticIndicationIds"
FROM
"Items" i
LEFT JOIN
"Manufacturers" m ON m."ManufacturerId" = i."ManufacturerId"
LEFT JOIN
"TouchScreens" ts ON ts."TouchScreenId" = i."TouchScreenId"
LEFT JOIN
"ItemCategories" ic ON ic."ItemCategoryId" = i."CategoryId"
LEFT JOIN
"ItemClassifies" ic2 ON ic2."ItemClassifyId" = i."ClassifyId"
LEFT JOIN
"PharmaceuticalForms" pf ON pf."PharmaceuticalFormId" = i."PharmaceuticalFormId"
LEFT JOIN
"ItemDetails" id ON id."ItemId" = i."ItemId"'
jdbc_paging_enabled => true
jdbc_page_size => 10000
schedule => "* * * * *" # Run every minute (adjust as needed)
}
}
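
A quick way to compare the two counts is something along these lines (the ES index name is a placeholder since the output section is not shown, and the psql connection flags are omitted):

# Row count on the DB side -- the statement above selects from "Items"
psql -c 'SELECT count(*) FROM "Items";'

# Document count on the ES side -- use the index your output writes to
curl -s 'http://localhost:9200/<your-index>/_count?pretty'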

Welcome to the community!

Add an id to the jdbc input, then check the LS statistics to see what goes in and out of the pipeline. Also add IDs to the other plugins so you know which one consumes what. I'm not sure whether you are using anything in the filter section.

input {
  jdbc {
    jdbc_default_timezone => "UTC"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_driver_library => "/usr/share/logstash/postgresql-42.2.18.jar"
    statement => ...
    id => "postgresql"
  }
}
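
The filter and output sections are not shown in your post, so purely as a sketch, IDs on a hypothetical filter and output would look like this (plugin names, hosts and index are placeholders):

filter {
  # only if you actually use filters; give each one an id
  mutate {
    id => "mutate_items"
    remove_field => ["@version"]
  }
}

output {
  elasticsearch {
    id => "es_items"
    hosts => ["http://localhost:9200"]   # placeholder
    index => "items"                     # placeholder -- your real index name
  }
}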

Your result should look something like this:

      "events": {
        ->"in": 216610
        ->"out": 216485,
        "filtered": 216485,
        "duration_in_millis": 365495,
        "queue_push_duration_in_millis": 342466
      },
      "plugins": {
        "inputs": [
          {
            "id": "postgresql",
            "flow": {
              "throughput": {
                "current": 603.1,
                "lifetime": 590.7
              }
            },
            "name": "jdbc",
            "events": {
              "out": 216485,
              "queue_push_duration_in_millis": 342466
            }
          }
        ],

You can use these JSON statistics; just enable the xpack.monitoring settings in logstash.yml or use Agent/Metricbeat.
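
For example, the per-plugin counters can also be pulled straight from the Logstash node stats API (9600 is the default API port), and internal monitoring is switched on in logstash.yml roughly like this:

# Pull pipeline and plugin event counters from the monitoring API:
curl -s 'http://localhost:9600/_node/stats/pipelines?pretty'

# Or enable internal collection in logstash.yml so the stats show up in Kibana:
#   xpack.monitoring.enabled: true
#   xpack.monitoring.elasticsearch.hosts: ["http://localhost:9200"]

If the in and out counters (or the per-plugin out values) differ, the gap tells you at which stage events are being dropped.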

Can you provide more info:

  • Which LS/ES versions do you use?
  • Is LS using an in-memory or persistent queue? (see the logstash.yml snippet below)
  • Is your JDBC SELECT the same query that counts the 262K records?
  • Are there any other transformations in the filter? IFs, drop, ...
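
Regarding the queue question, this is a minimal logstash.yml sketch of where the queue type is configured (memory is the default; the max_bytes value is just an example):

# logstash.yml
queue.type: persisted    # "memory" (default) or "persisted" to buffer events on disk
queue.max_bytes: 1gb     # disk budget for the persistent queue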