sql_last_value: store value even when no document is indexed

Hello everyone,
I'm using the Logstash JDBC input to move my data from PostgreSQL to Elasticsearch.
Because my SQL query is too large (about 1 million records), I use a schedule to transfer about 5 rows at a time, repeating every second.
I use tracking_column to track the ID column.
The problem is that sometimes there are two consecutive rows whose gap is bigger than 100, and Logstash stops at that point, and so does :sql_last_value.

My config file is as follows:

input {
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://${POSTGRES_HOST}:${POSTGRES_PORT}/${DB_NAME}"
        jdbc_driver_class => "org.postgresql.Driver"

        jdbc_user => "${JDBC_USER}"
        jdbc_password => "${JDBC_PASSWORD}"

        jdbc_paging_enabled => true

        use_column_value => true
        tracking_column_type => "numeric"
        tracking_column => "decision_id"
        last_run_metadata_path => "/usr/share/logstash/config/decision_last_value.yml"
        record_last_run => true

        statement => "
select d.id as decision_id, dae.content as element_content
from decision d
left join decision_element dae on dae.decision_id = d.id
WHERE decision_id > :sql_last_value AND decision_id < :sql_last_value + 5
"

        type => "decision"
        schedule => "* * * * * *"
    }
}

filter {
    mutate {
        add_field => {
            "[my_join_field][name]" => "decision"
            "[my_join_field][parent]" =>  "%{case_id}"
        }
    }
}

output {
    elasticsearch {
        hosts => "elasticsearch:9200"
        user => "elastic"
        password => "changeme"
        index => "case"
    }
}

For example, I have two consecutive rows with decision_id 14 and 120. Logstash doesn't move on from decision_id = 14, and :sql_last_value stays at 14, because there is no document with 14 < decision_id < 19. Is there any way to update :sql_last_value, or maybe another approach?
Thanks a lot.
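One idea, as a sketch (assuming the batch only needs to be "the next 5 rows", not a fixed ID window): order by the tracking column and limit the batch size. Then :sql_last_value advances to the highest decision_id actually returned, so gaps cannot stall it:

```sql
-- Fetch the next 5 rows regardless of gaps in decision_id;
-- :sql_last_value moves to the highest id actually returned.
SELECT d.id AS decision_id, dae.content AS element_content
FROM decision d
LEFT JOIN decision_element dae ON dae.decision_id = d.id
WHERE d.id > :sql_last_value
ORDER BY d.id ASC
LIMIT 5
```

Note that jdbc_paging_enabled wraps the statement in its own LIMIT/OFFSET, which may conflict with an explicit LIMIT; with batches this small, paging could probably be disabled.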

pilo
What happens if you run your SQL without the WHERE clause?
Is it going to overload the database?
Elasticsearch should be able to handle many millions of records in one go.

A few other things I see in your config:
It is going to execute this query every second.
It is going to duplicate records if you run it again from another system, or after removing the last_run_metadata file.
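On the duplicate-record point, one common mitigation (a sketch, not from this config) is to derive the Elasticsearch document id from the primary key in the output, so re-runs overwrite rather than duplicate:

```
output {
    elasticsearch {
        hosts => "elasticsearch:9200"
        index => "case"
        # Reusing the primary key as the document id makes indexing
        # idempotent: re-running the pipeline updates existing documents
        # instead of creating duplicates.
        document_id => "%{decision_id}"
    }
}
```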

Hello, thanks for your reply.
When I run the SQL without the WHERE clause, I get a Java heap OutOfMemory error.
Do you have another approach for this error?

You need to increase your Java heap size.

elkm01 ~]# grep Xms /etc/logstash/jvm.options
-Xms19g

How much memory does your system have? Assign a little more memory to it. By default it might be 1 GB, which is not large enough.
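For reference, the heap is set in Logstash's jvm.options (sizes here are illustrative; keep -Xms and -Xmx equal and well below the machine's total RAM):

```
# /etc/logstash/jvm.options (illustrative sizes)
-Xms4g
-Xmx4g
```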

Hello, thank you for your reply.
In fact, I raised the Java heap size to 3g and can't go further.
Also, after doing some searching, I found this:


It's so weird that no one talks about this. I tried version 7.0.0 and it works.
Which version did you use?
This is my config:

input {
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://${POSTGRES_HOST}:${POSTGRES_PORT}/${DB_NAME}"
        jdbc_driver_class => "org.postgresql.Driver"

        jdbc_user => "${JDBC_USER}"
        jdbc_password => "${JDBC_PASSWORD}"

        jdbc_paging_enabled => true
        jdbc_page_size => 10000

        statement_filepath=> "/usr/share/logstash/config/indexing_sql/decision_indexing.sql"

        type => "decision"
    }
}

filter {
    mutate {
        add_field => {
            "[my_join_field][name]" => "decision"
            "[my_join_field][parent]" =>  "%{case_id}"
        }
    }
}

output {
    elasticsearch {
        hosts => "elasticsearch:9200"
        user => "elastic"
        password => "changeme"
        index => "case"
        routing => "%{case_id}"
    }
}

Do you have any idea what's missing?

I am not sure, but I think type was removed in some version of 7.x. Which version, I don't know; search on it.

type => decision

Try something very simple first, without much else: test that your data is being pulled from the database and displayed on screen.

input {
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://${POSTGRES_HOST}:${POSTGRES_PORT}/${DB_NAME}"
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_user => "${JDBC_USER}"
        jdbc_password => "${JDBC_PASSWORD}"
        jdbc_paging_enabled => true
        jdbc_page_size => 10000
        statement_filepath=> "/usr/share/logstash/config/indexing_sql/decision_indexing.sql"
      }
}

filter {}

output {
   stdout { codec => rubydebug }
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.