Streaming logs with csv issue

Hi,
I want to stream logs from Postgres to ELK, but Logstash doesn't parse the log properly.

Logstash.conf file:

input {
    file {
        path => "/path/to/log.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}
filter {
    csv {
        separator => ","
        quote_char => '"'
        columns => [
            "field_1",
            "field_2",
            "field_3",
            "field_4",
            "field_5"
        ]
    }
}
output {
    elasticsearch {
        hosts => "localhost"
        index => "test1"
    }
}

Example of csv file input:

2025-07-22T15:10:00:0000,...,
"SELECT
        c.customer_id,
        c.name AS customer_name,
        DATE_TRUNC('month', o.order_date) AS month,
        SUM(oi.quantity * oi.unit_price) AS total_spent
    FROM
        customers c
        JOIN orders o ON c.customer_id = o.customer_id
        JOIN order_items oi ON o.order_id = oi.order_id
    WHERE
        o.order_date >= NOW() - INTERVAL '12 months'
    GROUP BY
        c.customer_id,
        c.name,
        month",
...

Any suggestions would be appreciated.

Thanks

Hi @Idan_Klatza, welcome to the community.

Can you provide several full entries so we can see what a couple of records look like? (You can anonymize data where needed.)

In short, I see two parts:

  1. You will need to manage multiline input.

  2. The quotes (") and embedded commas (,) may or may not come into play; that is why we need to see more of the actual log file. The details matter.

There are also two different ways to approach CSVs: with a csv codec in the file input itself, or with a csv filter (as in your current config); a rough sketch of both follows.
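For illustration only, a minimal sketch of the two placements, reusing the path and column names from your post (the csv codec is a separate plugin that may need to be installed, so treat this as a sketch rather than a drop-in config):

# Option 1: parse each line as it is read, with a csv codec on the input.
# Note that an input takes a single codec, so this cannot be combined with
# a multiline codec.
input {
    file {
        path => "/path/to/log.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
        codec => csv {
            separator => ","
            quote_char => '"'
            columns => ["field_1", "field_2", "field_3", "field_4", "field_5"]
        }
    }
}

# Option 2: read raw lines (or multiline events) and parse them afterwards
# with a csv filter, which is what your current configuration does.
filter {
    csv {
        separator => ","
        quote_char => '"'
        columns => ["field_1", "field_2", "field_3", "field_4", "field_5"]
    }
}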

Perhaps @Badger might provide guidance on the best approach here...


As Stephen says, we really need to see some example data to be sure, but I can make a start using the example you showed. With the file

2025-07-22T15:10:00:0000,...,
"SELECT
        c.customer_id,
        c.name AS customer_name,
    FROM
        customers c",
...
2025-07-22T15:10:00:0000,...,
"Some string containing ,
        word!",
Foo

we can use a multiline codec on a file input to read that as two events

codec => multiline {
    pattern => "^\d{4}-\d{2}-" 
    negate => true 
    what => previous 
    auto_flush_interval => 2
}

which chops the file into events at lines that start with "4 digits, dash, 2 digits, dash" (the leading timestamp). This will produce an event like

   "message" => "2025-07-22T15:10:00:0000,...,\n\"Some string containing , \n        word!\",\nFoo",

Your csv filter will turn that into

   "field_1" => "2025-07-22T15:10:00:0000",
   "field_2" => "..."
   "field_3" => nil,

which looks wrong. The problem is that if double-quotes are used to surround a field, they have to surround the entire field. In your file the field starts with a newline, which comes before the opening double quote. That is,

Foo,"
Bar",...

is a valid CSV line.

Foo,
"Bar",...

is not.

You can fix that using mutate

mutate { gsub => [ "message", ',\n', ',' ] }

which, as well as fixing the problem, will strip some newlines from the SQL. It is probably possible to develop a more complex regexp with lookaheads that avoids that problem, but I don't know how to do it.
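For what it is worth, one possible shape for such a regexp (an untested sketch; it assumes any quote characters inside a quoted field are doubled rather than backslash-escaped) is to collapse ",\n" only when the remainder of the message contains an even number of double quotes, i.e. when the match sits outside a quoted field:

mutate {
    # Replace ",\n" only when the rest of [message] contains an even number
    # of double quotes, meaning the match is not inside a quoted field.
    gsub => [ "message", ',\n(?=(?:[^"]*"[^"]*")*[^"]*\z)', ',' ]
}

That would keep the newlines inside the quoted SQL intact, at the cost of a more expensive substitution on long messages.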

If you add that simple mutate+gsub you will get

   "field_1" => "2025-07-22T15:10:00:0000",
   "field_2" => "..."
   "field_3" => "Some string containing , \n        word!",
   "field_4" => "Foo",

I had to use 'pattern => "^\d{4}-\d{2}-"' so the timestamp at the start of each entry separates the messages; this helped me a lot. Thank you.