Logstash csv output plugin problem

Hi there,
I want to export Elasticsearch documents to a CSV file using Logstash, but I've run into some problems...

tail -f /var/log/logstash/logstash-plain.log

[2020-07-10T11:11:02,561][ERROR][logstash.javapipeline    ][elastiflow][output_elasticsearch_csv] A plugin had an unrecoverable error. Will restart this plugin.
  Pipeline_id:elastiflow
  Plugin: <LogStash::Inputs::Elasticsearch password=><password>, hosts=>["10.250.31.42:9200"], index=>"cic-format-%{+YYYY.MM.dd}", id=>"output_elasticsearc:Codecs::JSON id=>"json_a21bbcfa-b453-44a2-9cd9-cd6c72507134", enable_metric=>true, charset=>"UTF-8">, query=>"{ \"sort\": [ \"_doc\" ] }", size=>1000, scrid"], ssl=>false>
  Error: [404] {"error":{"root_cause":[{"type":"index_not_found_exception","reason":"no such index [cic-format-%{+YYYY.MM.dd}]","resource.type":"index_or_aic-format-%{+YYYY.MM.dd}"}],"type":"index_not_found_exception","reason":"no such index [cic-format-%{+YYYY.MM.dd}]","resource.type":"index_or_alias","resou+YYYY.MM.dd}"},"status":404}
  Exception: Elasticsearch::Transport::Transport::Errors::NotFound
  Stack: /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/elasticsearch-transport-5.0.5/lib/elasticsearch/transport/transport/base.rb:202:in `__raise_transport_error'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/elasticsearch-transport-5.0.5/lib/elasticsearch/transport/transport/base.rb:319:in `perform_request'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/elasticsearch-transport-5.0.5/lib/elasticsearch/transport/transport/http/manticore.rb:67:in `perform_request'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/elasticsearch-transport-5.0.5/lib/elasticsearch/transport/client.rb:131:in `perform_request'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/elasticsearch-api-5.0.5/lib/elasticsearch/api/actions/search.rb:183:in `search'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-elasticsearch-4.7.0/lib/logstash/inputs/elasticsearch.rb:340:in `search_request'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-elasticsearch-4.7.0/lib/logstash/inputs/elasticsearch.rb:268:in `do_run_slice'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-elasticsearch-4.7.0/lib/logstash/inputs/elasticsearch.rb:246:in `do_run'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-elasticsearch-4.7.0/lib/logstash/inputs/elasticsearch.rb:234:in `run'
/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:345:in `inputworker'
/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:336:in `block in start_input'

my config:

input {
  # elasticsearch
  elasticsearch {
    id => "output_elasticsearch_csv"
    hosts => "10.250.31.42:9200"
    index => "cic-format-%{+YYYY.MM.dd}"
    user => "${user}"
    password => "${pwd}"
    docinfo => true
  }
}

output {
  csv {
    # elastic field name
    fields => ["_id", "fl_dur", "tot_fw_pk", "tot_bw_pk", "fw_pkt_l_avg", "bw_pkt_l_avg", "fl_byt_s", "fl_pkt_s", "fw_iat_avg", "bw_iat_avg", "fw_pkt_s", "bw_pkt_s", "down_up_radio", "pkt_size_avg", "fw_seg_avg", "bw_seg_avg", "fw_has_fin", "fw_has_syn", "fw_has_rst", "fw_has_pst", "fw_has_ack", "fw_has_urg", "fw_has_cwe", "fw_has_ece", "bw_has_fin", "bw_has_syn", "bw_has_rst", "bw_has_pst", "bw_has_ack", "bw_has_urg", "bw_has_cwe", "bw_has_ece", "fw_dur", "bw_dur", "fw_src_addr", "fw_dst_addr", "fw_src_port", "fw_dst_port", "fw_ip_protocol", "fw_ip_tos", "fw_src_as", "fw_src_asn", "fw_dst_asn", "fw_input_snmp", "fw_output_snmp", "bw_src_addr", "bw_dst_addr", "bw_src_port", "bw_src_port", "bw_ip_protocol", "bw_ip_tos", "bw_src_as", "bw_src_asn", "bw_dst_asn", "bw_input_snmp", "bw_output_snmp"]
    path => "/var/log/logstash/csv/cic-%{+YYYY.MM.dd}.csv"
  }

}

my data in elasticsearch:

{
  "_index": "cic-format-2020.07.10",
  "_type": "_doc",
  "_id": "8D5YOHMBB49SfiIi_Iau",
  "_version": 1,
  "_score": 0,
  "_source": {
    "fl_dur": 104,
    "fw_dur": 52,
    "bw_dur": 52,
    "tot_fw_pk": 5,
    "tot_bw_pk": 5,
    "tot_l_fw_pkt": 589,
    "tot_l_bw_pkt": 589,
    "fw_pkt_l_avg": 117,
    "bw_pkt_l_avg": 117,
    "fl_byt_s": 1178000000,
    "fl_pkt_s": 10000000,
    "fw_iat_avg": 10,
    "bw_iat_avg": 10,
    "fw_pkt_s": 5000000,
    "bw_pkt_s": 5000000,
    "down_up_radio": 1,
    "pkt_size_avg": 117,
    "fw_seg_avg": 117,
    "bw_seg_avg": 117,
    "fw_src_addr": "163.29.98.82",
    "fw_dst_addr": "10.250.35.20",
    "fw_src_port": 80,
    "fw_dst_port": 63179,
    "fw_ip_protocol": "TCP",
    "fw_ip_tos": 0,
    "fw_src_as": "Data Communication Business Group (4782)",
    "fw_src_asn": 0,
    "fw_dst_asn": 0,
    "fw_input_snmp": "0",
    "fw_output_snmp": "0",
    "bw_src_addr": "163.29.98.82",
    "bw_dst_addr": "10.250.35.20",
    "bw_src_port": 80,
    "bw_ip_protocol": "TCP",
    "bw_ip_tos": 0,
    "bw_src_as": "Data Communication Business Group (4782)",
    "bw_src_asn": 0,
    "bw_dst_asn": 0,
    "bw_input_snmp": "0",
    "bw_output_snmp": "0",
    "fw_has_fin": true,
    "fw_has_syn": true,
    "fw_has_rst": false,
    "fw_has_pst": false,
    "fw_has_ack": true,
    "fw_has_urg": false,
    "fw_has_cwe": false,
    "fw_has_ece": false,
    "bw_has_fin": true,
    "bw_has_syn": true,
    "bw_has_rst": false,
    "bw_has_pst": false,
    "bw_has_ack": true,
    "bw_has_urg": false,
    "bw_has_cwe": false,
    "bw_has_ece": false
  }
}

A CSV file is also created at the path I specified, but the file is empty.

Does anyone know how to solve this problem, please?
Kase

You don't have a "csv output plugin problem". You just didn't read your error message :slight_smile: Your problem is the input:

"reason":"no such index [cic-format-%{+YYYY.MM.dd}]"

You are trying to use Logstash's sprintf date syntax, which does not work here, so it is literally looking for an index called cic-format-%{+YYYY.MM.dd}. I haven't tried it, but I guess that Elasticsearch date math might work?


Hi @Jenni,
thanks for your answer. I later learned why the sprintf format didn't work; the docs say:

Because of their dependency on events and fields, the following configuration options will only work within filter and output blocks.

So it doesn't work in input blocks.
I also tried your method, but I couldn't get it to work either... I want to set today's date as an environment variable instead, but I don't know how to do that.
Does anyone know how to do it?
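One idea I haven't tried yet: since Logstash substitutes ${VAR} environment variables in pipeline configs at startup, I could export today's date before starting Logstash and reference it in the index option. A rough sketch (TODAY is just a name I made up, and the value is only read once when the config is loaded, so it would not roll over for a long-running pipeline):

# before starting Logstash, e.g. in the service environment:
#   export TODAY=$(date +%Y.%m.%d)

input {
  elasticsearch {
    hosts    => "10.250.31.42:9200"
    # ${TODAY} is replaced by Logstash's environment variable substitution
    index    => "cic-format-${TODAY}"
    user     => "${user}"
    password => "${pwd}"
    docinfo  => true
  }
}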
Thanks.
Kase

I also tried a fixed index name instead of a dynamic variable, but there is still no data in the output file. Does anyone have any ideas?

input {
  # elasticsearch
  elasticsearch {
    id => "input_elasticsearch"
    hosts => "10.250.31.42:9200"
    index => "cic-format-2020.07.13"
    user => "${user}"
    password => "${pwd}"
    docinfo => true
    schedule => "* * * * *"
  }
}

output {
  csv {
    id => "output_csv"
    # elastic field name
    fields => ["[@metadata][_id]", "fl_dur", "tot_fw_pk", "tot_bw_pk", "fw_pkt_l_avg", "bw_pkt_l_avg", "fl_byt_s", "fl_pkt_s", "fw_iat_avg", "bw_iat_avg", "fw_pkt_s", "bw_pkt_s", "down_up_radio", "pkt_size_avg", "fw_seg_avg", "bw_seg_avg", "fw_has_fin", "fw_has_syn", "fw_has_rst", "fw_has_pst", "fw_has_ack", "fw_has_urg", "fw_has_cwe", "fw_has_ece", "bw_has_fin", "bw_has_syn", "bw_has_rst", "bw_has_pst", "bw_has_ack", "bw_has_urg", "bw_has_cwe", "bw_has_ece", "fw_dur", "bw_dur", "fw_src_addr", "fw_dst_addr", "fw_src_port", "fw_dst_port", "fw_ip_protocol", "fw_ip_tos", "fw_src_as", "fw_src_asn", "fw_dst_asn", "fw_input_snmp", "fw_output_snmp", "bw_src_addr", "bw_dst_addr", "bw_src_port", "bw_src_port", "bw_ip_protocol", "bw_ip_tos", "bw_src_as", "bw_src_asn", "bw_dst_asn", "bw_input_snmp", "bw_output_snmp"]
    path => "/var/log/logstash/csv/cic-%{+YYYY.MM.dd}.csv"
  }

}

After modifying the configuration file, new errors also appeared in the Logstash log that had not occurred before.

[2020-07-13T06:18:01,848][WARN ][logstash.outputs.elasticsearch][elastiflow][output_elasticsearch_single] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"elastiflow-3.5.3-2020.07.13", :routing=>nil, :_type=>"_doc"}, #<LogStash::Event:0x4ab506ed>], :response=>{"index"=>{"_index"=>"elastiflow-3.5.3-2020.07.13", "_type"=>"_doc", "_id"=>"i6fSRnMBB49SfiIit8f0", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [node.ipaddr] of type [ip] in document with id 'i6fSRnMBB49SfiIit8f0'. Preview of field's value: '%{host}'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"'%{host}' is not an IP string literal."}}}}}
[2020-07-13T06:18:01,851][WARN ][logstash.outputs.elasticsearch][elastiflow][output_elasticsearch_single] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"elastiflow-3.5.3-2020.07.13", :routing=>nil, :_type=>"_doc"}, #<LogStash::Event:0x64a2d3db>], :response=>{"index"=>{"_index"=>"elastiflow-3.5.3-2020.07.13", "_type"=>"_doc", "_id"=>"jKfSRnMBB49SfiIit8f0", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [node.ipaddr] of type [ip] in document with id 'jKfSRnMBB49SfiIit8f0'. Preview of field's value: '%{host}'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"'%{host}' is not an IP string literal."}}}}}
[2020-07-13T06:18:01,852][WARN ][logstash.outputs.elasticsearch][elastiflow][output_elasticsearch_single] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"elastiflow-3.5.3-2020.07.13", :routing=>nil, :_type=>"_doc"}, #<LogStash::Event:0x576197de>], :response=>{"index"=>{"_index"=>"elastiflow-3.5.3-2020.07.13", "_type"=>"_doc", "_id"=>"jafSRnMBB49SfiIit8f0", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [node.ipaddr] of type [ip] in document with id 'jafSRnMBB49SfiIit8f0'. Preview of field's value: '%{host}'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"'%{host}' is not an IP string literal."}}}}}
[2020-07-13T06:18:01,854][WARN ][logstash.outputs.elasticsearch][elastiflow][output_elasticsearch_single] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"elastiflow-3.5.3-2020.07.13", :routing=>nil, :_type=>"_doc"}, #<LogStash::Event:0x4253a2f1>], :response=>{"index"=>{"_index"=>"elastiflow-3.5.3-2020.07.13", "_type"=>"_doc", "_id"=>"jqfSRnMBB49SfiIit8f0", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [node.ipaddr] of type [ip] in document with id 'jqfSRnMBB49SfiIit8f0'. Preview of field's value: '%{host}'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"'%{host}' is not an IP string literal."}}}}}

If I remove the csv config, there are no errors...

It seems like you don't have a field called host, so the sprintf reference stays as the literal string %{host}. Your node.ipaddr is therefore "%{host}" and invalid for an IP field in ES:

%{host}' is not an IP string literal.

Hi @Jenni,
thanks for your answer. I am wondering why the csv output config would affect the other output configs. Isn't that weird?
When I remove the csv output config, Logstash stops logging those warnings.
But I need the csv output config... I am using ElastiFlow to handle a NetFlow dataset, and I found that one of the ElastiFlow filter configs sets [node][ipaddr] => "%{host}", but I don't know why the csv output config would affect the filter config.
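One thing I've been considering (untested): within a single pipeline, every event passes through all filters and all unconditional outputs, so one way to keep the CSV export separate would be to tag the events coming from the elasticsearch input and wrap the csv output in a conditional. Every other output would need a similar guard to fully isolate the flows, so separate pipelines are probably cleaner, but a sketch would look like this:

input {
  elasticsearch {
    hosts    => "10.250.31.42:9200"
    index    => "cic-format-2020.07.13"
    user     => "${user}"
    password => "${pwd}"
    docinfo  => true
    schedule => "* * * * *"
    # mark these events so they can be routed separately
    tags     => ["csv_export"]
  }
}

output {
  # only events tagged by the input above reach this output
  if "csv_export" in [tags] {
    csv {
      fields => ["[@metadata][_id]", "fl_dur", "tot_fw_pk"]   # full field list as above
      path   => "/var/log/logstash/csv/cic-%{+YYYY.MM.dd}.csv"
    }
  }
}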

My Logstash pipelines.yml:

- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"

- pipeline.id: elastiflow
  path.config: "/etc/logstash/elastiflow/conf.d/*.conf"

I originally put my csv export config in /etc/logstash/elastiflow/conf.d/, which meant it was merged into the elastiflow pipeline, so the documents read from Elasticsearch also went through the ElastiFlow filters and outputs. After moving the file to /etc/logstash/conf.d/ (the main pipeline), errors like the following no longer appear:

[2020-07-13T06:18:01,848][WARN ][logstash.outputs.elasticsearch][elastiflow][output_elasticsearch_single] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"elastiflow-3.5.3-2020.07.13", :routing=>nil, :_type=>"_doc"}, #<LogStash::Event:0x4ab506ed>], :response=>{"index"=>{"_index"=>"elastiflow-3.5.3-2020.07.13", "_type"=>"_doc", "_id"=>"i6fSRnMBB49SfiIit8f0", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [node.ipaddr] of type [ip] in document with id 'i6fSRnMBB49SfiIit8f0'. Preview of field's value: '%{host}'", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"'%{host}' is not an IP string literal."}}}}}

But I have another question: the index I want to export to CSV has new documents inserted continuously, so I used schedule to keep re-reading the index. However, the same data gets written to the CSV file repeatedly. Is it possible to read only the new documents?

I just did a small test that worked fine for me with my monthly indices. What happens when you try to use date math for your daily indices (<yourindex-{now/d}>)?

input {
  elasticsearch {
    schedule => "* * * * * Europe/Berlin"
    hosts => "localhost"
    index => "<myindex-{now/M{yyyy.MM}}>"
  }
}
output {
  stdout{}
}
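For your daily indices, I would expect something like the following to work (untested with your setup; the extra format block just makes the yyyy.MM.dd pattern explicit):

input {
  elasticsearch {
    schedule => "* * * * *"
    hosts    => "10.250.31.42:9200"
    # Elasticsearch date math, resolved server-side to e.g. cic-format-2020.07.13
    index    => "<cic-format-{now/d{yyyy.MM.dd}}>"
    user     => "${user}"
    password => "${pwd}"
    docinfo  => true
  }
}
output {
  stdout{}
}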

Regarding your question about only loading new data: the ES input does not have anything like the tracking column that the JDBC input has, so your best bet would be to use a date range to filter your query.
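A rough sketch of that idea (the @timestamp field and the one-minute window are assumptions you would have to adapt to your documents, and events near the window edges could still be duplicated or missed):

input {
  elasticsearch {
    schedule => "* * * * *"
    hosts    => "10.250.31.42:9200"
    index    => "<cic-format-{now/d{yyyy.MM.dd}}>"
    user     => "${user}"
    password => "${pwd}"
    docinfo  => true
    # only fetch documents from the last minute (assumes an @timestamp date field)
    query    => '{ "query": { "range": { "@timestamp": { "gte": "now-1m" } } }, "sort": [ "_doc" ] }'
  }
}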


Hi @Jenni,
thanks for your answer. I hadn't tried that format; I had only tried percent-encoding the date math characters.
I think I will try another approach to write only the new data to the CSV file. Thanks for your help, it's been really helpful!
