Ingesting CSV reports through Filebeat to Logstash

Hello,

I am facing a critical problem writing a filter config for Logstash to parse incoming CSV data (50+ headers, 300 MB).
I have tried many approaches, but I have been unable to index the data into Elasticsearch. I can see that Logstash receives the events, but there are some parsing errors.

Please suggest a config file I can use to get all of my CSV data into Kibana.

This is roughly how I have written my config file (I still need to add more headers):

filter {
  csv {
    columns => [ "Asset Alternate IPv4 Addresses", "Asset Alternate IPv6 Addresses", "Asset Criticality", "Asset ID", "Asset IP Address" ]
    separator => ","
    convert => {
      "Asset ID" => "integer"
    }
  }
  mutate {
    add_field => { "received_at" => "%{@timestamp}" }
  }
}

Thanks,
Joshua.

What parsing errors? What does the input data look like?


My input config is something like this:

input {
  beats {
    port => 5044
    type => "rapid7csv"
  }
}

and the errors are something like this:

Caused by: org.logstash.beats.BeatsParser$InvalidFrameProtocolException: Invalid Frame Type, received: 82
at org.logstash.beats.BeatsParser.decode(BeatsParser.java:92) ~[logstash-input-beats-5.0.14.jar:?]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428) ~[netty-all-4.1.18.Final.jar:4.1.18.Final]
... 8 more
[WARN ] 2018-06-27 16:09:17.908 [nioEventLoopGroup-3-2] DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.DecoderException: org.logstash.beats.BeatsParser$InvalidFrameProtocolException: Invalid Frame Type, received: 86

What does the Filebeat configuration look like? Please show all non-comment lines in your filebeat.yml formatted as preformatted text using markdown notation or the </> toolbar button.

Hmm. That looks correct. Then I'm not sure what's up.

@magnusbaeck,

this is what my whole conf file looks like:

input {
  file {
    path => "/etc/elasticsearch/data.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
    columns => [ "Asset Alternate IPv4 Addresses", "Asset Alternate IPv6 Addresses", "Asset Criticality", "Asset ID", "Asset IP Address" ]
    separator => ","
    convert => {
      "Asset ID" => "integer"
    }
  }
   if [message] =~ "" {   #DELETE HEADER
      drop { }
   }
   mutate {
     gsub => ["message","\"","'"]
   }
   mutate {
     add_field => { "received_at" => "%{@timestamp}" }
   }
   mutate {
     remove_field => [ "Vulnerability Proof" ]
   }
}
output {
   elasticsearch {
     action => "index"
     hosts => "10.xx.x.xx:9200"
     user => elastic
     password => Elasticadmin
     sniffing => true
     manage_template => false
     index => "rapid7"
   }
   stdout { codec => dots }
}

and the errors are as follows:

[WARN ] 2018-06-27 17:31:12.326 [Ruby-0-Thread-6@[main]>worker0: :1] csv - Error parsing csv {:field=>"message", :source=>"", :exception=>#<NoMethodError: undefined method `each_index' for nil:NilClass>}
[WARN ] 2018-06-27 17:31:12.327 [Ruby-0-Thread-7@[main]>worker1: :1] csv - Error parsing csv {:field=>"message", :source=>"", :exception=>#<NoMethodError: undefined method `each_index' for nil:NilClass>}
[WARN ] 2018-06-27 17:31:12.330 [Ruby-0-Thread-7@[main]>worker1: :1] csv - Error parsing csv {:field=>"message", :source=>"Before issuing a certificate, a Certification Authority (CA) must check the identity of the entity requesting the certificate, as specified in the CA's Certification Practice Statement (CPS). Thus, standard certificate validation procedures require the subject CN field of a certificate to match the actual name of the entity presenting the certificate. For example, in a certificate presented by \"\"https://www.example.com/\"\", the CN should be \"\"www.example.com\"\". ", :exception=>#<CSV::MalformedCSVError: Illegal quoting in line 1.>}
[WARN ] 2018-06-27 17:31:12.330 [Ruby-0-Thread-7@[main]>worker1: :1] csv - Error parsing csv {:field=>"message", :source=>"", :exception=>#<NoMethodError: undefined method `each_index' for nil:NilClass>}
[WARN ] 2018-06-27 17:31:12.329 [Ruby-0-Thread-6@[main]>worker0: :1] csv - Error parsing csv {:field=>"message", :source=>"", :exception=>#<NoMethodError: undefined method `each_index' for nil:NilClass>}

I hope this is specific enough for you to work with.

Thanks,
Joshua.

It looks like you're trying to parse an empty line. Move the drop filter up so it comes before the csv filter.
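
For illustration, a sketch of that reordering (the /^\s*$/ blank-line pattern is my assumption, since an empty pattern matches every event; the column list is the one from the config above):

filter {
  # Drop blank lines before the csv filter ever sees them.
  if [message] =~ /^\s*$/ {
    drop { }
  }
  csv {
    columns => [ "Asset Alternate IPv4 Addresses", "Asset Alternate IPv6 Addresses", "Asset Criticality", "Asset ID", "Asset IP Address" ]
    separator => ","
    convert => {
      "Asset ID" => "integer"
    }
  }
  mutate {
    add_field => { "received_at" => "%{@timestamp}" }
  }
}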

Hello @magnusbaeck,

this is my current config file. I need some feedback on what my config file should look like in order to replicate the data from the CSV file in Elasticsearch.

input {
    beats {
    port => 5044
    #type => "datacsv"
    }
}
filter {
    csv {
        columns => ["Asset Alternate IPv4 Addresses", "Asset Alternate IPv6 Addresses", "Asset Criticality","Asset ID", "Asset IP Address", "Asset Location", "Asset MAC Addresses", "Asset Names", "Asset OS Family", "Asset OS Name", "Asset OS Version", "Asset Owner", "Asset Risk Score", "Custom Tag", "Exploit Count", "Exploit Minimum Skill", "Exploit URLs", "Malware Kit Count", "Malware Kit Names", "Scan ID","Scan Template Name", "Service Name", "Service Port", "Service Product", "Service Protocol", "Site Importance", "Site Name", "Vulnerability Additional URLs", "Vulnerability Age", "Vulnerability CVE IDs", "Vulnerability CVE URLs", "Vulnerability CVSS Score","Vulnerability CVSSv3 Score", "Vulnerability CVSSv3 Vector" ]
        separator => ","
        #quote_char => '\'
        #convert => {
        #       "Asset IP Address" => "integer"
        #       "Service Port" => "integer"
        #       "Vulnerability Test Date" => "integer"
        #}

        #autodetect_column_names => true
        skip_empty_columns => true
        skip_empty_rows => true
    }
    #kv { prefix => "arg_" }
    #mutate {
        #gsub => ["message","\"","'"]
        #gsub => ["message","^\\s+|\*@$^&\"\\s+$", ""]
        #gsub => ["message","[\\s+\"!@#$%^&*]", ""]
    #}
    mutate {
       add_field => { "received_at" => "%{@timestamp}" }
       #update => { "Asset ID" => "R7_%{[columns][Asset ID]}"}
    }
    #mutate {
    #   remove_field => ["Vulnerability Proof" ]
    #}
}
output {
  elasticsearch {
    action => "index"
    hosts => ["10.xx.x.xx:9200"]
    user => elastic
    password => Elasticadmin
    sniffing => true
    manage_template => false
    index => "r7data"
  }
  #stdout { codec => plain }
  stdout { }
}

and the errors are something like this

  [WARN ] 2018-06-28 12:41:32.586 [Ruby-0-Thread-6@[main]>worker0: :1] csv - Error parsing csv {:field=>"message", :source=>"

     :exception=>#<CSV::MalformedCSVError: Illegal quoting in line 1.>}

However,

I can see the different columns in Kibana, but there is no data under some of them, and a few columns that should contain different data are showing the same information.

Please suggest something to fix this issue.

Thank you,
Joshua

 :exception=>#<CSV::MalformedCSVError: Illegal quoting in line 1.>}

This indicates that your CSV file is malformed. Depending on how it's malformed, you may or may not be able to do something about it. If you show us an example line of input that's rejected, we'll be able to help out.
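
If it helps, one way to collect the rejected lines for inspection: on a parse failure the csv filter tags the event (the default tag is _csvparsefailure), so a conditional output can route those raw messages to a file. The path below is just a placeholder, and this block can sit alongside your existing elasticsearch output:

output {
  # Only events the csv filter failed to parse carry this tag;
  # write their raw lines to a file so they can be copied into the thread.
  if "_csvparsefailure" in [tags] {
    file {
      path => "/tmp/csv-rejects.log"   # placeholder path
    }
  }
}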

Also, here is my current filter config:

filter {
  csv {
    autodetect_column_names => true
    separator => ","
    skip_empty_columns => true
    skip_empty_rows => true
    trailing_spaces_trim_on_save => true
  }
  mutate {
    add_field => { "received_at" => "%{@timestamp}" }
  }
  mutate {
    gsub => ["message","[\"!@#$%^&*]", ""]
  }
}

That's not a CSV file, it's a screenshot from a spreadsheet that has imported the CSV. Just copy/paste an offending line and format the line as preformatted text.

@magnusbaeck ,

Hope this works

Vulnerability Description	Vulnerability ID	Vulnerability PCI Compliance Status	Vulnerability Proof	Vulnerability Published Date	Vulnerability Reference IDs	Vulnerability Reference URLs	Vulnerability Risk Score	Vulnerability Severity Level	Vulnerability Solution	Vulnerability Tags	Vulnerability Test Date	Vulnerability Test Result Code	Vulnerability Test Result Description	Vulnerability Title	Vulnerable Since
Older versions of Apache HTTPD (prior to 2.4.X) are no longer officially supported. There may exist unreported vulnerabilities for these versions. An upgrade to the latest version should be applied to mitigate these unknown risks.	apache-httpd-obsolete	Fail	"* Running HTTP service

 * Product HTTPD exists -- Apache HTTPD 2.2.15

 * Vulnerable version of product HTTPD found -- Apache HTTPD 2.2.15"	2/2/2010			5	10	"Upgrade to the latest version of Apache HTTPD

Download and apply the upgrade from:  http://archive.apache.org/dist/httpd/httpd-2.4.30.tar.gz 


The latest version of Apache HTTPD is 2.4.30.

Many platforms and distributions provide pre-built binary packages for Apache HTTP server. These pre-built packages are usually customized and optimized for a particular distribution, therefore we recommend that you use the packages if they are available for your operating system."	Apache,Apache HTTP Server,Obsolete Software,Web	6/22/2018	vv	Vulnerable Version	Obsolete Version of Apache HTTPD	1/5/2018
Older versions of Apache HTTPD (prior to 2.4.X) are no longer officially supported. There may exist unreported vulnerabilities for these versions. An upgrade to the latest version should be applied to mitigate these unknown risks.	apache-httpd-obsolete	Fail	"* Running HTTP service

 * Product HTTPD exists -- Apache HTTPD 2.2.22

 * Vulnerable version of product HTTPD found -- Apache HTTPD 2.2.22"	2/2/2010			5	10	"Upgrade to the latest version of Apache HTTPD

Download and apply the upgrade from:  http://archive.apache.org/dist/httpd/httpd-2.4.30.tar.gz 


The latest version of Apache HTTPD is 2.4.30.

Many platforms and distributions provide pre-built binary packages for Apache HTTP server. These pre-built packages are usually customized and optimized for a particular distribution, therefore we recommend that you use the packages if they are available for your operating system."	Apache,Apache HTTP Server,Obsolete Software,Web	6/21/2018	vv	Vulnerable Version	Obsolete Version of Apache HTTPD	1/4/2018
Older versions of Apache HTTPD (prior to 2.4.X) are no longer officially supported. There may exist unreported vulnerabilities for these versions. An upgrade to the latest version should be applied to mitigate these unknown risks.	apache-httpd-obsolete	Fail	"* Running HTTP service

 * Product HTTPD exists -- Apache HTTPD 2.2.22

 * Vulnerable version of product HTTPD found -- Apache HTTPD 2.2.22"	2/2/2010			5	10	"Upgrade to the latest version of Apache HTTPD

Download and apply the upgrade from:  http://archive.apache.org/dist/httpd/httpd-2.4.30.tar.gz 


The latest version of Apache HTTPD is 2.4.30.

Many platforms and distributions provide pre-built binary packages for Apache HTTP server. These pre-built packages are usually customized and optimized for a particular distribution, therefore we recommend that you use the packages if they are available for your operating system."	Apache,Apache HTTP Server,Obsolete Software,Web	6/21/2018	vv	Vulnerable Version	Obsolete Version of Apache HTTPD	1/16/2018
Older versions of Apache HTTPD (prior to 2.4.X) are no longer officially supported. There may exist unreported vulnerabilities for these versions. An upgrade to the latest version should be applied to mitigate these unknown risks.	apache-httpd-obsolete	Fail	"* Running HTTP service

 * Product HTTPD exists -- Apache HTTPD 2.2.15

 * Vulnerable version of product HTTPD found -- Apache HTTPD 2.2.15"	2/2/2010			5	10	"Upgrade to the latest version of Apache HTTPD

Download and apply the upgrade from:  http://archive.apache.org/dist/httpd/httpd-2.4.30.tar.gz 


The latest version of Apache HTTPD is 2.4.30.

Many platforms and distributions provide pre-built binary packages for Apache HTTP server. These pre-built packages are usually customized and optimized for a particular distribution, therefore we recommend that you use the packages if they are available for your operating system."	Apache,Apache HTTP Server,Obsolete Software,Web	6/22/2018	vv	Vulnerable Version	Obsolete Version of Apache HTTPD	1/5/2018

Just for your understanding, I have uploaded a screenshot of my CSV data content.

@magnusbaeck,

As I mentioned earlier, my CSV file contains 50+ columns, with multiline values in a few of them, as shown in the screenshot above. I am wondering whether there will be any issues parsing them, and whether Logstash can parse that many columns when they are shipped from Filebeat. I'm unclear on why I am getting a malformed-CSV error.

I know that Filebeat has no CSV input type. Given that, are there any limitations of the log input type for parsing CSV files in my use case?

Please help me out in solving this issue.

Thanks,
Joshua.

A multiline CSV where multiple columns contain newline characters? And no quoting of the field values? I don't know how to reliably parse that with Logstash. What piece of software produces a file like that?

As I mentioned earlier, my CSV file contains 50+ columns, with multiline values in a few of them, as shown in the screenshot above. I am wondering whether there will be any issues parsing them, and whether Logstash can parse that many columns when they are shipped from Filebeat. I'm unclear on why I am getting a malformed-CSV error.

The number of columns is absolutely no problem. The problem is how they're delimited.

Hello @magnusbaeck,

That data is exported by Rapid7, so do you suggest removing those multiline columns and shipping through Filebeat, or what would be the ideal way to ingest the CSV file?

Thanks,
Joshua.

Skipping the multiline columns would make things a lot easier, as would getting Rapid7 to export the data in a non-stupid format.

Filebeat is fine but not required.
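
For illustration, a sketch of reading a simplified export (multiline columns removed) directly with the file input, without Filebeat; the path is a placeholder, and autodetect_column_names assumes the first line of the file is the header row:

input {
  file {
    path => "/path/to/rapid7-report.csv"   # placeholder path to the simplified export
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
    separator => ","
    autodetect_column_names => true   # take column names from the header line
    skip_empty_columns => true
    skip_empty_rows => true
  }
}

Note that autodetect_column_names takes the header from the first event processed, so it behaves most predictably with a single pipeline worker (-w 1).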


@magnusbaeck,

I have removed those multiline columns and tried shipping through Filebeat, and I see the field names as "column1, column2, ... etc." instead of the column names given in the Logstash filter config. I also tried "autodetect_column_names => true", which likewise failed to pick up the actual header names.
But when I tried the same use case with the "file" input in the Logstash config, everything works fine. Now I doubt whether Filebeat is the best option for shipping CSV, or am I doing something wrong in the config files?

Thanks,
Joshua.

Filebeat doesn't parse CSV at all, but Logstash can parse CSV regardless of the source.

Pick Filebeat or the Logstash file input, and let's focus on the one you choose. Both work fine in this case. Then show your configuration, a sample line of input, and what Logstash produces with a stdout { codec => rubydebug } output.
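
For reference, a temporary debugging output along those lines (swap back to the elasticsearch output once the parsed events look right):

output {
  # Print every event, with all parsed fields, to stdout for inspection.
  stdout { codec => rubydebug }
}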