Delete Duplicate Documents with Elasticsearch and Ruby

I use a script ruby for duplicate Document

input {
    beats {
    port => "5044"
    }
}    
              
filter {
    csv {
        separator => ","
		columns => ["chaine", "job", "date_plan", "statut", "date_debut", "date_fin", "serveur", "numero_passage", "application", "sous_application"]
}

ruby {
 code => 'require 'elasticsearch'
client = Elasticsearch::Client.new

begin
  # find duplicate documents by @timestamp
  result = client.search(
    index: 'hello', 
    body: {
      aggs: {
        duplicateCount: {
          terms: {
            field: "@timestamp",
            "min_doc_count": 2,
            size: 100
          },
          aggs: {
            duplicateDocuments: {
              top_hits: {}
            }
          }
        }
      }
    }
  )['aggregations']['duplicateCount']['buckets'].map do |bucket|
    #use the first document of the duplicates
    bucket['duplicateDocuments']['hits']['hits'].first
  end

  result.each do |doc|
    client.delete(index: doc['_index'], type: doc['_type'], id: doc['_id'])
  end
  client.indices.refresh(index: 'hello')
end until result.count <= 0'
  }

date {
match => [ "date_plan" , "YYYY-MM-dd" ]
timezone => "Europe/Paris"
}

date {
match => [ "date_debut" , "YYYY-MM-dd HH:mm:ss" ]
timezone => "Europe/Paris"
}

date {
match => [ "date_fin" , "YYYY-MM-dd HH:mm:ss" ]
timezone => "Europe/Paris"
}
mutate {
convert => { "numero_passage" => "integer" }
}

}

output {
  elasticsearch { 
  hosts => "http://localhost:9200" 
  index => "hello"
  }
stdout {codec => rubydebug}

}

I have this error when i start logstash

Thread.exclusive is deprecated, use Thread::Mutex
Sending Logstash logs to C:/Users/h83710/Desktop/elastic/logstash-7.5.2/logs which is now configured via log4j2.properties
[2020-05-07T09:55:54,643][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-05-07T09:55:54,989][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.5.2"}
[2020-05-07T09:55:58,702][ERROR][logstash.agent           ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"{\", \"}\" at line 14, column 23 (byte 287) after filter {\n    csv {\n        separator => \",\"\n\t\tcolumns => [\"chaine\", \"job\", \"date_plan\", \"statut\", \"date_debut\", \"date_fin\", \"serveur\", \"numero_passage\", \"application\", \"sous_application\"]\n}\n\nruby {\n    code => 'require '", :backtrace=>["C:/Users/h83710/Desktop/elastic/logstash-7.5.2/logstash-core/lib/logstash/compiler.rb:41:in `compile_imperative'", "C:/Users/h83710/Desktop/elastic/logstash-7.5.2/logstash-core/lib/logstash/compiler.rb:49:in `compile_graph'", "C:/Users/h83710/Desktop/elastic/logstash-7.5.2/logstash-core/lib/logstash/compiler.rb:11:in `block in compile_sources'", "org/jruby/RubyArray.java:2584:in `map'", "C:/Users/h83710/Desktop/elastic/logstash-7.5.2/logstash-core/lib/logstash/compiler.rb:10:in `compile_sources'", "org/logstash/execution/AbstractPipelineExt.java:156:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:47:in `initialize'", "C:/Users/h83710/Desktop/elastic/logstash-7.5.2/logstash-core/lib/logstash/java_pipeline.rb:27:in `initialize'", "C:/Users/h83710/Desktop/elastic/logstash-7.5.2/logstash-core/lib/logstash/pipeline_action/create.rb:36:in `execute'", "C:/Users/h83710/Desktop/elastic/logstash-7.5.2/logstash-core/lib/logstash/agent.rb:326:in `block in converge_state'"]}
[2020-05-07T09:55:59,588][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2020-05-07T09:56:04,354][INFO ][logstash.runner          ] Logstash shut down.

don’t mix single quote with double quotes. if you want to use single quotes inside your ruby code, wrap the code in double quotes.

the log pointes this out, syntax error on line 14.

I correct the script but I have this error :

input {
    beats {
    port => "5044"
    }
}    
              
filter {
    csv {
        separator => ","
		columns => ["chaine", "job", "date_plan", "statut", "date_debut", "date_fin", "serveur", "numero_passage", "application", "sous_application"]
}

ruby {
code => "require 'elasticsearch'
client = Elasticsearch::Client.new

begin
  # find duplicate documents by @timestamp
  result = client.search(
    index: 'hello', 
    body: {
      aggs: {
        duplicateCount: {
          terms: {
            field: '@timestamp',
            'min_doc_count': 2,
            size: 100
          },
          aggs: {
            duplicateDocuments: {
              top_hits: {}
            }
          }
        }
      }
    }
  )['aggregations']['duplicateCount']['buckets'].map do |bucket|
    #use the first document of the duplicates
    bucket['duplicateDocuments']['hits']['hits'].first
  end

  result.each do |doc|
    client.delete(index: doc['_index'], type: doc['_type'], id: doc['_id'])
  end
  client.indices.refresh(index: 'hello')
end until result.count <= 0"
 }

date {
match => [ "date_plan" , "YYYY-MM-dd" ]
timezone => "Europe/Paris"
}

date {
match => [ "date_debut" , "YYYY-MM-dd HH:mm:ss" ]
timezone => "Europe/Paris"
}

date {
match => [ "date_fin" , "YYYY-MM-dd HH:mm:ss" ]
timezone => "Europe/Paris"
}
mutate {
convert => { "numero_passage" => "integer" }
}

}

output {
  elasticsearch { 
  hosts => "http://localhost:9200" 
  index => "hello"
  }
stdout {codec => rubydebug}

}
       "hostname"[2020-05-07T13:47:19,814][ERROR][logstash.filters.ruby    ][main] Ruby exception occurred: [404] {"_index":"hello","_type":"_doc","_id":"tPP17nEBLThlt2QSpEDq","_version":7,"result":"not_found","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":319,"_primary_term":1}
[2020-05-07T13:47:19,815][ERROR][logstash.filters.ruby    ][main] Ruby exception occurred: [404] {"_index":"hello","_type":"_doc","_id":"tPP17nEBLThlt2QSpEDq","_version":6,"result":"not_found","_shards":{"total" => :2,"successful":1,"failed":0},"_seq_no":318,"_primary_term":1}

i’m sorry, i’m not familiar with ruby elasticsearch module, but your error indicates 404 , or not found