Remove nested documents from index using Logstash

Hi all, I am trying to index data from MS SQL Server into Elasticsearch as nested documents. It is working fine, but the challenge is that I have to remove the nested documents and the main document if the is_active column retrieved from SQL Server is false. Here is my config file:

    input {
        jdbc {
            jdbc_driver_library => "D:/Users/xxxx/Desktop/driver/mssql-jdbc-7.4.1.jre12-shaded.jar"
            jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
            jdbc_connection_string => "jdbc:sqlserver://EC2AMAZ-J90JR4A\SQLEXPRESS:1433;databaseName=xxxx;"
            jdbc_user => "xxx"
            jdbc_password => "xxxx"
            statement => "Select Policyholdername,Age,Policynumber,Dob,Client_Address,is_active from policy"
        }
    }
    filter {
        jdbc_streaming {
            jdbc_driver_library => "D:/Users/xxxxx/Desktop/driver/mssql-jdbc-7.4.1.jre12-shaded.jar"
            jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
            jdbc_connection_string => "jdbc:sqlserver://EC2AMAZ-J90JR4A\SQLEXPRESS:1433;databaseName=xxxx;"
            jdbc_user => "xxxx"
            jdbc_password => "xxxx"
            statement => "select claimnumber,claimtype,is_active from claim where policynumber = :policynumber"
            parameters => { "policynumber" => "policynumber" }
            target => "claim_details"
        }
    }
    output {
        elasticsearch {
            hosts => "https://e5a4a4a4de7940d9b12674d62eac9762.eastus2.azure.elastic-cloud.com:9243"
            user => "xxxxx"
            password => "xxxx"
            index => "xxxx"
            action => "index"
            document_type => "_doc"
            document_id => "%{policynumber}"
        }
        stdout { codec => rubydebug }
    }

Could someone help me achieve this? Thanks in advance.

Hi @Mohan_vel,

You can remove the inactive ones in the source. It is much simpler to filter them out in the source than to drop them in Logstash :slight_smile:

    Select Policyholdername,Age,Policynumber,Dob,Client_Address,is_active from policy
    where is_active = 'true'
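
(If is_active is a BIT column in SQL Server — the schema isn't shown, so that is an assumption — the more usual form is to compare against 1 rather than a string:)

    -- assumes is_active is a BIT column; adjust if it is stored as text
    Select Policyholdername,Age,Policynumber,Dob,Client_Address,is_active from policy
    where is_active = 1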


Hi @inhinyera16,

I am trying this approach to do incremental indexing.

The select query you mentioned above will retrieve only the active records from the database; in that case I won't be able to delete a row that has already been indexed as a nested document in Elasticsearch. Hope you are able to understand.

If I am right, I have to retrieve all the rows, both active and inactive, from the database and then decide whether each one has to be indexed or deleted. Most of the time a record will get modified or deleted in the database by an end user from the UI, or by the DB team at the backend. The records should stay in sync between the DB and Elasticsearch, which is why I am trying to achieve this using Logstash.

I have included below a model config that does incremental indexing, but it is not for the nested-document approach; it directly indexes all the retrieved rows.

In our case we have to do incremental indexing for the config above, and the scenario is quite different because we are indexing the documents as nested using JDBC streaming. Hope you will be able to understand.

    input {
        jdbc {
            jdbc_driver_library => "D:\Users\xxx\Desktop\driver\mssql-jdbc-7.4.1.jre12-shaded.jar"
            jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
            jdbc_connection_string => "jdbc:sqlserver://xxxx-J90JR4A\SQLEXPRESS:1433;databaseName=xxxx;"
            jdbc_user => "xxxx"
            jdbc_password => "xxxx"
            jdbc_paging_enabled => true
            tracking_column => "modified_date"
            use_column_value => true
            # note: clean_run => true resets sql_last_value on every pipeline start;
            # set it to false once you want genuinely incremental runs
            clean_run => true
            tracking_column_type => "timestamp"
            schedule => "*/1 * * * *"
            statement => "Select pl.Policyholdername,pl.Age,pl.Dob,pl.Client_Address,cl.claimnumber,cl.claimtype,cl.is_active,cl.Modified_date from claim cl
                          inner join Policy pl on pl.Policynumber=cl.policynumber
                          where cl.Modified_date > :sql_last_value"
            last_run_metadata_path => "D:\Users\xxxx\Desktop\logstash-7.5.2\jdbc_lastrun\jdbc_last_run.txt"
            jdbc_default_timezone => "UTC"
        }
    }
    filter {
        # route each row to index or delete based on its is_active flag
        if [is_active] {
            mutate {
                add_field => { "[@metadata][elasticsearch_action]" => "index" }
            }
        } else {
            mutate {
                add_field => { "[@metadata][elasticsearch_action]" => "delete" }
            }
        }
        mutate {
            remove_field => [ "is_active", "@version", "@timestamp" ]
        }
    }
    output {
        elasticsearch {
            hosts => "https://e5a4a4a4de7940d9b12674d62eac9762.eastus2.azure.elastic-cloud.com:1234"
            user => "elastic"
            password => "xxxxx"
            index => "xxxxx"
            action => "%{[@metadata][elasticsearch_action]}"
            document_type => "_doc"
            document_id => "%{claimnumber}"
        }
        stdout { codec => rubydebug }
    }

Is there any reason why Elasticsearch should delete the data when it is inactive? If the source keeps the record and just tags it as inactive, can't Elasticsearch do the same? I don't think incremental indexing with the same document_id will work. Use an upsert instead:

            document_id => "%{claimnumber}"
            doc_as_upsert => true
            action => "update"
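
For context, here is a sketch of how those settings would fit into the full elasticsearch output, reusing the placeholder hosts, credentials, and index name from your config above (none of these values are real):

    output {
        elasticsearch {
            # placeholders — substitute the hosts, credentials, and index from your own config
            hosts => "https://e5a4a4a4de7940d9b12674d62eac9762.eastus2.azure.elastic-cloud.com:9243"
            user => "elastic"
            password => "xxxxx"
            index => "xxxxx"
            document_id => "%{claimnumber}"
            # update the existing document, or insert it if it does not exist yet
            doc_as_upsert => true
            action => "update"
        }
    }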

Then the APIs could filter out the inactive ones:

    GET xxxxx/_search
    {
      "query": {
        "bool": {
          "filter": {
            "term": {
              "is_active.keyword": "true"
            }
          }
        }
      }
    }

Hi @inhinyera16

Let me check with my team on this and keep you posted.
