Dynamic Data Type

Hi all,

I have a data format as shown in the attached image -


I'm able to import the data via the CSV input plugin which works great. However, I'm stuck on the next step which is to map the rows to numbers so that the data can be analyzed in kibana. The issue is that the values should generally be numbers as in the second two columns, however, if there is an error with the data point at some point in time, an error code will be generated as shown in the last two columns. Is there some way to map the columns to number datatype while also handling the occasional case where the value will be a string?

For reference, below is my current logstash config which needs to be expanded upon

input {
file {
path => "C:/Users/zach/Downloads/pdr*.csv"
start_position => "beginning"
sincedb_path => "NUL"

filter {
csv {
separator => ","
autodetect_column_names => true
autogenerate_column_names => true

output {
stdout { codec => rubydebug }

elasticsearch {
hosts => ["localhost:9200"]
index => "pdr-data"

In elasticsearch, if you have a template, then if a field is expected to be an integer I think (I have not tested) that you would get a mapping exception if you try to ingest a document where it is a string that cannot be parsed as a number.

If you do not have a template then you run the risk that the first document indexed contains a string in that field and the field type gets set to text.

You could record the fact that the field contained an error in another field, and then remove it. Something like

    ruby {
        code => '
            errors = []
            event.to_hash.each { |k, v|
                if k =~ /column[0-9]+/
                    unless v.to_f.to_s == v.to_s
                        errors << k
            if errors != []
                event.set("errorFields", errors)

Thanks @Badger! I used a slightly different method but the solution was spot on. For reference,

 ruby {
    code => "
        event.to_hash.each { |k, v|
            if !['@version','@timestamp','message','path','Timestamp','host'].include?(k)
                if v.include? 'ERR'
        path = event.get('[path]')
        unitExists = path.include? 'pdr'
        if unitExists
            filename = event.get('[path]').split('/').last
            pdrID = filename[0...-19]

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.