CSV files have different columns

Hi everyone,

Issue:
I have two kinds of CSV files: one kind (default) has two columns, and the other (custom) has about seventeen columns.

Default CSV:

rosbagTimestamp data
1.56E+18 53829632
1.56E+18 53829632
1.56E+18 53829632
1.56E+18 53829632
1.56E+18 53829632
1.56E+18 53829632
1.56E+18 53829632
1.56E+18 53829632
1.56E+18 53829632
1.56E+18 53829632
1.56E+18 53829632

Custom CSV:

rosbagTimestamp header seq stamp secs nsecs frame_id status goal_id stamp secs nsecs id status text feedback message percent_compressed percent_uploaded duration_time
1.56E+18 1 1.56E+09 9.93E+08 '' 1.56E+09 8.51E+08 "/uploader-1-1557443728.850739955" 1 "This goal has been accepted by the simple action server" "LabelAggregationStrategy complete" 0 4 0
1.56E+18 2 1.56E+09 13680934 '' 1.56E+09 8.51E+08 "/uploader-1-1557443728.850739955" 1 "This goal has been accepted by the simple action server" "LabelConversionStrategy complete" 0 8 0
1.56E+18 3 1.56E+09 23626089 '' 1.56E+09 8.51E+08 "/uploader-1-1557443728.850739955" 1 "This goal has been accepted by the simple action server" "AuxAggregationStrategy complete" 0 12 0
1.56E+18 4 1.56E+09 4.95E+08 '' 1.56E+09 8.51E+08 "/uploader-1-1557443728.850739955" 1 "This goal has been accepted by the simple action server" "WaveAggregationStrategy complete" 0 16 0
1.56E+18 5 1.56E+09 9.54E+08 '' 1.56E+09 8.51E+08 "/uploader-1-1557443728.850739955" 1 "This goal has been accepted by the simple action server" "LabelSplitStrategy complete" 0 20 0
1.56E+18 6 1.56E+09 3.91E+08 '' 1.56E+09 8.51E+08 "/uploader-1-1557443728.850739955" 1 "This goal has been accepted by the simple action server" "AuxConversionStrategy complete" 0 25 0
1.56E+18 7 1.56E+09 5.23E+08 '' 1.56E+09 8.51E+08 "/uploader-1-1557443728.850739955" 1 "This goal has been accepted by the simple action server" "LogAggregationStrategy complete" 0 29 4

Objective:
I am trying to filter the data, but my Logstash configuration works for only one kind of CSV at a time. The two kinds of files are mixed in the same directory, so I cannot separate them manually.

Questions:

  1. Is there any way I can count the number of columns of these CSV files using Logstash and apply a different configuration to each?
  2. What is the best and easiest way of handling multiple CSV formats in a single configuration file?

I appreciate any suggestions/feedback :slightly_smiling_face:

Answered here.


Thank you so much @Badger!

One more question: if you had two kinds of CSV files that each have 15+ columns, what is the best way to apply different filtering patterns to them? Is counting the number of columns still the best option, or can we do better?

You could use the add_field option on the csv filters to add a document_type field. Then make the filtering conditional on that, either

if [document_type] == "oneThing" {
    # Filters for oneThing
}

Or possibly using pipeline-to-pipeline communication with a distributor pattern. If the processing is only slightly different I would lean towards conditionals. If it is significantly different I would lean towards pipelines.
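To make the add_field idea concrete, here is a minimal sketch (the column definitions and the "oneThing" value are placeholders, not a working config):

```
filter {
    csv {
        # column definitions for this kind of CSV go here
        add_field => { "document_type" => "oneThing" }
    }
    if [document_type] == "oneThing" {
        # filters that only apply to oneThing events
    }
}
```

Each csv filter tags the event with its kind, and every later filter branches on that tag.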


@Badger But both documents are CSV, so I don't understand why I need to check the document_type?

If you have two kinds of CSV then you need a field that tells you which kind an event came from.

@Badger, so basically the idea is to add a dummy/fake field containing a value that can help me identify the CSV files uniquely?

Yes. I just realized I might have misunderstood the second question. Do you have two types of CSVs that both have 15 columns, or two types of CSVs with different numbers of columns?

If it is the former, you are going to have to find a regular expression that allows you to recognize one of them (and anything that does not match is the other).
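For example, a conditional on a regular expression could look like this (the pattern here is just an illustration, taken from the sample data above; you would need something guaranteed unique to one format):

```
if [message] =~ /simple action server/ {
    # filters for the custom CSV
} else {
    # filters for the other kind
}
```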


One CSV file has around 17 columns; the other has around 100 columns, where some of the column values are themselves key/value pairs. Here is an example:

rosbagTimestamp header seq stamp secs nsecs frame_id status - level name message hardware_id values - key value - key value - key value - key value - key value - key value - key value - key value - key value - key value - key value - key value
1.55744E+18 1 1557443693 896265029 '' 0 "Memory Usage (28a3b0c)" "OK" "28a3b0c" "Update Status" "OK" "Time Since Update" "2.00130319595" "Memory Status" "OK" "Total Memory (Physical)" "7855M" "Used Memory (Physical)" "645M" "Free Memory (Physical)" "6309M" "Total Memory (Swap)" "0M" "Used Memory (Swap)" "0M" "Free Memory (Swap)" "0M" "Total Memory" "7855M" "Used Memory" "645M" "Free Memory" "6309M"
1.55744E+18 1 1557443694 274869918 '' 2 "NTP offset from 28a3b0c to ntp.ubuntu.com" "Error Running ntpdate. Returned 127" "28a3b0c" "Offset (us)" "N/A" "Offset tolerance (us)" "500.0" "Offset tolerance (us) for Error" "5000000.0" "Output" '' "Errors" "-q
1.55744E+18 1 1557443694 529937028 '' 0 "CPU Temperature (28a3b0c)" "OK" "28a3b0c" "Update Status" "OK" "Time Since Update" "3.0723490715" "Core 0 Temperature" "43.0DegC" "Core 1 Temperature" "43.5DegC" "Core 2 Temperature" "41.5DegC" "Core 3 Temperature" "40.0DegC" "Core 4 Temperature" "40.25DegC" "Core 5 Temperature" "42.4DegC" 2 "CPU Usage (28a3b0c)" "Incorrect number of CPU cores, Clock speed error" "28a3b0c" "Update Status" "OK" "Time Since Update" "2.0056579113" "Clock speed error" '' "Output" '' "Core 0 Status" "OK" "Core 0 User" "0.00%" "Core 0 Nice" "24.24%" "Core 0 System" "0.00%" "Core 0 Idle" "20.20%" "Core 1 Status" "OK" "Core 1 User" "0.00%" "Core 1 Nice" "0.00%" "Core 1 System" "0.00%" "Core 1 Idle" "0.00%" "Core 2 Status" "OK" "Core 2 User" "0.00%" "Core 2 Nice" "0.00%" "Core 2 System" "0.00%" "Core 2 Idle" "0.00%" "Core 3 Status" "OK" "Core 3 User" "0.00%" "Core 3 Nice" "42.00%" "Core 3 System" "0.00%" "Core 3 Idle" "17.00%" "Core 4 Status" "OK" "Core 4 User" "0.00%" "Core 4 Nice" "9.28%" "Core 4 System" "0.00%" "Core 4 Idle" "22.68%" "Core 5 Status" "OK" "Core 5 User" "0.00%" "Core 5 Nice" "21.21%" "Core 5 System" "0.00%" "Core 5 Idle" "40.40%" "Load Average Status" "OK" "Load Average (1min)" "33.0%" "Load Average (5min)" "50.75%" "Load Average (15min)" "55.25%"

OK, so count the number of columns using the method I linked to, then save that in a field on the event (possibly a field within the [@metadata] object), then do things conditionally based on that field.

Sure, I will look into that and let you know what happens. If I find the solution, I'll post it.

@Badger, a question about your code. Could you please explain the purpose of "[@metadata][fields]" inside event.set(...) in your code below? The objective is to count the number of columns, so I don't understand why we need to take @metadata into account; I thought just having event.get("message").count(",") + 1 was enough. I would appreciate it if you could explain your logic.

ruby { code => 'event.set("[@metadata][fields]", 1 + event.get("message").count(","))' }
if [@metadata][fields] == 73 {
    csv { ... }
} else {
    csv { ... }
}

You need to store the count somewhere so that you can test it in the conditional.
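In plain Ruby (not Logstash itself), the idea looks like this: the count has to live on the event, because a local variable inside the ruby filter's code string would be gone by the time a later conditional runs. The hash below is just a stand-in for a Logstash event:

```ruby
# Stand-in for a Logstash event whose message has 4 comma-separated columns.
event = { "message" => "a,b,c,d" }

# Store the column count on the event, as the ruby filter does.
event["[@metadata][fields]"] = 1 + event["message"].count(",")

# A later conditional can then branch on the stored count.
kind = event["[@metadata][fields]"] == 4 ? "custom" : "default"
```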


Oh I see. Can I store it in a field named something like result, or am I restricted to [@metadata][fields]?

You can call it anything you want.

Yes, it worked!

ruby {
    code => 'event.set("[column_count]", 1 + event.get("[message]").count(","))'
}