input {
jdbc {
jdbc_connection_string => "jdbc:postgresql://localhost:5432/db"
jdbc_user => "user"
jdbc_password => "pass"
jdbc_driver_library => "/usr/share/logstash/lib/postgresql-42.5.4.jar"
jdbc_driver_class => "org.postgresql.Driver"
tracking_column => "last_updated"
tracking_column_type => "timestamp"
jdbc_default_timezone => "Asia/Almaty"
use_column_value => true
clean_run => true
schedule => "*/5 * * * * *"
statement => "SELECT t.id, t.last_updated, t.author_id,
(
SELECT jsonb_agg(jsonb_build_object(
'id', up.id,
'first_name', up.first_name,
'last_name', up.last_name,
'photo', up.photo
))
FROM public.users AS up
WHERE up.id = ANY(t.assignee[1:3])
)::TEXT AS assignees_string
FROM forms.tasks AS t
LEFT JOIN forms.projects AS p ON p.id = t.project_id
LEFT JOIN forms.statuses AS s ON s.id = t.status
WHERE t.last_updated > :sql_last_value
ORDER BY last_updated ASC"
last_run_metadata_path => "/mnt/.logstash_jdbc_last_run_test"
type => "tasks"
}
}
filter {
if [type] == "tasks" {
ruby {
code => "
require 'json'
begin
assignees_json = JSON.parse(event.get('assignees_string').to_s || {})
event.set('assignees', assignees_json)
rescue Exception => e
event.tag('invalid boundaries json')
end
"
}
}
}
filter {
mutate {
remove_field => ["@version", "@timestamp",, "assignees_string"]
}
}
output {
if [type] == 'tasks' {
elasticsearch {
hosts => "127.0.0.1"
index => "test"
document_id => "%{id}"
}
stdout {codec=>"rubydebug"}
}
}
Can U help me to parse this plz?