Denormalize data within a log file

I've got 8000 log files being imported to Logstash via Filebeat. The log files all look something like this:

START-OF-LOG: 3.0
LOCATION: DX
CALLSIGN: SP1EGN
CLUB: SP-CW-C
CONTEST: CQ-WW-CW
CATEGORY-OPERATOR: SINGLE-OP
CATEGORY-ASSISTED: NON-ASSISTED
CATEGORY-BAND: 40M
CATEGORY-MODE: CW
CATEGORY-POWER: LOW
CATEGORY-STATION: FIXED
CATEGORY-TIME: 6-HOURS
CATEGORY-TRANSMITTER: ONE
CATEGORY-OVERLAY: CLASSIC
CLAIMED-SCORE: 42
OPERATORS: SP1EGN
NAME: Robert Nowak
CERTIFICATE: NO
CREATED-BY: N1MM Logger+ 1.0.6903.0
QSO:  7000 CW 2017-11-26 1738 SP1EGN        599 15     EC2DX         599 14
QSO:  7000 CW 2017-11-26 1748 SP1EGN        599 15     RZ7T          599 16
QSO:  7000 CW 2017-11-26 1753 SP1EGN        599 15     US1Q          599 16
QSO:  7000 CW 2017-11-26 1805 SP1EGN        599 15     RX7M          599 16
QSO:  7000 CW 2017-11-26 2055 SP1EGN        599 15     IO2X          599 15
QSO:  7000 CW 2017-11-26 2102 SP1EGN        599 15     UA7K          599 16
END-OF-LOG:

All the rows above the QSO rows are common attributes of all the QSO rows. The number of QSO rows can go up to 15,000 in a single log file. I want to add the attribute rows as key:value pairs to all the QSO documents so they can be used as dimensions in querying and grouping. There is no guarantee that the number and names of the key:value pairs will be the same in all the input files. At this point, I have a pipeline definition that successfully parses each row type, but I still need to merge those common attributes into the QSO documents.

I don't know how to do this, or whether it should be done in the Logstash pipeline or in the Filebeat config.

Any assist would be appreciated.

I just found the Elasticsearch filter plugin. Since these top rows have already been processed into documents by the time we get to the QSO rows, they can be retrieved through a query filtered on "source". Does that sound like a path to pursue? My only concern is that it sounds expensive.

And I'm not sure how to add the key and value information that comes back from the multiple documents returned by the query.

Ok, this is starting to work. I have this code for the Elasticsearch filter:

elasticsearch {
   query => "rec_type:meta and source:/var/log/cabrillo/zs9z.log"
   fields => { "key" => "key" }
}

It returns the value of "key" from the last document returned by the query, which is "END-OF-LOG". How do we capture all the keys and values from all the documents returned by the query?

Thanks for your help . . .

Maybe it will help to show what I have written so far:

input {
    beats {
        port => "5044"
    }
}
filter {
   if [message] =~ '^QSO' {
        mutate {
           gsub => [
           # Replace multiple space groups with a single space in QSO line
           "message", "\s\s+", " "
           ]
        }
        grok {
           # Parse a QSO line
           match => { "message" => "%{POSINT:freq} %{NOTSPACE:mode} %{NOTSPACE:date} %{NOTSPACE:time} %{NOTSPACE:txCall} %{NOTSPACE:txRST} %{NOTSPACE:txExch} %{NOTSPACE:rxCall} %{NOTSPACE:rxRST} %{NOTSPACE:rxExch}" }
        }
        mutate {
          add_field => {
            "timestamp" => "%{date} %{time}"
            "rec_type" => "QSO"
          }
        }
        date {
           match => ["timestamp", "yyyy-MM-dd HHmm"]
           target => "@timestamp"
        }
        elasticsearch {
           query => "rec_type:meta and source:/var/log/cabrillo/zs9z.log"
           fields => { "key" => "key" }
        }
   } else {
        grok {
           # Parse a metadata line
           match => { "message" => "%{NOTSPACE:key}%{NOTSPACE:delimiter}%{SPACE:space}%{GREEDYDATA:value}" }
        }
        mutate {
           add_field => {
              "rec_type" => "meta"
           }
        }
   }
   mutate {
      convert => {
         "freq" => "integer"
      }
   }
}
output {
    elasticsearch {
        hosts => [ "localhost:9200" ]
    }
     stdout {
        codec => rubydebug {
            metadata => true
          }
     }
}

The Elasticsearch query string includes the fixed string "/var/log/cabrillo/zs9z.log" for the source just for testing.

Going down the ES filter road is problematic, because there is no certainty that the perceived ordering will always hold - the metadata documents may not yet be indexed and searchable by the time the QSO lines are processed.

What is the "data point" here? Is it a single QSO line plus attributes (CALLSIGN: SP1EGN etc)?
Or is it each file?
Like:

{
  "LOCATION": "DX",
  "CALLSIGN": "SP1EGN",
  "CLUB": "SP-CW-C",
  "CONTEST": "CQ-WW-CW",
  // more attributes
  "QSO LIST": [
    {
      "TIME": "2017-11-26 17:38:00",
      "NUMS1": "599 15",
      "CODE": "EC2DX",
      "NUMS2": "599 14"
    },
    {
      "TIME": "2017-11-26 17:48:00",
      "NUMS1": "599 15",
      "CODE": "RZ7T",
      "NUMS2": "599 16"
    }
    // more QSO lines
  ]
}

I suppose another question would be: of the 8000 files, what is the size of the largest?

I have a possible solution in mind that uses the file input with a quirk that makes it possible to read the whole file as a single string. With a series of filters you can then produce either one ES document with an array of QSO data, or a series of documents with one QSO line each and all the attributes repeated for each.

It's not that hard, but it needs a real step-by-step approach.

Hi Guy,

Thanks for looking at this with me!

These are logs of the activity of radio stations during an event that lasts 48 hours. The logs are submitted for various kinds of analysis.

Each log starts with a series of metadata fields about the station submitting the log. I have tagged those as meta in my filter.

The term QSO refers to a single radio contact. Each QSO row describes one radio contact. It includes the Frequency, transmission type (Mode), Date, Time, Callsigns of both stations and some information exchanged during the contact.

Many QSOs are reported twice - once by each station. The transmitted and received information in the QSO data from each of the two stations will be reversed in those cases. They may not match exactly, because station operators may make an error in recording the data or clocks might not be exactly in sync. One goal we will have is to match up those double-reported QSOs, so we can count them just once and know how many overall contacts were made in the event.

The largest logs we see will have about 20,000 QSO lines. The metadata is less than 30 lines.

We will want to visualize the data in a number of ways:

  • Counts (and other aggregates) by segment (using the metadata for segmentation)
  • Counts (and other aggregates) by segment (using QSO data for segmentation)
  • Date Histograms
  • Histograms by Frequency
  • We will want to do these for individual stations and for groups matching some filter criteria.

At a future point, we will want to map the Callsigns to countries, and do some geographic analysis.

Of the two output types you describe, I think I know how to write queries for the flat list of full QSOs.

Thanks again.

Jim

This is what my output for a QSO line looks now (including my Elasticsearch filter experiment):

{
          "date" => "2017-11-25",
        "offset" => 726,
     "@metadata" => {
              "type" => "doc",
              "beat" => "filebeat",
           "version" => "6.3.1",
        "ip_address" => "127.0.0.1"
    },
        "rxCall" => "G4EZT",
          "freq" => 28010,
    "prospector" => {
        "type" => "log"
    },
        "source" => "/var/log/cabrillo/zs9z.log",
       "message" => "QSO: 28010 CW 2017-11-25 1249 ZS9Z 599 38 G4EZT 599 14",
        "txCall" => "ZS9Z",
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
          "mode" => "CW",
         "input" => {
        "type" => "log"
    },
        "txExch" => "38",
         "rxRST" => "599",
    "@timestamp" => 2017-11-25T12:49:00.000Z,
      "rec_type" => "QSO",
         "txRST" => "599",
          "host" => {
        "name" => "4b04dc80a457"
    },
      "@version" => "1",
          "beat" => {
        "hostname" => "4b04dc80a457",
            "name" => "4b04dc80a457",
         "version" => "6.3.1"
    },
        "rxExch" => "14",
          "time" => "1249",
           "key" => "END-OF-LOG",
     "timestamp" => "2017-11-25 1249"
}

This is what the meta output looks like:

{
        "offset" => 94,
     "@metadata" => {
              "type" => "doc",
              "beat" => "filebeat",
           "version" => "6.3.1",
        "ip_address" => "127.0.0.1"
    },
    "prospector" => {
        "type" => "log"
    },
        "source" => "/var/log/cabrillo/zs9z.log",
       "message" => "CATEGORY-ASSISTED: NON-ASSISTED",
         "space" => " ",
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
         "input" => {
        "type" => "log"
    },
    "@timestamp" => 2018-07-24T12:52:56.340Z,
      "rec_type" => "meta",
     "delimiter" => ":",
          "host" => {
        "name" => "4b04dc80a457"
    },
      "@version" => "1",
          "beat" => {
        "hostname" => "4b04dc80a457",
            "name" => "4b04dc80a457",
         "version" => "6.3.1"
    },
         "value" => "NON-ASSISTED",
           "key" => "CATEGORY-ASSISTED"
}

How big in Kilobytes?

How do you receive the log files?

Hi Guy,

The largest logs are about 1.2 MB.

Right now the process of receiving logs is manual via upload to a server directory or via email. They are only being collected there. There is no automatic processing at this point.

I have all 8000 logs on my local machine and I've been processing them by simply moving them to the directory watched by filebeat.

Jim

Great, we can use the Logstash file input in read mode plus the "read whole file as one string" quirk.

With careful handling the memory consumed by each Logstash event (file) will not be too great.

By the way, let me give a couple of hints about the future . . .

  • Note that this data is optimal for graph analytics, where the Vertices are the stations and the Edges are the QSOs.
  • We would love to redesign the system to be able to receive individual QSOs in real-time. When the station operator presses the Enter button, the QSO gets logged locally to a full log file, and it gets sent to a central server where it is processed immediately into Logstash.

For now, just doing normal analytics on the static data set is the goal. The points above are only added so we don't do something now that will make it harder to take the next steps later.

Jim

I got this for starters (using the generator input for testing).
Config:

input {
  generator {
    message => "START-OF-LOG: 3.0
LOCATION: DX
CALLSIGN: SP1EGN
CLUB: SP-CW-C
CONTEST: CQ-WW-CW
CATEGORY-OPERATOR: SINGLE-OP
CATEGORY-ASSISTED: NON-ASSISTED
CATEGORY-BAND: 40M
CATEGORY-MODE: CW
CATEGORY-POWER: LOW
CATEGORY-STATION: FIXED
CATEGORY-TIME: 6-HOURS
CATEGORY-TRANSMITTER: ONE
CATEGORY-OVERLAY: CLASSIC
CLAIMED-SCORE: 42
OPERATORS: SP1EGN
NAME: Robert Nowak
CERTIFICATE: NO
CREATED-BY: N1MM Logger+ 1.0.6903.0
QSO:  7000 CW 2017-11-26 1738 SP1EGN        599 15     EC2DX         599 14
QSO:  7000 CW 2017-11-26 1748 SP1EGN        599 15     RZ7T          599 16
QSO:  7000 CW 2017-11-26 1753 SP1EGN        599 15     US1Q          599 16
QSO:  7000 CW 2017-11-26 1805 SP1EGN        599 15     RX7M          599 16
QSO:  7000 CW 2017-11-26 2055 SP1EGN        599 15     IO2X          599 15
QSO:  7000 CW 2017-11-26 2102 SP1EGN        599 15     UA7K          599 16
END-OF-LOG:"
    count => 1
  }
}

filter {
  kv {
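     # duplicate keys (all the QSO: lines) are collected into an array under
     # the "qso" key, which the split filter below expands into separate events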
     field_split_pattern => "\n"
     value_split => ":"
     trim_value  => " "
     trim_key => " "
     transform_key => "lowercase"
     exclude_keys => ["start-of-log", "end-of-log"]
     source => "message"
     remove_field => ["message", "sequence"]
  }
  split {
    field => "[qso]"
  }
  mutate {
    rename => {"[qso]" => "[qso1]"}
  }
  dissect {
    mapping => {
            # 7000 CW 2017-11-26 2102 SP1EGN        599 15     UA7K          599 16
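            # the -> suffix tells dissect that the delimiter after the field may repeat (it absorbs the padding spaces)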
      qso1 => '%{[qso][freq]} %{[qso][mode]} %{[qso][date]} %{[qso][time]} %{[qso][txCall]->} %{[qso][txRST]} %{[qso][txExch]->} %{[qso][rxCall]->} %{[qso][rxRST]} %{[qso][rxExch]}'
    }
    remove_field => ["qso1"]
  }
}

output {
  stdout {
    codec => rubydebug {metadata => true}
  }
}

Output: one event for a single QSO with the attributes copied onto it (there were 5 other events; the split filter produces one event per QSO - more on this later)

{
                 "contest" => "CQ-WW-CW",
       "category-assisted" => "NON-ASSISTED",
             "certificate" => "NO",
        "category-station" => "FIXED",
          "category-power" => "LOW",
                     "qso" => {
          "date" => "2017-11-26",
          "mode" => "CW",
        "txExch" => "15",
         "rxRST" => "599",
         "txRST" => "599",
        "rxCall" => "UA7K",
          "freq" => "7000",
        "rxExch" => "16",
          "time" => "2102",
        "txCall" => "SP1EGN"
    },
           "category-band" => "40M",
           "category-time" => "6-HOURS",
    "category-transmitter" => "ONE",
              "@timestamp" => 2018-07-24T15:33:48.690Z,
       "category-operator" => "SINGLE-OP",
               "operators" => "SP1EGN",
        "category-overlay" => "CLASSIC",
                    "host" => "Elastics-MacBook-Pro.local",
                    "club" => "SP-CW-C",
                "callsign" => "SP1EGN",
                "@version" => "1",
                    "name" => "Robert Nowak",
                "location" => "DX",
           "claimed-score" => "42",
           "category-mode" => "CW",
              "created-by" => "N1MM Logger+ 1.0.6903.0"
}

This is awesome. One thing that's important is to get a useful timestamp based on the date and time reported for the QSO. Since all our time analytics will be based on that timestamp, I am guessing it is best to replace @timestamp with that value, and move the default value of @timestamp to another field, in case we need to know when the file was processed.
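
Something like this is what I have in mind for keeping the ingest time around (just a sketch; "processed_at" is a placeholder field name I made up), placed before the date filter:

  mutate {
    # keep the original ingest time before the date filter overwrites
    # @timestamp with the QSO date/time ("processed_at" is a placeholder)
    copy => { "@timestamp" => "processed_at" }
  }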

I've just observed that some of the logs have an extra field at the end of the QSO line. Not all logs have it. It is a non-negative integer (can be 0). Here's a sample with that value included.

START-OF-LOG: 3.0
CALLSIGN: K3LR
CONTEST: CQ-WW-CW
CATEGORY-OPERATOR: MULTI-OP
CATEGORY-ASSISTED: ASSISTED
CATEGORY-BAND: ALL
CATEGORY-POWER: HIGH
CATEGORY-MODE: CW
CATEGORY-TRANSMITTER: UNLIMITED
CATEGORY-OVERLAY:
CLAIMED-SCORE: 18166686
NAME: T. J. Duffy
LOCATION: PA
OPERATORS: K3LR K3UA G4TSH W2RQ N2NC N3SD K5GN
OPERATORS: N9RV N6TV N6AN N3GJ VE3RA
CLUB: NORTH COAST 
CERTIFICATE: NO
CREATED-BY: CT Version 10.04.001
QSO:  7017 CW 2017-11-25 0000 K3LR          599 5      VE9ML         599 05     1
QSO:  3500 CW 2017-11-25 0000 K3LR          599 5      NQ8O          599 04     1
QSO: 14008 CW 2017-11-25 0000 K3LR          599 5      NH2DX         599 27     1
QSO:  3500 CW 2017-11-25 0000 K3LR          599 5      DK0TA         599 14     1
QSO:  3500 CW 2017-11-25 0000 K3LR          599 5      TM6M          599 14     1
QSO:  3500 CW 2017-11-25 0000 K3LR          599 5      DJ5AZ         599 14     1
QSO:  7017 CW 2017-11-25 0000 K3LR          599 5      DR4A          599 14     1
QSO:  7049 CW 2017-11-25 0000 K3LR          599 5      P40L          599 09     1
QSO:  7017 CW 2017-11-25 0001 K3LR          599 5      LA2AB         599 14     1
QSO:  3500 CW 2017-11-25 0001 K3LR          599 5      VE9ML         599 05     1
QSO:  3500 CW 2017-11-25 0001 K3LR          599 5      DK3GI         599 14     1
QSO: 14040 CW 2017-11-25 0001 K3LR          599 5      P40W          599 09     1

We add the date filter for that; it's regular practice. It needs a slight change to the dissect mapping to concatenate the date and time into one field, [qso][datetime].

Are the QSO date times UTC?

As for the extra integer, we just do a mutate gsub with a regex pattern; if the field does not end with the pattern, it is not changed.
Last three filters:

  dissect {
    mapping => {
            # 7000 CW 2017-11-26 2102 SP1EGN        599 15     UA7K          599 16
      qso1 => '%{[qso][freq]} %{[qso][mode]} %{[qso][datetime]} %{+[qso][datetime]} %{[qso][txCall]->} %{[qso][txRST]} %{[qso][txExch]->} %{[qso][rxCall]->} %{[qso][rxRST]} %{[qso][rxExch]}'
    }
    remove_field => ["qso1"]
  }
  mutate {
    gsub => ["[qso][rxExch]", "\s+\d+$", ""]
  }
  date {
    match => [ "[qso][datetime]", "yyyy-MM-dd HHmm"]
    remove_field => ["[qso][datetime]"] # optional
  }

Sample output (rxCall = RZ7T) for these two QSO lines:

QSO:  7000 CW 2017-11-26 1738 SP1EGN        599 15     EC2DX         599 14
QSO:  7000 CW 2017-11-26 1748 SP1EGN        599 15     RZ7T          599 16 1

is

{
                 "contest" => "CQ-WW-CW",
       "category-assisted" => "NON-ASSISTED",
        "category-station" => "FIXED",
             "certificate" => "NO",
          "category-power" => "LOW",
                     "qso" => {
          "mode" => "CW",
        "txExch" => "15",
         "rxRST" => "599",
         "txRST" => "599",
        "rxCall" => "RZ7T",
          "freq" => "7000",
        "rxExch" => "16",
        "txCall" => "SP1EGN"
    },
           "category-band" => "40M",
           "category-time" => "6-HOURS",
    "category-transmitter" => "ONE",
       "category-operator" => "SINGLE-OP",
              "@timestamp" => 2017-11-26T17:48:00.000Z,
               "operators" => "SP1EGN",
        "category-overlay" => "CLASSIC",
                "callsign" => "SP1EGN",
                "@version" => "1",
                    "club" => "SP-CW-C",
                    "name" => "Robert Nowak",
                    "host" => "Elastics-MacBook-Pro.local",
           "claimed-score" => "42",
                "location" => "DX",
           "category-mode" => "CW",
              "created-by" => "N1MM Logger+ 1.0.6903.0"
}

Yes. Times are all delivered in UTC.

Check out this set of QSO records extracted from the logs of two different stations.

Reported in the log from station 8P1W:

QSO: 28018 CW 2017-11-25 1454 8P1W          599     08 K3LR          599      5
QSO: 21003 CW 2017-11-25 1736 8P1W          599     08 K3LR          599      5
QSO: 14021 CW 2017-11-25 1740 8P1W          599     08 K3LR          599      5
QSO:  3520 CW 2017-11-26 0213 8P1W          599     08 K3LR          599      5
QSO:  7030 CW 2017-11-26 1045 8P1W          599     08 K3LR          599      5

Same QSOs reported in the log from station K3LR:

QSO: 28017 CW 2017-11-25 1455 K3LR          599 5      8P1W          599 08     1
QSO: 21003 CW 2017-11-25 1736 K3LR          599 5      8P1W          599 08     1
QSO: 14020 CW 2017-11-25 1741 K3LR          599 5      8P1W          599 08     1
QSO:  3519 CW 2017-11-26 0214 K3LR          599 5      8P1W          599 08     1
QSO:  7030 CW 2017-11-26 1046 K3LR          599 5      8P1W          599 08     1

Note that the frequency and time reported may be slightly different, but these are clearly reports of the same QSO. I am thinking about creating a hash field that combines the two callsigns and frequency in a way that generates the same hash value, independent of which field the callsigns appear in. Counting unique hash codes gives us the total number of QSOs.
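
Something like this is what I have in mind (just a sketch - sorting the callsigns makes the key order-independent; rounding the frequency to the nearest 10 kHz is only an assumption to absorb the small reporting differences, and the field names follow the [qso] structure above):

  ruby {
    # build an order-independent key from the two callsigns plus a rounded
    # frequency (the rounding is an assumption, not a requirement)
    code => '
      calls = [event.get("[qso][txCall]"), event.get("[qso][rxCall]")].sort
      freq  = (event.get("[qso][freq]").to_f / 10).round * 10
      event.set("[qso][pair_key]", "#{calls.join("|")}|#{freq}")
    '
  }
  fingerprint {
    # hash the combined key; counting unique hashes approximates the total QSOs
    source => "[qso][pair_key]"
    target => "[qso][qso_hash]"
    method => "SHA1"
  }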

Jim

That would work, as long as you also use other fields (such as the contest) to group by. There is more than one filter that could generate such a hash.

I'm ready to give this a try on a few files. Can you just give me the input code to read in the whole file as a single string, so it looks like the generator input? Thanks.

Is there any difference between doing this with the file input in Logstash versus using Filebeat? Is the functionality the same, apart from Filebeat's ability to push data from different source machines to a centralized beats input?

Sure. A few caveats. I used a preview release of Logstash v6.3.2 (it's being released now, as I type).
In the Logstash settings file config/logstash.yml:

config.support_escapes: true
pipeline.batch.size: 1
pipeline.workers: 1

one worker and one event per batch - because a file with 20,000 QSO lines will (due to the split) explode the batch to 20,000 events. There will be a lot of duplicated data. You might be advised to do some surgery on the very big files: make a copy, then remove the top half of the QSO lines from one file and the bottom half from the other.

You will need to install the latest version of the file input:
bin/logstash-plugin install logstash-input-file --version 4.1.5

Also, we should only read one file at a time.
File input:

  file {
    path => "/path/to/radio/sample.txt" # replace
    sincedb_path => "/dev/null" # replace with real path to a sincedb file when ready
    delimiter => "§¶¶§" # improbable delimiter, all data is accumulated until EOF
    mode => "read"
    max_open_files => 1
    file_completed_action => "log"
    file_completed_log_path => "/path/to/radio/completed.txt" # replace
  }

Good luck.

I've got it very close to working. There's just one significant issue: there are lots of cases where the value field is empty in the source kv pairs. kv is pulling in the next line as the value in those cases, and that next line isn't getting parsed. I've been struggling to figure out how to force kv to make a better choice . . .

With this input file, you can see the problem:

START-OF-LOG: 3.0
CALLSIGN: DH1PAL
CONTEST: CQ-WW-CW
CATEGORY-OPERATOR: SINGLE-OP
CATEGORY-ASSISTED: non-assisted
CATEGORY-BAND: 20M
CATEGORY-POWER: HIGH
CATEGORY-MODE: CW
CATEGORY-TRANSMITTER: One
CLAIMED-SCORE: 6
LOCATION: DX
OPERATORS: DH1PAL
CREATED-BY: hand typed 
NAME: WERNER THEIS
SOAPBOX:  
QSO: 14000 CW 2017-11-26 1721 DH1PAL        599 14     RI1ANC        599 29     0
END-OF-LOG:

See the output below. The "soapbox" key picks up the next line (the QSO line) as its value. This happens whenever the value in any key:value pair is empty:

{
                 "soapbox" => "QSO: 14000 CW 2017-11-26 1721 DH1PAL        599 14     RI1ANC        599 29     0",
                 "contest" => "CQ-WW-CW",
               "@metadata" => {
        "path" => "/var/log/cabrillo/dh1pal.log",
        "host" => "4b04dc80a457"
    },
       "category-assisted" => "non-assisted",
          "category-power" => "HIGH",
           "category-band" => "20M",
                    "tags" => [
        [0] "_split_type_failure"
    ],
    "category-transmitter" => "One",
                    "path" => "/var/log/cabrillo/dh1pal.log",
       "category-operator" => "SINGLE-OP",
              "@timestamp" => 2018-07-24T20:06:42.361Z,
               "operators" => "DH1PAL",
                    "host" => "4b04dc80a457",
                    "name" => "WERNER THEIS",
                "callsign" => "DH1PAL",
                "@version" => "1",
           "claimed-score" => "6",
                "location" => "DX",
           "category-mode" => "CW",
              "created-by" => "hand typed"
}

Here is current state:

input {
  file {
    path => "/var/log/cabrillo/*" # replace
    sincedb_path => "/dev/null" # replace with real path to a sincedb file when ready
    delimiter => "§¶¶§" # improbable delimiter, all data is accumulated until EOF
    mode => "read"
    max_open_files => 1
    file_completed_action => "log"
    file_completed_log_path => "/var/log/cabrillo/completed/logs_completed.txt" # replace
  }
}

filter {
  kv {
     field_split_pattern => "\n"
     value_split => ":"
     trim_value  => " "
     trim_key => " "
     transform_key => "lowercase"
     exclude_keys => ["start-of-log", "end-of-log"]
     source => "message"
     remove_field => ["message", "sequence"]
  }
  split {
    field => "[qso]"
  }
  mutate {
    rename => {"[qso]" => "[qso1]"}
  }
  dissect {
    mapping => {
            # 7000 CW 2017-11-26 2102 SP1EGN        599 15     UA7K          599 16
      qso1 => '%{[qso][freq]} %{[qso][mode]} %{[qso][datetime]} %{+[qso][datetime]} %{[qso][txCall]->} %{[qso][txRST]} %{[qso][txExch]->} %{[qso][rxCall]->} %{[qso][rxRST]} %{[qso][rxExch]}'
    }
    remove_field => ["qso1"]
  }
  mutate {
    gsub => ["[qso][rxExch]", "\s+\d+$", ""]
  }
  date {
    match => [ "[qso][datetime]", "yyyy-MM-dd HHmm"]
    remove_field => ["[qso][datetime]"] # optional
  }
}

output {
  stdout {
    codec => rubydebug {metadata => true}
  }
}

Hi Jim,

You will probably get a few more data scrubbing issues. Scrub the data before the kv filter.

Add a mutate gsub before the kv filter that intercepts those empty value "records" and adds a static value, say, "NOT GIVEN" or "N/A".

But this raises an issue that you need to think about.

Should you define a list of keys that you do want to include, a "whitelist"?

This would make the ES docs exactly uniform, and the ES mappings could be concrete.

If you do, then in the kv filter use the include_keys setting with an array of string keys as the whitelist, and remove the exclude_keys setting.
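
For example, the whitelist variant of the kv filter might look like this (the key list below is only an illustrative subset to extend; note that "qso" must stay in the list, or the QSO lines would be dropped):

  kv {
     field_split_pattern => "\n"
     value_split => ":"
     trim_value  => " "
     trim_key => " "
     transform_key => "lowercase"
     # whitelist instead of exclude_keys; extend this array with the keys you want to keep
     include_keys => ["qso", "callsign", "contest", "category-operator", "category-band", "claimed-score", "operators", "location"]
     source => "message"
     remove_field => ["message", "sequence"]
  }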

Filter section now is:

filter {
  # scrub data first
  mutate {
    # for additional cleaning, add more to this array in sets of three
    gsub => ["message", ":[ ]*$", ": N/A"] 
  }
  kv {
     field_split_pattern => "\n"
     value_split => ":"
     trim_value  => " "
     trim_key => " "
     transform_key => "lowercase"
     exclude_keys => ["start-of-log", "end-of-log"]
     source => "message"
     remove_field => ["message", "sequence"]
  }
  split {
    field => "[qso]"
  }
  mutate {
    rename => {"[qso]" => "[qso1]"}
  }
  dissect {
    mapping => {
            # 7000 CW 2017-11-26 2102 SP1EGN        599 15     UA7K          599 16
      qso1 => '%{[qso][freq]} %{[qso][mode]} %{[qso][datetime]} %{+[qso][datetime]} %{[qso][txCall]->} %{[qso][txRST]} %{[qso][txExch]->} %{[qso][rxCall]->} %{[qso][rxRST]} %{[qso][rxExch]}'
    }
    remove_field => ["qso1"]
  }
  mutate {
    gsub => ["[qso][rxExch]", "\s+\d+$", ""]
  }
  date {
    match => [ "[qso][datetime]", "yyyy-MM-dd HHmm"]
    remove_field => ["[qso][datetime]"] # optional
  }
}

Let me know how you get on.

Going very well, Guy! I've got more scrubbing happening, and I'm now processing some logs into the system. I switched back to Filebeat because it seemed to handle file management better. Yes, I was able to fool it into forwarding the whole file as a single event.

I'm finding that my biggest blockers are not conceptual - they are syntactical!

Like . . . I want to force the field [qso][freq] to be an integer. I'm sure I can do it with mutate convert, but it would be cleaner to simply specify it right in the dissect mapping line, and I'm sure it's possible. I can't figure out the syntax to accomplish that, though.
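
For now I can fall back to something like this after the dissect (just a sketch of the mutate convert I mentioned):

  mutate {
    # fallback: force [qso][freq] to integer until I work out the dissect syntax
    convert => { "[qso][freq]" => "integer" }
  }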

This is getting down to the simple things, I believe . . . thanks for all your help!

Jim