Splitting Records With Start and End Times Into Separate Documents With A Timestamp Every X Minutes

I'm currently attempting to split a single SQL activity record with a start and end time into multiple documents in an index. Each document will have its own timestamp (and duplicated data fields), with the timestamps 15 minutes apart: slicing up a compile run into multiple pieces. I know of no visualization tool ending in "*ana" that can take records with a start and end time and show the reserved resource (cores, threads, whatever...) as a line or bar in a time-series plot for as long as the job was active. I only get a blip for the values at the start of the event; I want a steady line of # cores/threads/mice-on-wheels used over the entire duration of the event, up until end_time.

Assume a record looks something like this:

<start_time_iso8601>, <end_time_iso8601>, <resources_used>, <user>, <project>, <hostname>

I wish to break this into multiple slices where each timestamp is 15 minutes apart. So if the difference between start_time_iso8601 and end_time_iso8601 is 48 minutes, I would have five slices with data:

start_time_iso8601, <resources_used>, <user>, <project>, <hostname>
start_time_iso8601+15 min, <resources_used>, <user>, <project>, <hostname>
start_time_iso8601+30 min, <resources_used>, <user>, <project>, <hostname>
start_time_iso8601+45 min, <resources_used>, <user>, <project>, <hostname>
end_time_iso8601, <resources_used>, <user>, <project>, <hostname>
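As a plain-Ruby sketch of that slicing arithmetic (the `slice_timestamps` helper and the example times are mine, 900 seconds being 15 minutes):

```ruby
require "time"

# Build the list of slice timestamps for one record: the start time,
# then one timestamp every 15 minutes, then the exact end time.
def slice_timestamps(start_time, end_time, step = 900)
  slices = []
  t = start_time
  while t < end_time
    slices << t
    t += step
  end
  slices << end_time
  slices
end

start_t = Time.parse("2024-06-06T16:00:00Z")
end_t   = Time.parse("2024-06-06T16:48:00Z")
# A 48-minute span yields five slices: +0, +15, +30, +45 minutes, plus the end time.
slices = slice_timestamps(start_t, end_t)
```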

Data is coming from a SQL database. Is this even possible? I'm trying to do it in a Ruby filter using LogStash::Event.new(), but I'm clearly missing something (including complete documentation), since I end up with all kinds of filewatch errors and warnings when I do this. An earlier Ruby filter called in the .conf calculates the number of slices the event will require; now I'm looking to actually generate the five new events from the first one. All of this is in pursuit of being able to display historical data of resources used.

Any explanation here or pointers to how LogStash::Event.new() actually works would be fine.

Here's how I'm currently trying to do this based upon the few threads I've been able to find that are kind of similar:

#!/usr/bin/env ruby

# the value of `params` is the value of the hash passed to `script_params` 
# in the logstash configuration
def register(params)
    #@times = params["time_field"]
end

# the filter method receives an event and must return a list of events.
# Dropping an event means not including it in the return array,
# while creating new ones only requires you to add a new instance of
# LogStash::Event to the returned array
def filter(event)
    require 'time'

    # The number of necessary slices was found in a previous
    # filter, so generate that number of events with each
    # event having its own timestamp that is set 15 minutes
    # later than the previous event.
    num_of_slices     = event.get( "num_of_slices" )
    running_timestamp = Time.parse( event.get( "@opentimestamp" ) )
    end_timestamp     = Time.parse( event.get( "@closetimestamp" ) )
    runind            = event.get( "runidx" )

    num_of_slices.to_i.times { |i|
        newhash = event.to_hash
        gen     = LogStash::Event.new( newhash )
        gen.set( "runidxseq", runind.to_s + "_" + i.to_s )
        gen.set( "@slice_timestamp", running_timestamp )
        gen.set( "@timestamp_common", running_timestamp )

        if num_of_slices > 1
            if running_timestamp + 900 > end_timestamp
                gen.set( "@slice_timestamp", end_timestamp )
                gen.set( "@timestamp_common", end_timestamp )
            end
            running_timestamp = running_timestamp + 900
        end
        new_event_block.call( gen )
    }

    # First step recreated this event, so drop the incoming version.
    event.cancel
end

@timestamp_common is the agreed 'reference' timestamp field that allows multiple indices to plot together on a single proper x-axis date histogram. I'm setting it here rather than as a mutate operation in the .conf file.

Thank you for any help.

nick

Another way of doing it would be

input { generator { count => 1 lines => [
    '2024-06-06T16:37:33.463Z,2024-06-06T17:10:00.123Z,Foo',
    '2024-06-06T16:45:00.250Z,2024-06-06T17:30:00.500Z,Bar',
    '2024-06-06T17:00:00.000Z,2024-06-06T17:15:00.000Z,Baz'
] } }

filter {
    csv { columns => [ "startTime", "endTime", "label" ] }
    ruby {
        code => '
            require "time"

            startTime = Time.parse( event.get("startTime") )
            endTime = Time.parse( event.get("endTime") )

            event.set("@timestamp", LogStash::Timestamp.new(startTime))

            currentTime = startTime + 900
            while currentTime < endTime
                clone = event.clone
                clone.set("@timestamp", LogStash::Timestamp.new(currentTime))
                new_event_block.call(clone)
                currentTime += 900
            end

            clone = event.clone
            clone.set("@timestamp", LogStash::Timestamp.new(endTime))
            new_event_block.call(clone)
        '
    }
}

That... looks brilliant and like it might work with some tweaks for my situation. I will give it a try, and if successful, I will respond back here so others can also benefit.

Thank you!

nick

Ok. I've finally got the time in my schedule to come back to this. It seems like it will work fine, but I have one question: if my @opentimestamp and @closetimestamp are already LogStash::Timestamp objects by the time they get here, where can I find documentation on how to convert them to proper Ruby Time objects? I've tried just a simple

open_time = event.get( "@opentimestamp" )

I've tried

open_time = Time.new( event.get( "@opentimestamp" ) )

And also

open_time = Time.parse( event.get( "@opentimestamp" ) )

None of these is correct. But without having them as Ruby Time objects, the

while curr_time < close_time

conditional always fails.

It's entirely possible that my search-fu is out of practice (I've tried DDG and Kagi, Google is just going to be garbage SEO results) but I can't find how to convert LogStash::Timestamp to a Ruby Time object IN THE RUBY SCRIPT. I understand I'll then have to cast back to LogStash::Timestamp on clone.set().

Once I have a working filter, I will post it in its entirety here in this thread since I can't be the only one who is doing this.

Thanks!

nick

Use Time.at on the timestamp's to_f value, which is its epoch time in seconds as a Float.
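For illustration, here's the round trip in plain Ruby, with a Time standing in for the LogStash::Timestamp (both answer to_f with epoch seconds):

```ruby
require "time"

# A LogStash::Timestamp's to_f is its epoch value in seconds, so
# Time.at recovers a plain Ruby Time you can compare and add to.
# A Time object stands in for the timestamp here.
stamp   = Time.parse("2024-06-06T16:37:33.463Z")
epoch   = stamp.to_f        # Float seconds since the epoch
as_time = Time.at(epoch)    # back to a Ruby Time

later = as_time + 900       # Time arithmetic works again: add 15 minutes
```

Going back the other way on clone.set(), LogStash::Timestamp.new accepts a Ruby Time.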

This code works!

    ruby {
      code => '
        require "time"
    
        open_time         = Time.at( event.get( "@opentimestamp" ).to_f )
        close_time        = Time.at( event.get( "@closetimestamp" ).to_f )
        runidx            = event.get( "runidx" )
        i                 = 0
    
        runidxseq         = runidx.to_s + "_" + i.to_s
    
        # First slice.
        event.set( "runidxseq", runidxseq )
        event.set( "@timestamp_common", LogStash::Timestamp.new( open_time ) )
        #event.set( "@timestamp_common", open_time )
    
        # Add 15 minutes:
        curr_time = open_time + 900
        i += 1
    
        # Middle slices.
        while curr_time < close_time
          clone = event.clone
          clone.set( "runidxseq", runidx.to_s + "_" + i.to_s )
          clone.set( "@timestamp_common", LogStash::Timestamp.new( curr_time ) )
          new_event_block.call( clone )
          i += 1
          curr_time += 900
        end
    
        # Last slice.
        clone = event.clone
        clone.set( "runidxseq", runidx.to_s + "_" + i.to_s )
        clone.set( "@timestamp_common", LogStash::Timestamp.new( close_time ) )
        #clone.set( "@timestamp_common", close_time )
        new_event_block.call( clone )
       '
    } # /ruby

Further question, however: when I try to put this in its own .rb file it won't work due to the change in environment (namely, it complains about new_event_block). How would it be possible to make it work as a .rb file? This is just an additional 'nice to have', as I prefer my Ruby code in .rb files rather than jammed into the .conf, but inline is fine because it works.

Thank you for the help!

nick

When you use a script file in a ruby filter, the filter function must return an array of events rather than calling new_event_block. The inline code above lets the event object flow through and calls new_event_block for each clone. You would have to change

    event.set( "@timestamp_common", LogStash::Timestamp.new( open_time ) )

to

    event.set( "@timestamp_common", LogStash::Timestamp.new( open_time ) )
    events = []
    events << event

and then replace each new_event_block.call( clone ) call with

    events << clone

and finally add

    events

as the last line of the code to make that the return value of the function.
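Putting those pieces together, a sketch of what the script-file version might look like (the field names @opentimestamp, @closetimestamp, and runidx are from the thread; the StubEvent class is only a stand-in so the sketch can run outside Logstash, where the real event object is passed in and you would wrap the Time values in LogStash::Timestamp.new before set()):

```ruby
require "time"

# Minimal stand-in for LogStash::Event so the filter can be exercised
# outside Logstash; inside Logstash the real event object is passed in.
class StubEvent
  def initialize(data)
    @data = data
  end

  def get(key)
    @data[key]
  end

  def set(key, value)
    @data[key] = value
  end

  def clone
    StubEvent.new(@data.dup)
  end
end

def register(params)
end

# Script-file version: collect the slices in an array and return it,
# instead of calling new_event_block.
def filter(event)
  open_time  = Time.at(event.get("@opentimestamp").to_f)
  close_time = Time.at(event.get("@closetimestamp").to_f)
  runidx     = event.get("runidx")
  i          = 0
  events     = []

  # First slice: reuse the incoming event.
  event.set("runidxseq", "#{runidx}_#{i}")
  event.set("@timestamp_common", open_time)  # wrap in LogStash::Timestamp.new inside Logstash
  events << event

  curr_time = open_time + 900
  i += 1

  # Middle slices, one every 15 minutes.
  while curr_time < close_time
    clone = event.clone
    clone.set("runidxseq", "#{runidx}_#{i}")
    clone.set("@timestamp_common", curr_time)
    events << clone
    i += 1
    curr_time += 900
  end

  # Last slice at the exact close time.
  clone = event.clone
  clone.set("runidxseq", "#{runidx}_#{i}")
  clone.set("@timestamp_common", close_time)
  events << clone

  events
end
```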