I installed the ELK stack from Elastic's repositories, and am encountering problems while attempting to ingest a 1 GB CSV file on an Amazon Linux (AWS) instance. This is my first attempt at ELK, so there's probably something simple I'm missing.
I've exceeded the character count, and will post the output of:
./logstash -f /etc/logstash/conf.d/10-aws.conf -v --verbose --debug
in the following message.
No new messages are written to the /var/log/logstash directory. With the debug and verbose logging options, shouldn't I be seeing (at a minimum) processed lines from the CSV in the console output? I wondered if it's an incompatibility with Amazon Linux or OpenJDK, or my instance (t2.micro) is too small. But I imagine I would receive error message for incompatibilities, or see extremely slow processing if that were the case.
Attempted resolutions
- chmod 777 the CSV file, to ensure it's readable.
- Explicitly specify the CSV file in the Logstash configuration.
- Adding sincedb_path => "/dev/null" to the file filter, in order to force reparsing.
- Verified that no indexes were created in ElasticSearch.
- Deleted .sincedb files in /root. One contained 0 0 0 0. The other (more recent) was empty.
Setup
ElasticSearch 2.2.0
Kibana 4.4.0
Logstash 2.2.0
All were installed from Elastic's repositories. The only change I made was to set chkconfig on.
java version "1.7.0_95"
OpenJDK Runtime Environment (amzn-2.6.4.0.65.amzn1-x86_64 u95-b00)
OpenJDK 64-Bit Server VM (build 24.95-b01, mixed mode)
Amazon Linux AMI release 2015.09 on t2.micro instance
Logstash script
input {
file {
path => "/tmp/elk_data/aws_invoices/aws-billing-detailed-2015-01.csv"
# If CSV modified, reload from beginning. Default setting is
# "end", and will index new lines appended to the file.
start_position => "beginning"
# Force reparse.
sincedb_path => "/dev/null"
}
}
filter {
csv {
# Include all columns and drop later as necessary.
columns => [
"InvoiceID", "PayerAccountID", "LinkedAccountID",
"RecordType", "RecordID", "ProductName",
"RateID", "SubscriptionID", "PricingPlanID",
"UsageType", "Operation", "AvailabilityZone",
"ReservedInstance", "ItemDescription", "UsageStartDate",
"UsageEndDate", "UsageQuantity", "BlendedRate",
"BlendedCost", "UnblendedRate","UnblendedCost",
"ResourceID", "AutoscalingGroupName", "cfLogicalID",
"cfStackID", "cfStackName", "tagEnvironment",
"tagName", "tagOwner"
]
# Smart enough to detect text qualifier?
separator => ","
}
drop {
remove_field => ["AutoscalingGroupName", "cfLogicalID",
"cfStackID", "cfStackName", "tagEnvironment",
"tagName", "tagOwner"]
}
# Drop header, and rows containing invoice totals.
if [InvoiceID] == "InvoiceID" or "Total" in [RecordType] {
drop { }
}
ruby {
code => "
event['UsageStartDate'] = Date.parse(event['UsageStartDate']);
event['UsageEndDate'] = Date.parse(event['UsageEndDate']);
"
}
mutate {
convert => {
# Convert decimal values from string to float.
"UsageQuantity" => "float"
"BlendedRate" => "float"
"BlendedCost" => "float"
"UnblendedRate" => "float"
"UnblendedCost" => "float"
}
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
action => "index"
}
stdout {
codec => rubydebug
}
}