Parsing and combining different CSV files in Logstash

Hi, I am looking for some pointers on how to best tackle this problem.

I have two CSV files with roughly 20 columns each. One of the CSV files contains a field (hostname) that is part of a field in the second CSV file (list_of_hostnames).

I want to combine both CSV files in Logstash so that whenever the hostname field partially matches the "list_of_hostnames" field of the second CSV file, the two rows get combined into one document in Elasticsearch (denormalized).

What is the best way to do this? Both CSV files only need to be read once; no data is appended to them later on.

If I am completely misusing the ELK stack here for something it cannot do, can you give me pointers on how best to tackle this? I like the analytics in Kibana, so ultimately I want to get the data over there.

You might be able to load one of the files using the translate filter plugin (which may require the file to be transformed) and perform a lookup against it (exact or based on regular expressions) for each record in the other file (read through a file input or stdin).
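A minimal sketch of that lookup, assuming the events carry a hostname field and a dictionary file at a made-up path (option names vary between plugin versions; newer releases use source/target instead of field/destination):

filter {
  translate {
    # Look up the event's hostname in a two-column dictionary file
    field           => "hostname"
    destination     => "host_info"
    dictionary_path => "/etc/logstash/hosts_lookup.yml"
    exact           => true
    # regex => true   # treat the dictionary keys as regular expressions
  }
}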

I was under the impression that the translate plugin could only be used for a two-column CSV (key => value). How can this be used to join entire rows based on a match of one column?

It is indeed used for key -> value lookups, but the value can be a string (CSV or JSON) that can be converted into multiple fields through a separate filter once it has been retrieved. This is why I mentioned that the file format may need to be transformed.
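For example (a sketch with hypothetical paths, hostnames and column names), the dictionary could carry the remaining columns of each row as a JSON string, which a json filter then expands into separate fields:

# hosts_lookup.yml -- the value is the rest of the row, serialised as JSON
"webserver01": '{"owner":"team-a","location":"dc1"}'
"dbserver02":  '{"owner":"team-b","location":"dc2"}'

filter {
  translate {
    field           => "hostname"
    destination     => "host_info_json"
    dictionary_path => "/etc/logstash/hosts_lookup.yml"
  }
  # Expand the retrieved JSON string into nested fields under [host_info]
  json {
    source => "host_info_json"
    target => "host_info"
  }
}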

Ok, thanks, I didn't know that. How can I make sure that this translation file is read in first, before the actual log?

The translate file is read when the filter is initialised, so you need to make sure it is available before you start Logstash.

Thanks, I will try this out

Hi, apparently that doesn't work in my case: the dictionary keys are often a concatenation of multiple IP addresses, so they will never match the field in my original input data (the key is larger than the data field that needs to be translated). How can I handle that?

Can you provide some examples and describe the logic? Could you perhaps denormalise the lookup file used by the translate plugin?

If the logic is complex, it may be easier to do this through a custom script outside Logstash.

The general log file has a field "ip".

I want to enrich these logs with the vulnerability information that is available. Unfortunately, the vulnerability information in the database is horribly structured. It contains a field "vulnerable IP's", but this is often just a comma-separated list of the IPs that are vulnerable to a certain vulnerability.

Unfortunately, I do not have control over the data in the vulnerability database; it gets added by different people.

I had a similar requirement recently, and what worked for me was indexing one of the datasets 'as is' in ES and enriching the other one using the elasticsearch filter in Logstash. This filter allowed me to do a lookup against the other dataset using queries (with field substitution, even!) and copy all the fields I needed from the matching documents to enrich my events. I was matching Japanese prefectures, municipalities and addresses in both Japanese/Chinese characters and it all worked flawlessly.

Below is a piece of my configuration and the query used for the matching:

elasticsearch {
      hosts           => "elasticserver:9200"
      index           => "logstash-zipcodes"
      query_template  => "../elastic_queries/es_query.json"
      fields          => [ ["cod_municipality","cod_municipality"],
                           ["cod_area","cod_area"],
                           ["address_complement","address_complement"],
                           ["geoip","geoip"],
                           ["latitude","latitude"],
                           ["longitude","longitude"] ]
}

Above, each fields pair is the name of the field you are copying from the other index and the destination field on your new event; the destination name doesn't need to match the source.

es_query.json

{
  "query": {
    "bool": {
      "must": [
        { "term" :  { "prefecture.keyword"   : "%{prefecture_jp}" }},
        { "term" :  { "municipality.keyword" : "%{municipality_jp}" }}
      ],
      "should": [
        { "wildcard" : { "address_complement.keyword": "%{address_jp}%{wildcard}" }}
      ]
    }
  },
  "from": 0,
  "size": 1,
  "_source": [
    "cod_municipality",
    "cod_area",
    "address_complement",
    "geoip",
    "latitude",
    "longitude"
  ]
}

The %{wildcard} field contains only a '*' because I couldn't make it work by adding the asterisk directly to the query.
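One way to add such a field earlier in the pipeline is with a mutate filter (a sketch, not the exact configuration used here):

filter {
  # Provide a literal '*' that the query template can pick up via %{wildcard}
  mutate {
    add_field => { "wildcard" => "*" }
  }
}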

The query brings back only the first document matched on the prefecture and municipality fields, and if anything also looks like the address_complement, that document is preferred. If not, we stay with the documents matched by the first two terms.

You need to add the fields you want returned from the dataset being looked up to _source.
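Applied to the vulnerability use case above, one option might be to index the vulnerability data with one document per vulnerable IP (splitting the comma-separated list at index time) and then look each event's ip up against it. A rough sketch, where the index name vulnerabilities and the fields vulnerable_ip, vulnerability_name and severity are assumptions about how that data could be indexed:

elasticsearch {
      hosts           => "elasticserver:9200"
      index           => "vulnerabilities"
      query_template  => "../elastic_queries/vuln_query.json"
      fields          => [ ["vulnerability_name","vulnerability_name"],
                           ["severity","severity"] ]
}

vuln_query.json

{
  "query": {
    "term": { "vulnerable_ip.keyword": "%{ip}" }
  },
  "from": 0,
  "size": 1,
  "_source": [
    "vulnerability_name",
    "severity"
  ]
}

With "size": 1 this copies only the first matching vulnerability; an IP affected by several vulnerabilities would need a larger size and some extra handling.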

Documentation:
https://www.elastic.co/guide/en/logstash/current/plugins-filters-elasticsearch.html
