Unable to load CSV file into Elasticsearch

Hi,

Logstash sometimes uploads the data into Kibana and sometimes it doesn't, even though I use the same config file.

I use the same config file to load the same data again after deleting the index it created in Kibana only. I make some small changes and upload it again. When I do this I get the following output.

C:\Users\ramya.t\logstash-7.3.0\bin>logstash -f C:\Users\ramya.t\Downloads\Colt\retail.conf
Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.jruby.runtime.encoding.EncodingService (file:/C:/Users/ramya.t/logstash-7.3.0/logstash-core/lib/jars/jruby-complete-9.2.7.0.jar) to field java.io.Console.cs
WARNING: Please consider reporting this to the maintainers of org.jruby.runtime.encoding.EncodingService
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Thread.exclusive is deprecated, use Thread::Mutex
Sending Logstash logs to C:/Users/ramya.t/logstash-7.3.0/logs which is now configured via log4j2.properties
[2019-09-10T09:59:15,782][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2019-09-10T09:59:15,792][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.3.0"}
[2019-09-10T09:59:17,980][INFO ][org.reflections.Reflections] Reflections took 29 ms to scan 1 urls, producing 19 keys and 39 values
[2019-09-10T09:59:19,347][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2019-09-10T09:59:19,481][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2019-09-10T09:59:19,516][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>7}
[2019-09-10T09:59:19,519][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the type event field won't be used to determine the document _type {:es_version=>7}
[2019-09-10T09:59:19,538][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost"]}
[2019-09-10T09:59:19,589][INFO ][logstash.outputs.elasticsearch] Using default mapping template
[2019-09-10T09:59:19,631][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[2019-09-10T09:59:19,636][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge] A gauge metric of an unknown type (org.jruby.specialized.RubyArrayOneObject) has been create for key: cluster_uuids. This may result in invalid serialization. It is recommended to log an issue to the responsible developer/development team.
[2019-09-10T09:59:19,639][INFO ][logstash.javapipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500, :thread=>"#<Thread:0x501ad2c2 run>"}
[2019-09-10T09:59:20,228][INFO ][logstash.inputs.file ] No sincedb_path set, generating one based on the "path" setting {:sincedb_path=>"C:/Users/ramya.t/logstash-7.3.0/data/plugins/inputs/file/.sincedb_8b2a1e7d1249525b6e487bf33b908668", :path=>["C:/Users/ramya.t/Downloads/Colt/ELK/CC_FINAL_Master.csv"]}
[2019-09-10T09:59:20,247][INFO ][logstash.javapipeline ] Pipeline started {"pipeline.id"=>"main"}
[2019-09-10T09:59:20,307][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-09-10T09:59:20,312][INFO ][filewatch.observingtail ] START, creating Discoverer, Watch with file and sincedb collections
[2019-09-10T09:59:20,881][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}

The index is not created after this, and Logstash does not read the data again.

Kindly help!!

My Config file is,

My config file is:

# Input stage: read the CSV file at the given path with the file input plugin.
input {
file {
path => "C:/Users/ramya.t/Downloads/Colt/ELK/CC_FINAL_Master.csv"
# start_position only affects what is read the first time the plugin sees
# a file; it has no effect after a restart.
# NOTE(review): no sincedb_path is set here, so Logstash generates one and
# keeps track of how much of this file it has already read between runs.
start_position => "beginning"
}
}

# Filter stage: split each line into named CSV columns, normalise
# SCHEMAVERSION, blank the is_*_colt flags when their source field is empty,
# and parse REQUEST_TIMESTAMP as a date.
filter {
# Parse the line as comma-separated values and assign the fields, in order,
# to the column names listed below.
csv {
separator => ","
columns => [ "USERNAME", "SEQUENCEID", "REQUEST_TIMESTAMP", "A_FLOORSUITE",
"A_BUILDINGNAME", "A_PREMISESNUMBER", "A_STREETNAME", "A_CITYTOWN",
"A_STATE", "A_POSTALZIPCODE", "A_LATITUDE", "A_LONGITUDE",
"A_LONGADDRESS", "A_SITETELEPHONENUMBER", "A_RADIUS", "A_ISHUB",
"A_COLTOPERATINGCOUNTRY", "A_REQUIREDPRODUCT", "A_BANDWIDTH",
"A_CONNECTIVITYTYPE1", "A_CONNECTIVITYTYPE2", "A_CONNECTIVITYTYPE3",
"A_CONNECTIVITYTYPE4", "B_FLOORSUITE", "B_BUILDINGNAME",
"B_PREMISESNUMBER", "B_STREETNAME", "B_CITYTOWN", "B_STATE",
"B_POSTALZIPCODE", "B_LATITUDE", "B_LONGITUDE", "B_LONGADDRESS",
"B_SITETELEPHONENUMBER", "B_RADIUS", "B_ISHUB",
"B_COLTOPERATINGCOUNTRY", "B_REQUIREDPRODUCT", "B_BANDWIDTH",
"B_CONNECTIVITYTYPE1", "B_CONNECTIVITYTYPE2", "B_CONNECTIVITYTYPE3",
"B_CONNECTIVITYTYPE4", "SCHEMAVERSION", "REQUESTTYPE", "RESPONSE",
"RESP_ERRORTYPE", "RESP_ERRORCODE", "STATUS", "ONNET_STATUS",
"ONNETAEND_STATUS", "ONNETBEND_STATUS", "OFFNET_STATUS",
"OFFNETOPTION_STATUS", "OFFNETOPTION_AEND_STATUS",
"OFFNETOPTION_BEND_STATUS", "OLO_STATUS", "OLOAEND_STATUS",
"OLOBEND_STATUS", "OLOOPTION_STATUS", "OLOOPTAENDRES_STATUS",
"OLOOPTBENDRES_STATUS", "NEARNETSTATUS_STATUS",
"NEARNETSTATUS_AENDRESULT_STATUS", "NEARNETSTATUS_BENDRESULT_STATUS","is_A_country_colt", "is_A_city_colt", "is_B_country_colt", "is_B_city_colt"]
}

   # Rewrite SCHEMAVERSION values "1.0" through "6.0" to "1" through "6".
   # Any other value is left unchanged.
   if [SCHEMAVERSION] == "5.0" {
      mutate {
               replace => [ "SCHEMAVERSION", "5" ]
                }
            }
    if [SCHEMAVERSION] == "4.0" {
      mutate {
               replace => [ "SCHEMAVERSION", "4" ]
                }
            }
     if [SCHEMAVERSION] == "3.0" {
      mutate {
               replace => [ "SCHEMAVERSION", "3" ]
                }
            }
       if [SCHEMAVERSION] == "2.0" {
      mutate {
               replace => [ "SCHEMAVERSION", "2" ]
                }
            }
   
    if [SCHEMAVERSION] == "6.0" {
      mutate {
               replace => [ "SCHEMAVERSION", "6" ]
                }
            }

    if [SCHEMAVERSION] == "1.0" {
      mutate {
               replace => [ "SCHEMAVERSION", "1" ]
                }
            }

    # When a country or city field is empty, set the matching is_*_colt
    # flag to an empty string as well.
    if [A_COLTOPERATINGCOUNTRY] == "" {
      mutate {
               replace => [ "is_A_country_colt", "" ]
                }
            }
    if [A_CITYTOWN] == "" {
      mutate {
               replace => [ "is_A_city_colt", "" ]
                }
            }
    if [B_COLTOPERATINGCOUNTRY] == "" {
      mutate {
               replace => [ "is_B_country_colt", "" ]
                }
            }
    if [B_CITYTOWN] == "" {
      mutate {
               replace => [ "is_B_city_colt", "" ]
                }
            }

   # Parse REQUEST_TIMESTAMP with the pattern "yyyy-MM-dd HH:mm:ss".
   # No target is given, so the date filter's default target applies.
   date {
   match => [ "REQUEST_TIMESTAMP", "yyyy-MM-dd HH:mm:ss" ]
  }

}

# Output stage: send each event to the Elasticsearch host "localhost" under
# the index "ccfinal1", and also to stdout.
output {
elasticsearch {
hosts => "localhost"
index => "ccfinal1"
}
stdout {}
}

What does retail.conf contain?

# Input stage: read the CSV file at the given path with the file input plugin.
input {
file {
path => "C:/Users/ramya.t/Downloads/Colt/CC_FINAL_Master.csv"
# start_position only affects what is read the first time the plugin sees
# a file; it has no effect after a restart.
# NOTE(review): no sincedb_path is set here, so Logstash generates one and
# keeps track of how much of this file it has already read between runs.
start_position => "beginning"
}
}

# Filter stage: split each line into named CSV columns, normalise
# SCHEMAVERSION, blank the is_*_colt flags when their source field is empty,
# and parse REQUEST_TIMESTAMP as a date.
filter {
# Parse the line as comma-separated values and assign the fields, in order,
# to the column names listed below.
csv {
separator => ","
columns => [ "USERNAME", "SEQUENCEID", "REQUEST_TIMESTAMP", "A_FLOORSUITE",
"A_BUILDINGNAME", "A_PREMISESNUMBER", "A_STREETNAME", "A_CITYTOWN",
"A_STATE", "A_POSTALZIPCODE", "A_LATITUDE", "A_LONGITUDE",
"A_LONGADDRESS", "A_SITETELEPHONENUMBER", "A_RADIUS", "A_ISHUB",
"A_COLTOPERATINGCOUNTRY", "A_REQUIREDPRODUCT", "A_BANDWIDTH",
"A_CONNECTIVITYTYPE1", "A_CONNECTIVITYTYPE2", "A_CONNECTIVITYTYPE3",
"A_CONNECTIVITYTYPE4", "B_FLOORSUITE", "B_BUILDINGNAME",
"B_PREMISESNUMBER", "B_STREETNAME", "B_CITYTOWN", "B_STATE",
"B_POSTALZIPCODE", "B_LATITUDE", "B_LONGITUDE", "B_LONGADDRESS",
"B_SITETELEPHONENUMBER", "B_RADIUS", "B_ISHUB",
"B_COLTOPERATINGCOUNTRY", "B_REQUIREDPRODUCT", "B_BANDWIDTH",
"B_CONNECTIVITYTYPE1", "B_CONNECTIVITYTYPE2", "B_CONNECTIVITYTYPE3",
"B_CONNECTIVITYTYPE4", "SCHEMAVERSION", "REQUESTTYPE", "RESPONSE",
"RESP_ERRORTYPE", "RESP_ERRORCODE", "STATUS", "ONNET_STATUS",
"ONNETAEND_STATUS", "ONNETBEND_STATUS", "OFFNET_STATUS",
"OFFNETOPTION_STATUS", "OFFNETOPTION_AEND_STATUS",
"OFFNETOPTION_BEND_STATUS", "OLO_STATUS", "OLOAEND_STATUS",
"OLOBEND_STATUS", "OLOOPTION_STATUS", "OLOOPTAENDRES_STATUS",
"OLOOPTBENDRES_STATUS", "NEARNETSTATUS_STATUS",
"NEARNETSTATUS_AENDRESULT_STATUS", "NEARNETSTATUS_BENDRESULT_STATUS","is_A_country_colt", "is_A_city_colt", "is_B_country_colt", "is_B_city_colt"]
}

   # Rewrite SCHEMAVERSION values "1.0" through "6.0" to "1" through "6".
   # Any other value is left unchanged.
   if [SCHEMAVERSION] == "5.0" {
      mutate {
               replace => [ "SCHEMAVERSION", "5" ]
                }
            }
    if [SCHEMAVERSION] == "4.0" {
      mutate {
               replace => [ "SCHEMAVERSION", "4" ]
                }
            }
     if [SCHEMAVERSION] == "3.0" {
      mutate {
               replace => [ "SCHEMAVERSION", "3" ]
                }
            }
       if [SCHEMAVERSION] == "2.0" {
      mutate {
               replace => [ "SCHEMAVERSION", "2" ]
                }
            }
   
    if [SCHEMAVERSION] == "6.0" {
      mutate {
               replace => [ "SCHEMAVERSION", "6" ]
                }
            }

    if [SCHEMAVERSION] == "1.0" {
      mutate {
               replace => [ "SCHEMAVERSION", "1" ]
                }
            }

    # When a country or city field is empty, set the matching is_*_colt
    # flag to an empty string as well.
    if [A_COLTOPERATINGCOUNTRY] == "" {
      mutate {
               replace => [ "is_A_country_colt", "" ]
                }
            }
    if [A_CITYTOWN] == "" {
      mutate {
               replace => [ "is_A_city_colt", "" ]
                }
            }
    if [B_COLTOPERATINGCOUNTRY] == "" {
      mutate {
               replace => [ "is_B_country_colt", "" ]
                }
            }
    if [B_CITYTOWN] == "" {
      mutate {
               replace => [ "is_B_city_colt", "" ]
                }
            }

   # Parse REQUEST_TIMESTAMP with the pattern "yyyy-MM-dd HH:mm:ss".
   # No target is given, so the date filter's default target applies.
   date {
   match => [ "REQUEST_TIMESTAMP", "yyyy-MM-dd HH:mm:ss" ]
  }

}

# Output stage: send each event to the Elasticsearch host "localhost" under
# the index "ccfinal1", and also to stdout.
output {
elasticsearch {
hosts => "localhost"
index => "ccfinal1"
}
stdout {}
}

That is the expected behaviour of a file input. It tracks how much of the file it has read in the sincedb. The start_position option only affects what it reads the first time it sees a file. It has no effect after a restart.

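If you want the file to be read from the beginning on every run, it is possible you want to add the following to the file input, so that the read position is written to the Windows null device and not remembered between runs: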

sincedb_path => "NUL"
1 Like

Thank you @Badger!! It got in now : )