Parse Array of JSON object

input {
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://localhost:5432/db"
        jdbc_user => "user"
        jdbc_password => "pass"
        jdbc_driver_library => "/usr/share/logstash/lib/postgresql-42.5.4.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        tracking_column => "last_updated"
        tracking_column_type => "timestamp"
        jdbc_default_timezone => "Asia/Almaty"
        use_column_value => true
        clean_run => true
        schedule => "*/5 * * * * *"
        statement =>  "SELECT t.id, t.last_updated, t.author_id,
                        (
                              SELECT jsonb_agg(jsonb_build_object(
                                'id', up.id,
                                'first_name', up.first_name,
                                'last_name', up.last_name,
                                'photo', up.photo
                              ))
                              FROM public.users AS up
                              WHERE up.id = ANY(t.assignee[1:3])
                          )::TEXT AS assignees_string
                      FROM forms.tasks AS t
                      LEFT JOIN forms.projects AS p ON p.id = t.project_id
                      LEFT JOIN forms.statuses AS s ON s.id = t.status
                      WHERE t.last_updated > :sql_last_value
                      ORDER BY last_updated ASC"
        last_run_metadata_path => "/mnt/.logstash_jdbc_last_run_test"
        type => "tasks"
    }

}

 filter {
    if [type] == "tasks" {
        ruby {
        code => "
          require 'json'
          begin
              assignees_json = JSON.parse(event.get('assignees_string').to_s || {})
              event.set('assignees', assignees_json)
          rescue Exception => e
                event.tag('invalid boundaries json')
          end
        "
        }
    }
}

filter {
  mutate {
    remove_field => ["@version", "@timestamp",, "assignees_string"]
  }
}

output {
        if [type] == 'tasks' {
                elasticsearch {
                    hosts => "127.0.0.1"
                    index => "test"
                    document_id => "%{id}"
                }
                stdout {codec=>"rubydebug"}
        }
}


Can U help me to parse this plz?

Hi,

Why are you using a ruby filter to parse JSON? Logstash has an extra JSON filter for that.

 filter {
    if [type] == "tasks" {
        json {
          source => "assignees_string"
          target => "assignees"
        }
    }
}

Have you checked what the result of the SQL is and can you give us an example document (anonymize data if required before posting)?

Best regards
Wolfram

Hi, Thank You for your response, but...

This is error happens in the console when I put your code

Code in logstash

input {
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://localhost:5432/db"
        jdbc_user => "user"
        jdbc_password => "pass"
        jdbc_driver_library => "/usr/share/logstash/lib/postgresql-42.5.4.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        tracking_column => "last_updated"
        tracking_column_type => "timestamp"
        jdbc_default_timezone => "UTC"
        use_column_value => true
        clean_run => true
        schedule => "*/5 * * * * *"
        statement =>  "SELECT t.id, t.last_updated,
				  (
                              SELECT jsonb_agg(jsonb_build_object(
                                'id', up.id,
                                'first_name', up.first_name,
                                'last_name', up.last_name,
                                'photo', up.photo
                              ))
                              FROM public.users AS up
                              WHERE up.id = ANY(t.assignee[1:3])
                          ) AS assignees_string
                      FROM forms.tasks AS t
		          LEFT JOIN forms.projects AS p ON p.id = t.project_id
                      LEFT JOIN forms.statuses AS s ON s.id = t.status
                      WHERE t.last_updated > :sql_last_value
                      ORDER BY t.last_updated ASC"
        last_run_metadata_path => "/mnt/.logstash_jdbc_last_run_test"
        tags => ["tasks"]
    }

}

 filter {
   if "tasks" in [tags] {
	json {
		source => "assignees_string"
		target => "assignees"
	}
   }
}

output {
	if 'tasks' in [tags] {

        	elasticsearch {
        	    hosts => "127.0.0.1"
        	    index => "test"
        	    document_id => "%{id}"
        	}

		stdout {codec=>"rubydebug"}

	}

    
}

Best regards
Nurm

Then I guess that this is the cause for your error: Logstash does not get a JSON string as expected, it gets a PGObject and Logstash cannot parse it.
In your initiali pipeline you had ::TEXT which is missing in the latest pipeline - I guess this should convert the object to the JSON string? Can you try it again with that?

Yeah, I forgot this '::TEXT', Thank You very much!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.