Postgres output from JSON doesn't work, writes null values

Hello,

I need a little help. I'm trying to write only specific fields from a Kafka topic to a Postgres table, and it doesn't work; it writes nulls. I haven't figured out how to post JSON messages to a Kafka topic yet, so for now I'm reading them from a folder. I also tried formatting the JSON as a single line, since I was getting a separate message in Postgres for each line. I assume Kafka messages will also be single-line once I figure out how to post them, right? Anyway, here's my config:

input {
  file {
    path => "C:/Work/kafka-elasticsearch-connector/*.json"
    start_position => "beginning"
  }
}

output {
  jdbc {
    connection_string => "jdbc:postgresql://localhost:5432/test?user=postgres&password=password"
    statement => [ "INSERT INTO schema.logstashdata (topic, sentutc) VALUES(?, ?)", "topicName", "sentUtc" ]
  }
  stdout {
    codec => rubydebug
  }
}

Here's an example of the JSON I'm using as a test:

{
  "header": {
    "topicName": "TOPIC1",
    "topicVer1": 1,
    "sentUtc": "2020-01-27T14:07:10Z",
    "status": "Test",
    "msgType": "Update",
    "code": ["status"]
  },
  "body": {
    "location": {
      "lat": 40.9999990,
      "lon": 29.9999997
    },
    "status": {
      "general": "OK"
    },
    "tasks": [{
      "id": "task-0001",
      "status": "Read"
    }]
  }
}
What am I doing wrong? Also, I will need to add the whole JSON as a field; I created a jsonb column in my table. How do I do that?

Many thanks in advance,
Adrian.

If you have the entire JSON object as a single line in a file then the file input will create a field called [message] that contains the entire JSON. You can use that to populate the jsonb column. You will need a json filter to parse the JSON; you will then be able to refer to [header][topicName] and [header][sentUtc].
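
Something like this (untested, and the jsonb column name payload is just a placeholder for whatever you called yours):

filter {
  # parse the single-line JSON in [message] into structured fields
  json {
    source => "message"
  }
}

output {
  jdbc {
    connection_string => "jdbc:postgresql://localhost:5432/test?user=postgres&password=password"
    # the plain strings after the SQL are event field references;
    # the CAST lets the raw [message] string go into a jsonb column
    statement => [ "INSERT INTO schema.logstashdata (topic, sentutc, payload) VALUES(?, ?, CAST(? AS jsonb))", "[header][topicName]", "[header][sentUtc]", "message" ]
  }
}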

Thanks, I thought it would be an easier solution than that. In the meantime I hardcoded some JSONs into Kafka 🙂

I have this filter that worked for writing from Kafka to Elasticsearch; I'll work from there:

filter {
  grok {
    match => { "message" => "%{GREEDYDATA:timestamp} %{LOGLEVEL:log-level} \[%{DATA:link}\] %{DATA:class} %{GREEDYDATA:message}" }
  }
}

On closer look I think I need the json filter. Do you have an example for the [header][topicName] part, please? I can't find any at the moment.

json { source => "message" }

Thanks for the help, Badger. I finally got to this result:

I left the filter as you said:
filter {
  json {
    source => "message"
  }
}

but I figured out how to reference the fields directly in the output, as below:

statement => [ "INSERT INTO schema.top (topic, message) VALUES(?, ?)", "[header][topicName]", "message" ]

I am trying to send the whole Kafka message as the last column (message); I have it as character varying in Postgres. What am I doing wrong? Is it not identified by "message"? I'm getting no errors, but the result in Postgres is NULL.

Edit:
Solved it, updated the filter to this:

filter {
  json {
    source => "message"
  }
  # serialize the entire event back to JSON and store it in a new field
  ruby { code => ' event.set("everything", event.to_json) ' }
}

and the output insert to this:

statement => [ "INSERT INTO schema.topic (topic, message) VALUES(?, ?)", "[header][topicName]", "%{everything}" ]
