How to deal with inconsistent dates (day, month, or year may be missing)?

Hi,

First of all, I'm a beginner with the ELK stack, so please don't hurt me if I did something stupid :laughing:

I am importing CSV files that have the following structure:

"nomprenom";"sexe";"datenaiss";"lieunaiss";"commnaiss";"paysnaiss";"datedeces";"lieudeces";"actedeces"
"ABILY*FRANCOIS SIMON MARIE/";"1";"19600216";"29105";"LANDIVISIAU";"";"20200718";"01004";"143"
"HUC*JEANNINE GABRIELLE ANNA/";"2";"19270209";"81125";"LACAZE";"";"20200716";"01004";"142"

The columns datenaiss and datedeces contain birth dates and death dates respectively.

I wanted to import these two fields as dates in Elasticsearch so that I can calculate statistics, such as the mean lifespan, or select records with conditions like "datedeces between 15 January 2014 and 25 September 2019".

However, the dates are sometimes not fully known. If the year isn't known, it is 0000; if the month or the day isn't known, it is 00. So I can have values like 19780500, 20190025, or 00000930, and I can't import them as dates, because Logstash won't be able to convert those strings to dates.

I decided to split each date into three fields: annee, mois, and jour (year, month, and day), each one being a number. But is there a way, in Kibana, to handle the combination of these three fields as a date?

Here is my Logstash config:

input {
    s3 {
        access_key_id => "myaccesskeyid"
        secret_access_key => "mysecretaccesskey"
        bucket => "mybucket"
        endpoint => "https://s3.nl-ams.scw.cloud"
        region => "nl-ams"
        watch_for_new_files => "false"
    }
}

filter {
    csv {
        separator => ";"
        skip_header => "true"
        columns => ["nomprenom", "sexe", "datenaiss", "lieunaiss", "commnaiss", "paysnaiss", "datedeces", "lieudeces", "actedeces"]
    }

    mutate {
        convert => { "sexe" => "integer" }
        split => { "nomprenom" => "*" }

        add_field => {
            "nom" => "%{[nomprenom][0]}"
            "prenoms" => "%{[nomprenom][1]}"
        }
    }

    mutate {
        gsub => ["prenoms", "/", ""]
    }

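    # Store day, month, and year in separate fields so that no info is lost.
    # Day, month, or year can be 0 in the data source.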
    ruby {
        code => 'event.set("[naiss][date][annee]", event.get("datenaiss")[0..3]);
                 event.set("[naiss][date][mois]", event.get("datenaiss")[4..5]);
                 event.set("[naiss][date][jour]", event.get("datenaiss")[6..7]);
                 event.set("[deces][date][annee]", event.get("datedeces")[0..3]);
                 event.set("[deces][date][mois]", event.get("datedeces")[4..5]);
                 event.set("[deces][date][jour]", event.get("datedeces")[6..7]);'
    }

    mutate {
        rename => {
            "lieunaiss" => "[naiss][codelieu]"
            "commnaiss" => "[naiss][commune]"
            "paysnaiss" => "[naiss][pays]"
            "lieudeces" => "[deces][codelieu]"
            "actedeces" => "[deces][numacte]"
        }
        remove_field => ["nomprenom", "message", "host", "geoip", "datenaiss", "datedeces"]
    }

    if [naiss][pays] == "" {
        mutate {
            replace => ["[naiss][pays]", "FRANCE"]
        }
    }

    mutate {
        convert => {
            "[naiss][date][annee]" => "integer"
            "[naiss][date][mois]" => "integer"
            "[naiss][date][jour]" => "integer"
            "[deces][date][annee]" => "integer"
            "[deces][date][mois]" => "integer"
            "[deces][date][jour]" => "integer"
        }
    }
}

output {
    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "logstash-insee-deces"
        workers => 1
        user => "myuser"
        password => "mypassword"
    }
}

The resulting data structure is:

{
  "_index": "logstash-insee-deces",
  "_type": "_doc",
  "_id": "LOW53HQBVzcA8wC1_M15",
  "_version": 1,
  "_score": null,
  "_source": {
    "prenoms": "ALFRED VICTOR",
    "naiss": {
      "date": {
        "mois": 1,
        "jour": 5,
        "annee": 1899
      },
      "pays": "FRANCE",
      "codelieu": "02691",
      "commune": "SAINT-QUENTIN"
    },
    "sexe": 1,
    "@timestamp": "2020-09-30T01:56:50.110Z",
    "nom": "RAYEZ",
    "deces": {
      "date": {
        "mois": 4,
        "jour": 5,
        "annee": 1972
      },
      "codelieu": "93007",
      "numacte": "103"
    },
    "@version": "1"
  },
  "fields": {
    "@timestamp": [
      "2020-09-30T01:56:50.110Z"
    ]
  },
  "sort": [
    1601431010110
  ]
}

The codelieu fields and the numacte field are kept as strings because they can also contain letters.
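(If one wanted to make that explicit rather than rely on the default logstash-* index template, I believe creating the index with a partial mapping beforehand would do it. This is just an untested sketch; dynamic mapping would fill in the remaining fields:)

    PUT logstash-insee-deces
    {
      "mappings": {
        "properties": {
          "naiss": { "properties": { "codelieu": { "type": "keyword" } } },
          "deces": {
            "properties": {
              "codelieu": { "type": "keyword" },
              "numacte": { "type": "keyword" }
            }
          }
        }
      }
    }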

So... my three big questions are:

  1. Is there a way to create a date from three fields in Kibana, knowing that sometimes the day, the month, or the year may be missing?
  2. How would you deal with those inconsistent dates?
  3. Is my Logstash configuration optimizable? (i.e., is there a simpler and/or faster way of doing what I'm doing?)

Thank you very much :smile:

William Blondel

Welcome to our community! :smiley:

You can't get around this problem, unfortunately, because your timestamps are invalid no matter how you split the values.


Thank you!

Okay, I understand.

I just thought of something.

Is there a way to do the following in Logstash:

  • Try to convert datenaiss from the CSV to a date
  • If it succeeds, put the result in a date field; if it fails, don't add any date field
  • Do the same for datedeces

I realise that with this solution, all the records where a piece of the birth date or the death date is unknown will be missing from my statistics. But I just checked: out of 25,354,395 records, only 93,234 (about 0.37%) have a missing piece of date. So it is not a big deal.

Use a date filter; if it fails to parse the date (because it has month 00, for example), it will not modify the target field.
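Something like this (a minimal sketch; the target and tag names are just examples):

    date {
        match => [ "datenaiss", "yyyyMMdd" ]
        target => "[naiss][date][complete]"
        tag_on_failure => ["_datenaiss_dateparsefailure"]
    }

If parsing fails (month 00, day 00, etc.), the event just gets the failure tag and the target field is never set.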

I found a way to do what I want!

My Logstash config file is now very heavy, but at least I can do all kinds of statistics in Kibana now.

This is what I did (the example below is for the birth date, but I did the same for the death date):

    # Birth date
    # ==========

    # 1. Try to parse using format yyyyMMdd
    date {
        match => [ "datenaiss", "yyyyMMdd" ]
        timezone => "UTC"
        target => "[naiss][date][complete]"
        tag_on_failure => ["_datenaiss_dateparsefailure_01"]
    }

    # 2. If it fails, try to parse using format yyyyMM
    if "_datenaiss_dateparsefailure_01" in [tags] {
        mutate {
            add_field => { "naiss_annee_mois" => "%{[naiss][date][annee]}%{[naiss][date][mois]}" }
        }

        date {
            match => [ "naiss_annee_mois", "yyyyMM" ]
            timezone => "UTC"
            target => "[naiss][date][complete]"
            tag_on_failure => ["_datenaiss_dateparsefailure_02"]
        }

        # 3. If it fails, try to parse using format yyyy
        if "_datenaiss_dateparsefailure_02" in [tags] {
            date {
                match => [ "[naiss][date][annee]", "yyyy" ]
                timezone => "UTC"
                target => "[naiss][date][complete]"
                tag_on_failure => ["_datenaiss_dateparsefailure_03"]
            }
        }
    }

(I then delete the temporary naiss_annee_mois field, as it's not needed anymore.)

This gives me a populated birth date field and a populated death date field for every record, as long as at least the year is known (99.99% of my records).

However, with this config, when a date with an unknown day is indexed, the day defaults to 01 ("197906" parsed with "yyyyMM" is saved as "1979-06-01"). The same thing happens when I index just a year ("1979" parsed with "yyyy" is saved as "1979-01-01"). But that is not a problem, because I saved the original pieces of the dates in separate fields, so in Kibana/ES I can filter records where the day or the month isn't known. And that is perfect!
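(For example, querying naiss.date.jour : 0 in Kibana should select the records where the birth day is unknown.)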

There is only one small inconvenience: if the day is known but the month is unknown (e.g. 19780030), the indexed date will be 1978-01-01, so I'll "lose" the information about the day on date histograms. But that's not a big deal; I still have the original values anyway.
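And since [naiss][date][complete] and [deces][date][complete] are now real date fields, the kind of selection I mentioned in my first post should be possible with a simple range query, something like this (untested):

    GET logstash-insee-deces/_search
    {
      "query": {
        "range": {
          "deces.date.complete": {
            "gte": "2014-01-15",
            "lte": "2019-09-25"
          }
        }
      }
    }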

My full Logstash configuration now:

input {
   s3 {
       access_key_id => "myaccesskeyid"
       secret_access_key => "mysecretaccesskey"
       bucket => "mybucket"
       endpoint => "https://s3.nl-ams.scw.cloud"
       region => "nl-ams"
       watch_for_new_files => "false"
   }
}

filter {
   csv {
       separator => ";"
       skip_header => "true"
       columns => ["nomprenom", "sexe", "datenaiss", "lieunaiss", "commnaiss", "paysnaiss", "datedeces", "lieudeces", "actedeces"]
   }

   mutate {
       convert => { "sexe" => "integer" }
       split => { "nomprenom" => "*" }

       add_field => {
           "nom" => "%{[nomprenom][0]}"
           "prenoms" => "%{[nomprenom][1]}"
       }
   }

   mutate {
       gsub => ["prenoms", "/", ""]
   }

   # === BIRTH DATE AND DEATH DATE PARSING ===

   # Store day, month, and year in separate fields so that no info is lost.
   # Day, month, or year can be 0 in the data source.
   ruby {
       code => 'event.set("[naiss][date][annee]", event.get("datenaiss")[0..3]);
                event.set("[naiss][date][mois]", event.get("datenaiss")[4..5]);
                event.set("[naiss][date][jour]", event.get("datenaiss")[6..7]);
                event.set("[deces][date][annee]", event.get("datedeces")[0..3]);
                event.set("[deces][date][mois]", event.get("datedeces")[4..5]);
                event.set("[deces][date][jour]", event.get("datedeces")[6..7]);'
   }

   # Birth date
   # ==========

   # 1. Try to parse using format yyyyMMdd
   date {
       match => [ "datenaiss", "yyyyMMdd" ]
       timezone => "UTC"
       target => "[naiss][date][complete]"
       tag_on_failure => ["_datenaiss_dateparsefailure_01"]
   }

   # 2. If it fails, try to parse using format yyyyMM
   if "_datenaiss_dateparsefailure_01" in [tags] {
       mutate {
           add_field => { "naiss_annee_mois" => "%{[naiss][date][annee]}%{[naiss][date][mois]}" }
       }

       date {
           match => [ "naiss_annee_mois", "yyyyMM" ]
           timezone => "UTC"
           target => "[naiss][date][complete]"
           tag_on_failure => ["_datenaiss_dateparsefailure_02"]
       }

       # 3. If it fails, try to parse using format yyyy
       if "_datenaiss_dateparsefailure_02" in [tags] {
           date {
               match => [ "[naiss][date][annee]", "yyyy" ]
               timezone => "UTC"
               target => "[naiss][date][complete]"
               tag_on_failure => ["_datenaiss_dateparsefailure_03"]
           }
       }
   }

   # Death date, same process
   # ========================

   # 1. Try to parse using format yyyyMMdd
   date {
       match => [ "datedeces", "yyyyMMdd" ]
       timezone => "UTC"
       target => "[deces][date][complete]"
       tag_on_failure => ["_datedeces_dateparsefailure_01"]
   }

   # 2. If it fails, try to parse using format yyyyMM
   if "_datedeces_dateparsefailure_01" in [tags] {
       mutate {
           add_field => { "deces_annee_mois" => "%{[deces][date][annee]}%{[deces][date][mois]}" }
       }

       date {
           match => [ "deces_annee_mois", "yyyyMM" ]
           timezone => "UTC"
           target => "[deces][date][complete]"
           tag_on_failure => ["_datedeces_dateparsefailure_02"]
       }

       # 3. If it fails, try to parse using format yyyy
       if "_datedeces_dateparsefailure_02" in [tags] {
           date {
               match => [ "[deces][date][annee]", "yyyy" ]
               timezone => "UTC"
               target => "[deces][date][complete]"
               tag_on_failure => ["_datedeces_dateparsefailure_03"]
           }
       }
   }

   # Rename other fields
   mutate {
       rename => {
           "lieunaiss" => "[naiss][codelieu]"
           "commnaiss" => "[naiss][commune]"
           "paysnaiss" => "[naiss][pays]"
           "lieudeces" => "[deces][codelieu]"
           "actedeces" => "[deces][numacte]"
       }
       remove_field => ["nomprenom", "message", "host", "geoip", "datenaiss", "datedeces", "deces_annee_mois", "naiss_annee_mois"]
   }

   # Set default value for birth country
   if [naiss][pays] == "" {
       mutate {
           replace => ["[naiss][pays]", "FRANCE"]
       }
   }

   # Convert date pieces to integer
   mutate {
       convert => {
           "[naiss][date][annee]" => "integer"
           "[naiss][date][mois]" => "integer"
           "[naiss][date][jour]" => "integer"
           "[deces][date][annee]" => "integer"
           "[deces][date][mois]" => "integer"
           "[deces][date][jour]" => "integer"
       }
   }
}

output {
   elasticsearch {
       hosts => ["http://localhost:9200"]
       index => "logstash-insee-deces"
       workers => 1
       user => "myuser"
       password => "mypassword"
   }
}
