Help with processing metadata at start of file

Hi,

I am trying to process a file that has the following format:

CreationDate;Application;ReportType;FromDate;UntilDate
2020-01-01 03:15:00;VOPS;System Report;2019-12-01 00:00:00;2020-01-01 00:00:10

SerialNo;Value;FinalRechargeValue;CurrencyCode;RechargePeriod;ExpirationDate;VoucherTypeID;PackageID;DealerID;VoucherState;PreviousState;UserName;MSISDN;MSISDNProviderID;VoucherProviderID;Account;DisabledDate;DeliveredDate;EnabledDate;UsageReqDate;UsedDate;BlockedDate;DeletedDate;ProductionType
9000000000003204;5000;;USD;2;2020-12-31 23:59:59;100;90000000000032;Dealer;U;R;ccID;8399393;;300;200;2019-11-21 10:11:11;2019-12-04 11:48:25;2019-12-10 10:39:29;2019-12-10 10:40:38;2019-12-10 10:40:38;;;0
9000000000003200;5000;;USD;20;2020-12-31 23:59:59;100;90000000000032;Dealer;U;R;ccID;8908000;;300;200;2019-11-21 10:11:11;2019-12-04 11:48:25;2019-12-10 10:13:08;2019-12-10 10:16:35;2019-12-10 10:16:35;;;0

Expected Output:
9000000000003204;5000;;USD;2;2020-12-31 23:59:59;100;90000000000032;Dealer;U;R;ccID;8399393;;300;200;2019-11-21 10:11:11;2019-12-04 11:48:25;2019-12-10 10:39:29;2019-12-10 10:40:38;2019-12-10 10:40:38;;;0;2020-01-01 03:15:00;VOPS;System Report;2019-12-01 00:00:00;2020-01-01 00:00:10

9000000000003200;5000;;USD;20;2020-12-31 23:59:59;100;90000000000032;Dealer;U;R;ccID;8908000;;300;200;2019-11-21 10:11:11;2019-12-04 11:48:25;2019-12-10 10:13:08;2019-12-10 10:16:35;2019-12-10 10:16:35;;;0;2020-01-01 03:15:00;VOPS;System Report;2019-12-01 00:00:00;2020-01-01 00:00:10

The first two lines contain the report header (column names and their values), then a column-header line for the data, and the lines after that contain the data rows.

Is there a way in Logstash to parse the header and append it to every event?

My current approach is to count the number of fields in each row and branch on that, but I am unable to persist the metadata and attach it to every event that follows in the file. Any pointers on how to achieve this?

if "CreationDate" in [message] or "Events" in [message] or [message] =~ /^$/ {
    drop { }
}
ruby {
    code => 'event.set("[field_count]", event.get("message").split(";").count)'
}

if [field_count] == 5 {
    csv {
        columns => [
            "[@metadata][CreationDate]",
            "[@metadata][Application]",
            "[@metadata][ReportType]",
            "[@metadata][FromDate]",
            "[@metadata][UntilDate]"
        ]
        separator => ";"
    }
}

Take a look at this post.

You must set pipeline.workers to 1 and also disable the Java execution engine to preserve the order of events. The solution does not scale, and I regard all solutions that involve Ruby class variables as fragile, but if you need to get the job done, that is the approach I would suggest.
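A minimal sketch of that class-variable approach, assuming the file layout from the sample above (the drop conditions and the 5-field test are inferred from it, not from a real pipeline). A Ruby class variable (`@@header`) survives between events processed by the same filter instance, which is why `pipeline.workers: 1` (and, on versions that still offer it, `pipeline.java_execution: false` in logstash.yml) is required to keep events in file order:

```
filter {
    # Drop the column-name rows and blank lines.
    if [message] =~ /^CreationDate/ or [message] =~ /^SerialNo/ or [message] =~ /^$/ {
        drop { }
    }
    ruby {
        # @@header is shared across events; only safe with pipeline.workers: 1
        init => '@@header = ""'
        code => '
            if event.get("message").split(";", -1).length == 5
                @@header = event.get("message")   # stash the metadata value row
                event.cancel                      # do not emit it as its own event
            else
                event.set("message", event.get("message") + ";" + @@header)
            end
        '
    }
}
```

With the sample input, each data row comes out with the five metadata values appended after its last field, matching the expected output. A `csv` filter can then split the combined line into named columns.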

