S3 input plugin taking a really long time to process

Hi Team,

I am using the S3 input plugin to fetch billing data from AWS. There are roughly 15–20 files (ending in .csv.gz), each 115–120 MB in size.
These files are replaced the next day with a new set of files.

Logstash config:

input {
  s3 {
    # NOTE(review): credentials are empty here (presumably redacted for the post).
    "access_key_id" => ""
    "secret_access_key" => ""
    "bucket" => "billing"
    "prefix" => "billing_2020/billing/"
    # Skip keys containing "/20220701-20220801" and keys ending in ".json".
    # The dot is escaped so it matches a literal "." rather than any character.
    "exclude_pattern" => "/20220701-20220801|\.json$"
    "region" => "us-east-1"
    # Adds the S3 object's properties to each event under [@metadata][s3].
    "include_object_properties" => true
    "watch_for_new_files" => true
    # NOTE(review): downloaded objects and the sincedb live under
    # /etc/logstash/conf.d. Runtime data is more conventionally kept outside the
    # config directory (e.g. under path.data). Confirm path.config never globs this dir.
    "temporary_directory" => "/etc/logstash/conf.d/aws-logstash-file"
    "sincedb_path" => "/etc/logstash/conf.d/aws-logstash-file/sincedb_path"
    # Extra key/value settings used to configure the S3 connection.
    "additional_settings" => {
      "force_path_style" => true
      "follow_redirects" => false
    }
  }
}

filter {
  # The original `mutate { gsub => [ "message", "\"", "" ] }` has been dropped.
  # The csv filter already handles quoted fields. Stripping every double quote
  # beforehand lets a comma inside a quoted value split it into extra columns,
  # and it also adds a regex substitution over every single line.
  mutate { add_field => { "file" => "%{[@metadata][s3][key]}" } }
  csv {
    source => "message"
    separator => ","
    # NOTE(review): per the csv filter docs, skip_header only drops rows that
    # exactly match `columns` (or autodetected column names). No `columns` are
    # set here, so the CSV header line is likely indexed as an ordinary event.
    skip_header => true
  }
  # Drop the raw line and the `event` field once the row has been parsed.
  mutate { remove_field => [ "event", "message" ] }
}

output {
  elasticsearch {
    # This output already sends events to Elasticsearch with the Bulk API.
    # The number of events per request follows pipeline.batch.size (logstash.yml).
    hosts => ["https://elk.domain.com:9200/"]
    # One index per day, formatted from each event's @timestamp.
    index => "aws-s3-finops-%{+YYYY.MM.dd}"
    # `codec => rubydebug { metadata => true }` was removed. rubydebug is a
    # console/debug codec (normally paired with stdout), not the format the
    # elasticsearch output uses to build documents.
  }
}

This config runs fine, but it only processes 2–3 files in 24 hours, so I am missing data/events.
Server hardware: 2 CPUs and 8 GB RAM; the Logstash JVM heap is set with -Xms2g -Xmx2g.

Logstash version: logstash-8.2.2-1

What changes do I need to make to speed up event processing?

Hi,

Please help with bulk insertion into Elasticsearch from Logstash, since processing the billing data from AWS is taking a lot of time.

First you need to find out what is taking the time: the processing in Logstash or the indexing in Elasticsearch.

What are your Elasticsearch specs?

I would suggest that you manually download some files and ingest them using a pipeline with the file input and a temporary index as the destination. This will help you find out whether the bottleneck is Logstash or Elasticsearch.

Hi @leandrojmp, thanks for your suggestion. It is Logstash that is taking the time to process this data.
My Logstash config now looks like this:

filter {
  # Copy the S3 object metadata (key, last-modified time, version id) from
  # [@metadata][s3] into regular fields so they are indexed with every row.
  mutate {
    add_field => {
      "filename"      => "%{[@metadata][s3][key]}"
      "last_modified" => "%{[@metadata][s3][last_modified]}"
      "version_id"    => "%{[@metadata][s3][version_id]}"
    }
  }
  # Split each line into named columns. Because `columns` is set, skip_header
  # drops any row whose values exactly equal this list (the header line).
  csv {
    source      => "message"
    separator   => ","
    skip_header => true
    columns => ["identity/LineItemId", "identity/TimeInterval", "bill/InvoiceId", "bill/InvoicingEntity", "bill/BillingEntity", "bill/BillType","bill/PayerAccountId", "bill/BillingPeriodStartDate", "bill/BillingPeriodEndDate", "lineItem/UsageAccountId", "lineItem/LineItemType", "lineItem/UsageStartDate", "lineItem/UsageEndDate", "lineItem/ProductCode", "lineItem/UsageType", "lineItem/Operation", "lineItem/AvailabilityZone", "lineItem/ResourceId", "lineItem/UsageAmount", "lineItem/NormalizationFactor", "lineItem/NormalizedUsageAmount", "lineItem/CurrencyCode", "lineItem/UnblendedRate", "lineItem/UnblendedCost", "lineItem/BlendedRate", "lineItem/BlendedCost", "lineItem/LineItemDescription", "lineItem/TaxType", "lineItem/NetUnblendedRate", "lineItem/NetUnblendedCost", "lineItem/LegalEntity", "product/ProductName", "product/PurchaseOption", "product/accessType", "product/accountAssistance", "product/alarmType", "product/analyticsMode", "product/analyticsType", "product/apiCalls", "product/apiType", "product/architecturalReview", "product/architectureSupport", "product/attachmentType", "product/availability", "product/availabilityZone", "product/awsiotevents", "product/backupservice","product/baseProductReferenceCode", "product/bestPractices", "product/brokerEngine", "product/bundle", "product/bundleDescription", "product/bundleGroup", "product/cacheEngine", "product/cacheType", "product/callingType", "product/capacitystatus", "product/caseSeverityresponseTimes", "product/category", "product/channel", "product/ciType", "product/classicnetworkingsupport", "product/clockSpeed", "product/cloudSearchVersion", "product/comment", "product/component", "product/computeFamily", "product/computeType", "product/contentType", "product/country", "product/countsAgainstQuota", "product/cputupe", "product/cputype", "product/currentGeneration", "product/customerServiceAndCommunities", "product/data", "product/dataTransferQuota", "product/dataType", "product/databaseEdition", 
"product/databaseEngine", "product/datatransferout", "product/dedicatedEbsThroughput", "product/deploymentLocation", "product/deploymentOption", "product/describes", "product/description", "product/deviceOs", "product/directorySize", "product/directoryType", "product/directoryTypeDescription", "product/disableactivationconfirmationemail", "product/durability", "product/ecu", "product/edition", "product/endpoint", "product/endpointType", "product/engine", "product/engineCode", "product/enhancedNetworkingSupport", "product/enhancedNetworkingSupported", "product/entityType", "product/equivalentondemandsku", "product/eventType", "product/executionMode", "product/feature", "product/feeCode", "product/feeDescription", "product/fileSystemType", "product/findingGroup", "product/findingSource", "product/findingStorage", "product/freeOverage", "product/freeQueryTypes", "product/freeTier", "product/freeTrial", "product/freeUsageIncluded", "product/fromLocation", "product/fromLocationType", "product/fromRegionCode", "product/fusfCategory", "product/gb", "product/gets", "product/gpu", "product/gpuMemory", "product/granularity", "product/group", "product/groupDescription", "product/includedServices", "product/inputMode", "product/insightstype", "product/instance", "product/instanceFamily", "product/instanceFunction", "product/instanceName", "product/instanceType", "product/instanceTypeFamily", "product/intelAvx2Available", "product/intelAvxAvailable", "product/intelTurboAvailable", "product/io", "product/ioRequestType", "product/launchSupport", "product/license", "product/licenseModel", "product/lineType", "product/location", "product/locationType", "product/lockeprofiles", "product/logsDestination", "product/lookoutvisionimage", "product/machineLearningProcess", "product/mailboxStorage", "product/marketoption", "product/maxIopsBurstPerformance", "product/maxIopsvolume", "product/maxThroughputvolume", "product/maxVolumeSize", "product/maximumExtendedStorage", 
"product/maximumStorageVolume", "product/memory", "product/memoryGib", "product/memorytype", "product/messageDeliveryFrequency", "product/messageDeliveryOrder", "product/meterMode", "product/meteringType", "product/minVolumeSize", "product/minimumStorageVolume", "product/networkPerformance", "product/normalizationSizeFactor", "product/offer","product/operatingSystem", "product/operation", "product/operationsSupport", "product/opsItems", "product/origin", "product/osLicenseModel", "product/outputMode", "product/overageType", "product/parameterType", "product/physicalCores", "product/physicalCpu", "product/physicalGpu", "product/physicalProcessor", "product/pipeline", "product/platoclassificationtype", "product/platodataanalyzedtype", "product/platofeaturetype", "product/platoinstancename", "product/platoinstancetype", "product/platopagedatatype", "product/platopricingtype", "product/platopricingunittype", "product/platoprotocoltype", "product/platostoragename", "product/platostoragetype", "product/platotrafficdirection", "product/platousagetype", "product/platovolumetype", "product/preInstalledSw", "product/pricingUnit", "product/primaryplaceofuse", "product/proactiveGuidance", "product/processorArchitecture","product/processorFeatures", "product/productFamily", "product/productSchemaDescription", "product/programmaticCaseManagement", "product/protocol", "product/provisioned", "product/purchaseterm", "product/queueType", "product/readtype", "product/recipient", "product/region", "product/regionCode", "product/replicationType", "product/requestDescription", "product/requestType", "product/resource", "product/resourceEndpoint", "product/resourcePriceGroup", "product/resourceType", "product/rootvolume", "product/routingTarget", "product/routingType", "product/runningMode", "product/scanType", "product/servicecode", "product/servicename", "product/sku", "product/softwareIncluded", "product/softwareType", "product/standardGroup", "product/standardStorage", 
"product/standardStorageRetentionIncluded", "product/steps", "product/storage", "product/storageClass", "product/storageFamily", "product/storageMedia", "product/storageType", "product/subcategory", "product/subscriptionType", "product/supportedModes", "product/technicalSupport", "product/tenancy", "product/thirdpartySoftwareSupport", "product/throughput", "product/throughputCapacity", "product/tickettype", "product/tiertype", "product/toLocation", "product/toLocationType", "product/toRegionCode", "product/training", "product/transactionType", "product/transferType", "product/type","product/updates", "product/usageFamily", "product/usagetype", "product/uservolume", "product/vcpu", "product/version", "product/videoMemoryGib", "product/volumeApiName", "product/volumeType", "product/vpcnetworkingsupport", "product/whoCanOpenCases", "product/withActiveUsers", "pricing/LeaseContractLength", "pricing/OfferingClass", "pricing/PurchaseOption", "pricing/RateCode", "pricing/RateId", "pricing/currency", "pricing/publicOnDemandCost", "pricing/publicOnDemandRate", "pricing/term", "pricing/unit", "reservation/AmortizedUpfrontCostForUsage", "reservation/AmortizedUpfrontFeeForBillingPeriod", "reservation/EffectiveCost", "reservation/EndTime", "reservation/ModificationStatus", "reservation/NetAmortizedUpfrontCostForUsage", "reservation/NetAmortizedUpfrontFeeForBillingPeriod", "reservation/NetEffectiveCost", "reservation/NetRecurringFeeForUsage", "reservation/NetUnusedAmortizedUpfrontFeeForBillingPeriod", "reservation/NetUnusedRecurringFee", "reservation/NetUpfrontValue", "reservation/NormalizedUnitsPerReservation", "reservation/NumberOfReservations", "reservation/RecurringFeeForUsage", "reservation/ReservationARN", "reservation/StartTime", "reservation/SubscriptionId", "reservation/TotalReservedNormalizedUnits", "reservation/TotalReservedUnits", "reservation/UnitsPerReservation", "reservation/UnusedAmortizedUpfrontFeeForBillingPeriod", "reservation/UnusedNormalizedUnitQuantity", 
"reservation/UnusedQuantity", "reservation/UnusedRecurringFee", "reservation/UpfrontValue", "discount/BundledDiscount", "discount/SppDiscount", "discount/TotalDiscount", "savingsPlan/TotalCommitmentToDate", "savingsPlan/SavingsPlanARN", "savingsPlan/SavingsPlanRate", "savingsPlan/UsedCommitment", "savingsPlan/SavingsPlanEffectiveCost", "savingsPlan/AmortizedUpfrontCommitmentForBillingPeriod", "savingsPlan/RecurringCommitmentForBillingPeriod", "savingsPlan/StartTime", "savingsPlan/EndTime", "savingsPlan/OfferingType", "savingsPlan/PaymentOption", "savingsPlan/PurchaseTerm", "savingsPlan/Region", "savingsPlan/NetSavingsPlanEffectiveCost", "savingsPlan/NetAmortizedUpfrontCommitmentForBillingPeriod", "savingsPlan/NetRecurringCommitmentForBillingPeriod", "resourceTags/aws:createdBy", "resourceTags/user:Application Name", "resourceTags/user:CostCenter", "resourceTags/user:Environment","resourceTags/user:Name", "resourceTags/user:Owner", "resourceTags/user:Project", "resourceTags/user:Server Owner"]
  }
  # Drop the raw line and the `event` field once the row has been parsed.
  mutate { remove_field => [ "event", "message" ] }
}

I assume Logstash is processing slowly because it is working in a single ingestion method. Please suggest what to do here.

How did you arrive at this conclusion?

You didn't share your Elasticsearch specs.

Did you try this?

download some files and try to ingest them using a pipeline with the file input and a temporary index for destination, this would help you to find if the bottleneck is Logstash or Elasticsearch.

What do you mean by "logstash is working in single ingestion method"?

Are you setting pipeline.workers to 1, or do you mean something else?