Read logfile once, insert two events into Elasticsearch

Hello, I have a scenario where I read a log file and want to insert two kinds of events into Elasticsearch.

Basic example:

Dummy log:

20/08/25T12:00:00 [random-id-01]: user1 connected
20/08/25T12:00:01 [random-id-01]: uploaded file.xml
20/08/25T12:00:02 [random-id-01]: disconnected

Example events created:

// each line
{ logTime: "20/08/25T12:00:00", id: "random-id-01", message: "user1 connected" }
{ logTime: "20/08/25T12:00:01", id: "random-id-01", message: "uploaded file.xml" }
{ logTime: "20/08/25T12:00:02", id: "random-id-01", message: "disconnected" }

// aggregated event
{ logTime: "20/08/25T12:00:02", id: "random-id-01", user: "user1", action: "upload", filename: "file.xml" }

I created two Logstash .conf files that read the same log file. Am I correct in assuming that, without defining sincedb_path, this will cause problems because the two inputs will step on each other's toes? Assuming that is correct, would you suggest setting a separate sincedb_path for each?

I also tried using the clone filter plugin, but I'm not able to get that to work.

Any suggestions on how you would usually handle this kind of case? Two .conf files? The clone filter plugin? A different solution?

Hello @nilsen

As I am not sure about the exact end requirement, here is a similar use case in case it is helpful to you.

Looking at your logs, if these events are already available in Elasticsearch:

{ logTime: "20/08/25T12:00:00", id: "random-id-01", message: "user1 connected" }
{ logTime: "20/08/25T12:00:01", id: "random-id-01", message: "uploaded file.xml" }
{ logTime: "20/08/25T12:00:02", id: "random-id-01", message: "disconnected" }

Using ES|QL, we can get the aggregated record:

FROM test-message
| EVAL 
    user = REPLACE(CASE(message LIKE "* connected*", message, null), " connected", ""),
    filename = REPLACE(CASE(message LIKE "*uploaded*", message, null), "uploaded ", ""),
    action = CASE(message LIKE "*uploaded*", "upload", null),
    disconnect_time = CASE(message LIKE "*disconnected*", logTime, null)
| STATS 
    logTime = MAX(disconnect_time),
    user = MAX(user),
    action = MAX(action),
    filename = MAX(filename)
    BY id
| WHERE logTime IS NOT NULL AND user IS NOT NULL AND action IS NOT NULL AND filename IS NOT NULL
| KEEP logTime, id, user, action, filename
| SORT logTime ASC

Thanks!!


If I understand you correctly, you would rather just insert the raw lines into an Elasticsearch index, and then use other tools to extract/aggregate the data?

Currently I have two Logstash .conf files: one just pushes the raw lines to my file-log* index, and the other uses the aggregate filter plugin to insert into my file-aggregated* index.
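
For illustration, such an aggregate pipeline could look roughly like this (a simplified sketch; the grok pattern, file paths, and index name are placeholders, not the exact configuration):

# Simplified sketch of an aggregate pipeline; paths and names are placeholders
input {
  file {
    path => "/path/to/dummy.log"
    sincedb_path => "/var/lib/logstash/sincedb_aggregated"
  }
}

filter {
  grok {
    match => { "message" => "%{NOTSPACE:logTime} \[%{NOTSPACE:id}\]: %{GREEDYDATA:msg}" }
  }

  if [msg] =~ /disconnected$/ {
    # final line of the task: copy the collected fields onto this event and close the map
    aggregate {
      task_id => "%{id}"
      code => "
        event.set('user', map['user'])
        event.set('action', map['action'])
        event.set('filename', map['filename'])
      "
      map_action => "update"
      end_of_task => true
      timeout => 120
    }
  } else {
    # connected / uploaded lines: stash the interesting bits in the map and drop the event
    aggregate {
      task_id => "%{id}"
      code => "
        map['user'] = event.get('msg').sub(' connected', '') if event.get('msg').end_with?(' connected')
        if event.get('msg').start_with?('uploaded ')
          map['action'] = 'upload'
          map['filename'] = event.get('msg').sub('uploaded ', '')
        end
      "
      map_action => "create_or_update"
    }
    drop {}
  }
}

output {
  elasticsearch {
    index => "file-aggregated-%{+YYYY.MM.dd}"
  }
}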

You can use pipeline-to-pipeline communication with a forked path pattern.
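
Roughly along these lines (a minimal sketch; pipeline ids, paths, and index names are placeholders): one intake pipeline reads the file once and sends a copy of every event to two downstream pipelines.

# pipelines.yml
- pipeline.id: intake
  path.config: "/etc/logstash/conf.d/intake.conf"
- pipeline.id: raw
  path.config: "/etc/logstash/conf.d/raw.conf"
- pipeline.id: aggregated
  path.config: "/etc/logstash/conf.d/aggregated.conf"

# intake.conf - reads the file once, forks a copy to each downstream pipeline
input {
  file { path => "/path/to/dummy.log" }
}
output {
  pipeline { send_to => ["raw", "aggregated"] }
}

# raw.conf - indexes the raw lines
input {
  pipeline { address => "raw" }
}
output {
  elasticsearch { index => "file-log-%{+YYYY.MM.dd}" }
}

# aggregated.conf - builds the combined event (aggregate filter goes here)
input {
  pipeline { address => "aggregated" }
}
output {
  elasticsearch { index => "file-aggregated-%{+YYYY.MM.dd}" }
}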

Thank you for your response. May I ask whether my assumption is correct that my two Logstash configurations conflict because they tail the same logfile?

From the documentation: “A different sincedb_path must be used for each input. Using the same path will cause issues. The read checkpoints for each input must be stored in a different path so the information does not override.”


Thank you, your pipeline-to-pipeline forked path pattern suggestion worked well. Though it seems the examples I find often reuse already processed events, for example pushing the same result to two different outputs with small adjustments. Since I parse the log lines from scratch every time, could I have just used a different sincedb_path for each input to solve my issue? No idea whether that is an anti-pattern or has other drawbacks, though.

If the two pipelines have nothing in common, then yes, you could just use two file inputs. But that would be unusual. Normally some parsing is done in one pipeline, which then sends events to two other pipelines to fine-tune them as needed for their destinations.
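
For completeness, the two-file-inputs variant would just be two independent pipelines pointing at the same file, each with its own sincedb_path (paths below are placeholders):

# raw pipeline
input {
  file {
    path => "/path/to/dummy.log"
    sincedb_path => "/var/lib/logstash/sincedb_raw"
  }
}

# aggregated pipeline
input {
  file {
    path => "/path/to/dummy.log"
    sincedb_path => "/var/lib/logstash/sincedb_aggregated"
  }
}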
