When I update data in a CSV, Logstash pushes the entire CSV file again with the updated data, which duplicates the records in my index. I just want to sync the data.

input {

    file {
        path => "/home/elastic/elk/logstash-6.4.3/csv_data/attrition_dump_v3.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }

    file {
        path => "/home/elastic/elk/logstash-6.4.3/csv_data/workday_dump_v3.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }

}

filter {

if [path] == "/home/elastic/elk/logstash-6.4.3/csv_data/attrition_dump_v3.csv" {
    csv {
    columns => ["Emp id","Full Name","Original Hire date","Hire Date","Last Day of Work","Quarter","Tenure","Tenure in years","Termination Reason","Reason","Vol/Invol","Position Title","Grade","Cost Center - Name","Mgr Name","Goal","Competency","Leader","Kumar -1","BU","Team"]
    separator => ","
    }

    mutate {
    add_field => { "doc_type" => "attrition" }
    add_field => {"id" => ""}
    copy => {"Emp id" => "id" }
    remove_field => [ "message" ]
    }
}

else if [path] == "/home/elastic/elk/logstash-6.4.3/csv_data/workday_dump_v3.csv" {
    csv {
    columns =>  ["Employee ID","Employee","Last,First Name","Email - Primary Work","Hire Date","Original Hire Date","Is Rehire","Years of Service","Company Service Date",
    "Continuous Service Date","Seniority Date","Time in Job Profile","Time in Job Profile Start Date","Time in Position","Position","Job Title","Job Profile",
    "Grade Profile ID","Grade","Grade Effective Date","Employment Status","Leave Type","Employee Type","Worker Type","Full/Part","Reg/Temp","Worker SubType","Exempt/Non-Exempt","Pay Rate Type","Scheduled Std Hours - Calculated FTE","Location Std Hours","Default Weekly Hours","FTE","Cost Center - ID","Cost Center - Name","HFM-Code","HFM-Function","HFM-SubFunction","Profit Center","Product Code","IES/Novella","Project ID","Project Description","Tech/Non-Tech","Client Facing Y/N","Worker's Business Unit","HR BU","SBU","Finance BU","Department","FM Entity","Custom 1","Custom 2","HR Category","Company","Location Code","Location","City","State","Country Name","Mature/Emerging","Geo Region","No. of Directs","Manager ID","Manager Name","Tier 1","Tier 2","Tier 3","Tier 4","Tier 5","Tier 6","Tier 7","Last Base Pay Increase - Date","Last Base Pay Increase Reason","Total Pay - Amount","Total Base Pay - Amount","Total Base Pay (Base or Basis) Local","Hourly Rate - Amount","Total Base Pay - Frequency","Total Base Pay - Currency","Total Base Pay in USD","Total Base Pay (Base or Basis) USD","Pay Range - Minimum","Pay Range - Midpoint","Pay Range - Maximum","Compa Ratio (Base or Basis)","Compa Ratio Bucket","VC Plan ID","VC Plan Name","Target Bonus - Percent","Target Bonus - Amount","Target Bonus - Currency","Target Bonus Amount in USD","CS Summary Role","Billable Stat","Job Family","Job Family Group","Competency Rating (2017/18)","Goal Rating (2017)","Competency Rating (2016/17)","Goal Rating (2016)","Competency Rating (2015/16)","Goal Rating (2015)","Legacy Organization"]
    separator => ","
    }

    mutate {
    add_field => { "doc_type" => "workday" }
    add_field => {"id" => ""}
    copy => {"Employee ID" => "id" }
    remove_field => [ "message" ]
    }

}
uuid {
    target => "uuid"
}
}

output {
    elasticsearch {
        index => "xg_hr_details-000001"
        action => "update"
        document_id => "%{[Employee ID]}"
        doc_as_upsert => "true"
        hosts => "http://caruelsatic01p:9200/"
    }
}

Two things. Firstly, you are referencing "Employee ID" as the document_id, but that does not exist for your attrition records.

Secondly...

mutate {
    [...]
    add_field => {"id" => ""}
    copy => {"Emp id" => "id" }
    [...]
}

A mutate filter performs operations in a fixed order, and add_field comes after copy. So this will copy "Emp id" to id, then it will add the string "" to id, resulting in an array. Just remove the add_field.
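
So the attrition branch would end up something like this (a sketch, with only that add_field removed):

mutate {
    add_field => { "doc_type" => "attrition" }
    copy => { "Emp id" => "id" }
    remove_field => [ "message" ]
}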

Then you probably want to reference id in the document_id option on the output.
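
For example, keeping everything else in your output the same:

elasticsearch {
    index => "xg_hr_details-000001"
    action => "update"
    document_id => "%{id}"
    doc_as_upsert => "true"
    hosts => "http://caruelsatic01p:9200/"
}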

Hi Badger,

Thanks for the suggestions. I have implemented the changes you suggested; please find my latest Logstash config below. I am still getting duplicate data while updating. Whenever I make changes in my CSV, the data gets pushed thrice...
input {

    file {
        path => "/home/elastic/elk/logstash-6.4.3/csv_data/attrition_dump_v3.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }

    file {
        path => "/home/elastic/elk/logstash-6.4.3/csv_data/workday_dump_v3.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }

}

filter {

if [path] == "/home/elastic/elk/logstash-6.4.3/csv_data/attrition_dump_v3.csv" {
    csv {
    columns => ["Emp id","Full Name","Original Hire date","Hire Date","Last Day of Work","Quarter","Tenure","Tenure in years","Termination Reason","Reason","Vol/Invol","Position Title","Grade","Cost Center - Name","Mgr Name","Goal","Competency","Leader","Kumar -1","BU","Team"]
    separator => ","
    }

    mutate {
    add_field => { "doc_type" => "attrition" }
    copy => {"Emp id" => "id"}
    add_field => {"id" => ""}
    split => ["id", ","]
    add_field => { "ID" => "%{id[0]}"}
    remove_field => [ "message" ]
    }
}

else if [path] == "/home/elastic/elk/logstash-6.4.3/csv_data/workday_dump_v3.csv" {
    csv {
    columns =>  ["Employee ID","Employee","Last,First Name","Email - Primary Work","Hire Date","Original Hire Date","Is Rehire","Years of Service","Company Service Date",
    "Continuous Service Date","Seniority Date","Time in Job Profile","Time in Job Profile Start Date","Time in Position","Position","Job Title","Job Profile",
    "Grade Profile ID","Grade","Grade Effective Date","Employment Status","Leave Type","Employee Type","Worker Type","Full/Part","Reg/Temp","Worker SubType","Exempt/Non-Exempt","Pay Rate Type","Scheduled Std Hours - Calculated FTE","Location Std Hours","Default Weekly Hours","FTE","Cost Center - ID","Cost Center - Name","HFM-Code","HFM-Function","HFM-SubFunction","Profit Center","Product Code","IES/Novella","Project ID","Project Description","Tech/Non-Tech","Client Facing Y/N","Worker's Business Unit","HR BU","SBU","Finance BU","Department","FM Entity","Custom 1","Custom 2","HR Category","Company","Location Code","Location","City","State","Country Name","Mature/Emerging","Geo Region","No. of Directs","Manager ID","Manager Name","Tier 1","Tier 2","Tier 3","Tier 4","Tier 5","Tier 6","Tier 7","Last Base Pay Increase - Date","Last Base Pay Increase Reason","Total Pay - Amount","Total Base Pay - Amount","Total Base Pay (Base or Basis) Local","Hourly Rate - Amount","Total Base Pay - Frequency","Total Base Pay - Currency","Total Base Pay in USD","Total Base Pay (Base or Basis) USD","Pay Range - Minimum","Pay Range - Midpoint","Pay Range - Maximum","Compa Ratio (Base or Basis)","Compa Ratio Bucket","VC Plan ID","VC Plan Name","Target Bonus - Percent","Target Bonus - Amount","Target Bonus - Currency","Target Bonus Amount in USD","CS Summary Role","Billable Stat","Job Family","Job Family Group","Competency Rating (2017/18)","Goal Rating (2017)","Competency Rating (2016/17)","Goal Rating (2016)","Competency Rating (2015/16)","Goal Rating (2015)","Legacy Organization"]
    separator => ","
    }

    mutate {
    add_field => { "doc_type" => "workday" }
    copy => {"Employee ID" => "id"}
    add_field => {"id" => ""}
    split => ["id", ","]
    add_field => { "[ID]" => "%{id[0]}"}
    remove_field => [ "message" ]
    }

}
uuid {
    target => "uuid"
}
}

output {

    if [document_id] {
        elasticsearch {
            index => "xg_hr_details-000001"
            action => "update"
            document_id => "%{[ID]}"
            doc_as_upsert => "true"
            hosts => "http://caruelsatic01p:9200/"
        }
    }
    else {
        elasticsearch {
            index => "xg_hr_details-000001"
            hosts => "http://caruelsatic01p:9200/"
        }
    }
}

Please find the Kibana Discover image showing the duplicate logs. The document circled in red is the original; I changed the Name, and then it was pushed thrice...
Please provide a suggestion for avoiding duplicates.

In your output section you are testing for the existence of the [document_id] field, which does not exist, so it will always go through the else section, which unconditionally creates a new document.

Can you suggest an output section for that?

I have used ID, but duplicates are still getting inserted.

output {

    if [ID] {
        elasticsearch {
            index => "xg_hr_details-000001"
            action => "update"
            document_id => "%{[ID]}"
            doc_as_upsert => "true"
            hosts => "http://caruelsatic01p:9200/"
        }
    }
    else {
        elasticsearch {
            index => "xg_hr_details-000001"
            hosts => "http://caruelsatic01p:9200/"
        }
    }
}

If you start over with a fresh index and run Logstash twice, what does a single document in the index look like? Copy it from the JSON tab in Kibana.

I have the index setting refresh_interval set to 1 second. I am running Logstash as a service. Whenever I save my CSV, it gets pushed again.

I am getting duplicate logs. I changed the name to "Sumit test":

{
  "_index": "xg_hr_details-000001",
  "_type": "doc",
  "_id": "52202",
  "_version": 8,
  "_score": null,
  "_source": {
    "Last Day of Work": null,
    "path": "/home/elastic/elk/logstash-6.4.3/csv_data/attrition_dump_v3.csv",
    "ID": "52202",
    "host": "carulogbdf01p",
    "Competency": null,
    "uuid": "8968604e-8b1b-4582-8a83-ca3654b34666",
    "Mgr Name": null,
    "id": [
      "52202",
      ""
    ],
    "Kumar -1": null,
    "@version": "1",
    "@timestamp": "2019-04-04T12:52:30.944Z",
    "Vol/Invol": null,
    "Original Hire date": "26-Apr-18",
    "Tenure in years": null,
    "Position Title": null,
    "Reason": null,
    "doc_type": "attrition",
    "Cost Center - Name": null,
    "Tenure": null,
    "Grade": null,
    "Team": null,
    "Termination Reason": null,
    "Leader": null,
    "Emp id": "52202",
    "Hire Date": "26-Apr-18",
    "Goal": null,
    "BU": null,
    "Quarter": null,
    "Full Name": "Sumit test"
  },
  "fields": {
    "avg_HC": [
      787
    ],
    "vol_attr_count": [
      0
    ],
    "@timestamp": [
      "2019-04-04T12:52:30.944Z"
    ]
  },
  "highlight": {
    "Full Name": [
      "@kibana-highlighted-field@Sumit@/kibana-highlighted-field@ test"
    ],
    "doc_type": [
      "@kibana-highlighted-field@attrition@/kibana-highlighted-field@"
    ]
  },
  "sort": [
    1554382350944
  ]
}

You are setting the document id, so you should not be getting duplicates. You will get a new version of the record every time you restart Logstash.

Yes, I know... that is what should happen, but it's not happening...

Logstash is running as a service, so whenever I make any changes or simply save the file, the whole CSV data gets pushed again...

You have told Logstash not to preserve state across restarts, so that is expected.
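
If you want the read position to survive restarts, point sincedb_path at a real file instead of /dev/null. A minimal sketch (the sincedb location below is just illustrative; any file Logstash can write to will do):

file {
    path => "/home/elastic/elk/logstash-6.4.3/csv_data/attrition_dump_v3.csv"
    start_position => "beginning"
    # illustrative path; Logstash stores the read offset here across restarts
    sincedb_path => "/home/elastic/elk/logstash-6.4.3/attrition.sincedb"
}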

After removing start_position => "beginning" and sincedb_path => "/dev/null", the whole CSV data is still getting pushed... Can you please provide a sample config for updating and syncing data from a CSV?

Hi Badger,

A different issue I came across: I am reading a file from a Windows folder.

file {
    path => "C:\Users\Public\Documents\Elastic\logstash-6.3.0\csv_data\attrition_dump*.csv"
}

My CSV file will keep changing, so I used *, but the data from that CSV is not getting pushed. When I give the full file name, everything works fine.

Please suggest the appropriate pattern for reading the file from a Windows directory.
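
For what it's worth, the file input expects forward slashes in its path globs, even on Windows, so a pattern along these lines may work (same path, slashes flipped):

file {
    # forward slashes in the glob, even for a Windows directory
    path => "C:/Users/Public/Documents/Elastic/logstash-6.3.0/csv_data/attrition_dump*.csv"
}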
