SQL data upload using JDBC-logstash

Hi,
I'm uploading data with a Logstash JDBC pipeline, but every time I run the script it loads all of the data again, so the index count includes the duplicate documents as well.

This is the script I am using:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java-8.2.0.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/app_medixcel_base"
    jdbc_user => "root"
    jdbc_password => "plus91"
    use_column_value => true
    tracking_column => "leave_id"
    tracking_column_type => "numeric"
    statement => "SELECT leave_id, staff_id, start_date, end_date, end_time, leave_reason, is_full_day, approved, added_on, approval_reason, approved_by, 
         clinic_id, leave_in_days FROM app_medixcel_base.mxcel_staff_leaves WHERE start_date BETWEEN '2023-01-27' AND '2024-03-27';"
  }
}

filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field =>  ["@version"]
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    user => "elastic"
    password => "plus91"
    index => "index_mxcel_patient_leaves"
  }
  stdout { codec => "rubydebug" }
}
My tracking column is leave_id.
The script runs successfully and does not report any errors. How can I avoid uploading duplicate data?

If the leave_id field is the primary key, you can use it as the document_id. If it is not, then you should use the fingerprint filter. Check the similar topic.

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    user => "elastic"
    index => "index_mxcel_patient_leaves"
    document_id => "%{leave_id}"
    doc_as_upsert => true
  }
}
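If it is not unique, a minimal sketch of the fingerprint approach for the jdbc case (the source columns here are only an example; list whatever combination of columns uniquely identifies a row in your data), then point document_id at [@metadata][fingerprint] instead of leave_id:

filter {
  fingerprint {
    # build a stable hash from the identifying columns so the same row always maps to the same _id
    source => ["leave_id", "staff_id", "start_date"]
    concatenate_sources => true
    target => "[@metadata][fingerprint]"
    method => "SHA256"
    base64encode => true
  }
}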

Hi @Rios,
leave_id is the primary key. Here is my updated script:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java-8.2.0.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/app_medixcel_base"
    jdbc_user => "root"
    jdbc_password => "plus91"
    use_column_value => true
    tracking_column => "leave_id"
    tracking_column_type => "numeric"
    statement => "SELECT leave_id, staff_id, start_date, end_date, end_time, leave_reason, is_full_day, approved, added_on, approval_reason, approved_by,
         clinic_id, leave_in_days FROM app_medixcel_base.mxcel_staff_leaves WHERE start_date BETWEEN '2023-01-27' AND '2024-03-27';"
  }
}

filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field =>  ["@version"]
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    user => "elastic"
    password => "plus91"
    index => "index_mxcel_patient_leaves"
    document_id => "%{leave_id}"
    doc_as_upsert => true

  }
  stdout { codec => "rubydebug" }
}

Have you tested it?

Yes, I did. It's not working.

Please copy a message as it comes into the filter, replacing anything that is confidential. I simply need a data sample :slight_smile:

This is my data sample:

{
    "approval_reason" => "Done",
        "is_full_day" => 1,
      "leave_in_days" => 0,
           "staff_id" => 138,
         "start_date" => 2023-01-26T18:30:00.000Z,
          "clinic_id" => 0,
           "end_date" => 2023-01-26T18:30:00.000Z,
           "added_on" => 2024-01-19T20:29:39.000Z,
           "approved" => true,
       "leave_reason" => "Vacation",
           "end_time" => 2024-02-17T00:00:00.000Z,
        "approved_by" => 0,
         "@timestamp" => 2024-02-17T06:53:56.527Z,
           "leave_id" => 1
}

I have made a version which uses the same logic, and it's working fine:

  • new leave_id, a new doc is created
  • existing leave_id already in ES, the doc is updated

There is an article which addresses duplicated records.
input {
  generator {
       message => 'approval_reason:Done, is_full_day:0, leave_in_days:0, staff_id:138, start_date:2023-01-26T18:30:00.000Z, clinic_id:0, end_date:2023-01-26T18:30:00.000Z, added_on:2024-01-19T20:29:39.000Z, approved:true, leave_reason:Vacation, end_time:2024-02-17T00:00:00.000Z, approved_by:0, leave_id:1'
       count => 1
  }
} 

filter {

   kv { 
      source => "message"
      value_split => ":"
      field_split => ",  "
   }
   
   # fingerprint {
   #  source => "message"
   #  target => "[@metadata][fingerprint]"
   #  method => "SHA1"
   #  base64encode => true
   # }

   date {
      match => [ "start_date", "ISO8601" ]
      target => "start_date"
   }

   date {
      match => [ "end_date", "ISO8601" ]
      target => "end_date"
   }

   date {
      match => [ "added_on", "ISO8601" ]
      target => "added_on"
   }

   date {
      match => [ "end_time", "ISO8601" ]
      target => "end_time"
   }

   mutate {
      convert => {
         "is_full_day" => "integer"
         "leave_id" => "integer"
         "approved_by" => "integer"
         "staff_id" => "integer"
         "clinic_id" => "integer"
         "approved" => "boolean"
         "leave_in_days" => "integer"
      }
   }
}
output {
   elasticsearch {
      hosts => ["localhost:9200"]
      index => "testupsert_%{+YYYY.MM.dd}"
      user => "elastic"
      document_id => "%{[leave_id]}" # [@metadata][fingerprint]
      doc_as_upsert => true
      action => "update"
   }
   stdout { codec => rubydebug { metadata => true } }
}

I forgot to mention: you don't need fingerprint, since you have leave_id as the primary key. However, the code for generating a unique id with fingerprint is there (commented out in the filter); you just set it as the document_id.
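If you do go that way, uncomment the fingerprint block in the filter and point document_id at the metadata field it writes; only the id source in the output changes, roughly like this:

output {
   elasticsearch {
      hosts => ["localhost:9200"]
      index => "testupsert_%{+YYYY.MM.dd}"
      user => "elastic"
      document_id => "%{[@metadata][fingerprint]}"
      doc_as_upsert => true
      action => "update"
   }
}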
