I cant load my 15Gb csv data by logstash

im using docker based elk stack(sebp/elk).

i trying to load 15GB csv datas to elasticsearch.

first i tried migrate from mariadb.

input {
    jdbc {
    jdbc_driver_library => "/var/lib/elasticsearch/mariadb-java-client-2.5.4.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://xxxxxx/xxxxxx"
    jdbc_pool_timeout => 30000
    jdbc_paging_enabled => true
    jdbc_page_size => 100000
    jdbc_user => "xxxxxx"
    jdbc_password =>"xxxxxxxx"
    schedule => "50 * * * *"
    #tracking_column_typ => "numeric"
    use_column_value => true
    tracking_column => metering_date
    charset => "UTF-8"
    #parameters => { "tracking_date" => "metering_date" }
    statement => "SELECT wu.wateruser_id, wu.wateruser_name, wu.wateruser_type, wu.wateruser_type2, wu.wateruser_type3,wu.wateruser_gauge , substring_index(wu.wateruser_gps, ',', 1) as latitude, substring_index(wu.wateruser_gps, ',', -1) as longitude, wu.areaid, wu.dividarea, wu.dongno, wu.check_type,	mdv.install_date,  mda.metering_signal01, mda.metering_signal02, mda.metering_signal03, mda.metering_signal04, mda.metering_state, mda.metering_value, mda.modem_rssi, mda.modem_signal01, mda.modem_signal02, mda.modem_signal03,mda.metering_date, mda.receiving_date FROM meterdata mda, wateruser wu, meterdevice mdv where mdv.end_date is not null 	and wu.wateruser_org_idx=1 	and wu.mng_id = mdv.mng_id 	and mdv.modem_id = mda.modem_id order by mda.metering_date, mda.modem_id"
    }
}
filter {
}
output {
    elasticsearch {
        hosts => "localhost:9200"
        index => "sm-deagu-raw-01"
    }
    stdout {
       codec => rubydebug
    }
}

it doesnt worked........
actually nothing happened.

so i tried by csv file.

input {
file {
	path => "/var/lib/elasticsearch/sm_elk_raw.csv"
	start_position => "beginning"
}
}

filter {
csv {
	separator => ","
	columns => ["wateruser_id","wateruser_name","wateruser_type","wateruser_type2","wateruser_type3","wateruser_gauge","latitude","longitude","areaid","dividarea","dongno","check_type","mdv.install_date","metering_signal01","metering_signal02","metering_signal03","metering_signal04","metering_state","metering_value","modem_rssi","modem_signal01","modem_signal02","modem_signal03","metering_date","receiving_date"]
	skip_empty_columns => true

	add_field => ["[geoip][location]", "%{longitude}"]
	add_field => ["[geoip][location]", "%{latitude}"]
}

date { match => ["install_date", "yyyy-MM-dd HH:mm:ss"]}
date { match => ["metering_date", "yyyy-MM-dd HH:mm:ss"]}
date { match => ["receiving_date", "yyyy-MM-dd HH:mm:ss"]}

mutate { convert => ["wateruser_id", "string"] }
mutate { rename => ["wateruser_id", "wateruser ID"] }

mutate { convert => ["wateruser_name", "string"] }
mutate { rename => ["wateruser_name", "wateruser Name"] }

mutate { convert => ["wateruser_type", "string"] }
mutate { rename => ["wateruser_type", "wateruser type 1"] }

mutate { convert => ["wateruser_type2", "string"] }
mutate { rename => ["wateruser_type2", "wateruser type 2"] }

mutate { convert => ["wateruser_type3", "string"] }
mutate { rename => ["wateruser_type3", "wateruser type 2"] }

mutate { convert => ["wateruser_gauge", "integer"] }
mutate { rename => ["wateruser_gauge", "gauge"] }

rename => { "longitude" => "[geoip][longitude]"}
rename => { "latitude" => "[geoip][latitude]"}
convert => {"[geoip][longitude]" => "float"}
convert => {"[geoip][latitude]" => "float"}
convert => {"[geoip][location]" => "float"}

mutate { convert => ["areaid", "integer"] }
mutate { rename => ["areaid", "area ID"] }

mutate { convert => ["dividarea", "integer"] }
mutate { rename => ["dividarea", "Divide Area"] }

mutate { convert => ["dongno", "integer"] }
mutate { rename => ["dongno", "dong ID"] }

mutate { convert => ["check_type", "string"] }
mutate { rename => ["check_type", "water type"] }

mutate { rename => ["install_date", "install date"] }
mutate { rename => ["metering_date", "meter date"] }
mutate { rename => ["receiving_date", "receive date"] }

mutate { convert => ["metering_signal01", "integer"] }
mutate { rename => ["metering_signal01", "meter status 1"] }
	
mutate { convert => ["metering_signal02", "integer"] }
mutate { rename => ["metering_signal02", "meter status 2"] }
	
mutate { convert => ["metering_signal03", "integer"] }
mutate { rename => ["metering_signal03", "meter status 3"] }

mutate { convert => ["metering_signal04", "integer"] }
mutate { rename => ["metering_signal04", "meter status 4"] }

mutate { convert => ["metering_state", "string"] }
mutate { rename => ["metering_state", "meter state"] }

mutate { convert => ["metering_value", "float"]  }
mutate { rename => ["metering_value", "usage"] }

mutate { convert => ["modem_rssi", "integer"] }
mutate { rename => ["modem_rssi", "modem RSSI"] }

mutate { convert => ["modem_signal01", "integer"] }
mutate { rename => ["modem_signal01", "modem status 1"] }
	
mutate { convert => ["modem_signal02", "integer"] }
mutate { rename => ["modem_signal02", "modem status 2"] }
	
mutate { convert => ["modem_signal03", "integer"] }
mutate { rename => ["modem_signal03", "modem status 3"] }

}

output {
elasticsearch {
	action => "index"
	hosts => ["localhost:9200"]
	index => "smartmetring-cvs"
}
stdout { codec => rubydebug{} }

}

first i tested
bin/logstash -t /etc/logstash/conf.d/00-sm-mariadb-raw.conf

but its showing error.

[2020-03-03T05:09:31,657][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.5.1"}
[2020-03-03T05:09:32,480][ERROR][logstash.agent           ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, input, filter, output at line 1, column 1 (byte 1) after ", :backtrace=>["/opt/logstash/logstash-core/lib/logstash/compiler.rb:41:in `compile_imperative'", "/opt/logstash/logstash-core/lib/logstash/compiler.rb:49:in `compile_graph'", "/opt/logstash/logstash-core/lib/logstash/compiler.rb:11:in `block in compile_sources'", "org/jruby/RubyArray.java:2486:in `map'", "/opt/logstash/logstash-core/lib/logstash/compiler.rb:10:in `compile_sources'", "org/logstash/execution/AbstractPipelineExt.java:149:in `initialize'", "/opt/logstash/logstash-core/lib/logstash/pipeline.rb:22:in `initialize'", "/opt/logstash/logstash-core/lib/logstash/pipeline.rb:90:in `initialize'", "/opt/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:42:in `block in execute'", "/opt/logstash/logstash-core/lib/logstash/agent.rb:92:in `block in exclusive'", "org/jruby/ext/thread/Mutex.java:148:in `synchronize'", "/opt/logstash/logstash-core/lib/logstash/agent.rb:92:in `exclusive'", "/opt/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:38:in `execute'", "/opt/logstash/logstash-core/lib/logstash/agent.rb:317:in `block in converge_state'"]}
[2020-03-03T05:09:32,819][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}


(message chagned after this dicuss wrote. i re write hole text. )

[2020-03-03T05:57:52,512][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.5.1"}
[2020-03-03T05:57:53,836][ERROR][logstash.agent           ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, { at line 39, column 9 (byte 1551) after filter {\n\tcsv {\n\t\tseparator => \",\"\n\t\tcolumns => [\"wateruser_id\",\"wateruser_name\",\"wateruser_type\",\"wateruser_type2\",\"wateruser_type3\",\"wateruser_gauge\",\"latitude\",\"longitude\",\"areaid\",\"dividarea\",\"dongno\",\"check_type\",\"mdv.install_date\",\"metering_signal01\",\"metering_signal02\",\"metering_signal03\",\"metering_signal04\",\"metering_state\",\"metering_value\",\"modem_rssi\",\"modem_signal01\",\"modem_signal02\",\"modem_signal03\",\"metering_date\",\"receiving_date\"]\n\t\tskip_empty_columns => true\n\n\t\tadd_field => [\"[geoip][location]\", \"%{longitude}\"]\n\t\tadd_field => [\"[geoip][location]\", \"%{latitude}\"]\n\t}\n\n\tdate { match => [\"install_date\", \"yyyy-MM-dd HH:mm:ss\"]}\n\tdate { match => [\"metering_date\", \"yyyy-MM-dd HH:mm:ss\"]}\n\tdate { match => [\"receiving_date\", \"yyyy-MM-dd HH:mm:ss\"]}\n\n\tmutate { convert => [\"wateruser_id\", \"string\"] }\n\tmutate { rename => [\"wateruser_id\", \"wateruser ID\"] }\n\n\tmutate { convert => [\"wateruser_name\", \"string\"] }\n\tmutate { rename => [\"wateruser_name\", \"wateruser Name\"] }\n\n\tmutate { convert => [\"wateruser_type\", \"string\"] }\n\tmutate { rename => [\"wateruser_type\", \"wateruser type 1\"] }\n\n\tmutate { convert => [\"wateruser_type2\", \"string\"] }\n\tmutate { rename => [\"wateruser_type2\", \"wateruser type 2\"] }\n\t\n\tmutate { convert => [\"wateruser_type3\", \"string\"] }\n\tmutate { rename => [\"wateruser_type3\", \"wateruser type 2\"] }\n\t\n\tmutate { convert => [\"wateruser_gauge\", \"integer\"] }\n\tmutate { rename => [\"wateruser_gauge\", \"gauge\"] }\n\t\n\trename ", :backtrace=>["/opt/logstash/logstash-core/lib/logstash/compiler.rb:41:in `compile_imperative'", "/opt/logstash/logstash-core/lib/logstash/compiler.rb:49:in `compile_graph'", "/opt/logstash/logstash-core/lib/logstash/compiler.rb:11:in `block in compile_sources'", "org/jruby/RubyArray.java:2486:in `map'", "/opt/logstash/logstash-core/lib/logstash/compiler.rb:10:in `compile_sources'", "org/logstash/execution/AbstractPipelineExt.java:149:in `initialize'", "/opt/logstash/logstash-core/lib/logstash/pipeline.rb:22:in `initialize'", "/opt/logstash/logstash-core/lib/logstash/pipeline.rb:90:in `initialize'", "/opt/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:42:in `block in execute'", "/opt/logstash/logstash-core/lib/logstash/agent.rb:92:in `block in exclusive'", "org/jruby/ext/thread/Mutex.java:148:in `synchronize'", "/opt/logstash/logstash-core/lib/logstash/agent.rb:92:in `exclusive'", "/opt/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:38:in `execute'", "/opt/logstash/logstash-core/lib/logstash/agent.rb:317:in `block in converge_state'"]}
[2020-03-03T05:57:54,130][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

maybe this part is problem
rename => { "longitude" => "[geoip][longitude]"}
rename => { "latitude" => "[geoip][latitude]"}

here sample data of the csv
wateruser_id ,wateruser_name,wateruser_type, wateruser_type2 ,wateruser_type3,wateruser_gauge,latitude,longitude , areaid,dividarea ,dongno,check_type,install_date ,metering_signal01,metering_signal02,metering_signal03,metering_signal04 ,metering_state,metering_value,modem_rssi,modem_signal01,modem_signal02,modem_signal03,metering_date,receiving_date
06-025-222-2200-00_A00,xxx,H,A00,13,35.7878851,128.6685566,06,222,025,E2 ,2017-04-25 10:29:54,0,0,0,4,정상,0.004,124,3,0,1,2017-01-01 00:00:22,2018-02-03 10:11:45

what am i wrong?

w u help me?

Do you have any other files in the config directory?

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.