Logstash 7.0.1 - CSV upload with subheaders

Hello, I am trying to upload a CSV file that contains subheaders (nested fields) using Logstash 7.0.1. For example:

"netflow": {
      "flow_seq_num": 55729,
      "flowset_id": 259,
      "flow_end_msec": 1562002327000,
      "version": 9,
      "layer2OctetDeltaCount": 802218000,
      "in_bytes": 792669846,
      "in_pkts": 530453,
      "flow_start_msec": 1562002315000
    }

However, when the file is uploaded, everything seems to end up stored in the message field instead.

These are the CSV fields in the output used to create the download:

output{
  csv{
    fields => ["[node][hostname]","[node][ipaddr]", "[event][host]", "[event][type]", "@version", 
	"@timestamp","[flow][ip_protocol]", "[flow][packets]", "[flow][src_addr]", "[flow][src_hostname]", 
	"[flow][src_port_name]", "[flow][direction]", "[flow][traffic_locality]", "[flow][src_autonomous_system]", 
	"[flow][dst_port_name]","[flow][input_snmp]", "[flow][ip_version]", "[flow][bytes]", "[flow][dst_addr]", 
	"[flow][sampling_interval]", "[flow][dst_hostname]", "[flow][dst_autonomous_system]", "[flow][input_ifname]",
	"[netflow][flow_seq_num]", "[netflow][flowset_id]","[netflow][flow_end_msec]", "[netflow][version]",
	"[netflow][layer2OctetDeltaCount]","[netflow][in_bytes]","[netflow][in_pkts]","[netflow][flow_start_msec]"]
    path => "Flow_csv.csv"
  }
}

For the CSV upload, I use the same fields as the columns:

input {
  file {
    path => "home/data/*.csv"
    start_position => "beginning"
   sincedb_path => "/dev/null"
  }
}
filter {
  csv {
      separator => ","
      columns => ["[node][hostname]","[node][ipaddr]", "[event][host]", "[event][type]", "@version",
        "@timestamp","[flow][ip_protocol]", "[flow][packets]", "[flow][src_addr]", "[flow][src_hostname]",
        "[flow][src_port_name]", "[flow][direction]", "[flow][traffic_locality]", "[flow][src_autonomous_system]",
        "[flow][dst_port_name]","[flow][input_snmp]", "[flow][ip_version]", "[flow][bytes]", "[flow][dst_addr]",
        "[flow][sampling_interval]", "[flow][dst_hostname]", "[flow][dst_autonomous_system]", "[flow][input_ifname]",
        "[netflow][flow_seq_num]", "[netflow][flowset_id]","[netflow][flow_end_msec]", "[netflow][version]",
        "[netflow][layer2OctetDeltaCount]","[netflow][in_bytes]","[netflow][in_pkts]","[netflow][flow_start_msec]"]
  }
}
output {
  elasticsearch {
    hosts => "ipaddress.com:9200"
    index => "elastiflow-2019-07-01"
  }
}

The error I get:

[2019-07-01T15:31:36,841][WARN ][logstash.filters.csv     ] Error parsing csv {:field=>"message", :source=>"10.1.1.253,10.1.1.253,10.1.1.253,netflow,3.5.0,2019-07-01T19:05:19.000Z,UDP,3,10.1.1.253,10.1.1.253,UDP/50101,unspecified,private,private,netflow (UDP/2055),541,IPv4,400,10.2.1.50,0,10.2.1.50,private,index: 541,149803,257,,9,,400,3,", :exception=>#<TypeError: wrong argument type String (expected LogStash::Timestamp)>}

Is there an issue with the upload formatting?

Thanks,
Eric

The @timestamp field has type LogStash::Timestamp, but you are trying to set it to a string value. In your list of columns, change "@timestamp" to
"[@metadata][timestamp]", then add a date filter to parse it:

date { match => [ "[@metadata][timestamp]", "ISO8601" ] }
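
Putting both changes together, the relevant part of the filter block would look something like this (the column list is abbreviated here for illustration; keep your full list and only swap the "@timestamp" entry):

filter {
  csv {
    separator => ","
    # abbreviated column list for illustration; use your full list,
    # with "@timestamp" replaced by "[@metadata][timestamp]"
    columns => ["[node][hostname]", "[node][ipaddr]", "[@metadata][timestamp]", "[flow][bytes]"]
  }
  # parse the ISO8601 string from the metadata field into @timestamp
  date {
    match => [ "[@metadata][timestamp]", "ISO8601" ]
  }
}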

That worked!

Thank you
Eric

One more question: when I upload with the edited conf file, is there a way to let ES create the types associated with each field, or is that something that has to be done manually through index management in Kibana?

Thanks,
Eric

Yes, you can iterate over the fields in an object and convert the ones that look numeric using a ruby filter, with code along the lines of the sketch below.
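
A minimal sketch, assuming the CSV values arrive as strings under the [netflow] object (the regex checks and the field path are illustrative and may need adjusting for your data):

ruby {
  code => '
    # walk the [netflow] object and convert numeric-looking strings to numbers
    netflow = event.get("netflow")
    if netflow.is_a?(Hash)
      netflow.each do |key, value|
        next unless value.is_a?(String)
        if value =~ /\A-?\d+\z/
          event.set("[netflow][#{key}]", value.to_i)
        elsif value =~ /\A-?\d+\.\d+\z/
          event.set("[netflow][#{key}]", value.to_f)
        end
      end
    end
  '
}

If you already know which fields should be numeric, a mutate convert filter or an Elasticsearch index template is a simpler way to get the same mappings.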

Great, Thank you!

Eric
