Logstash - update CSV content changes into ES

Hi Team.

I have a use case: maintaining automation test cases in ELK.

Step 1 (no issue) - my colleagues send me a first CSV file containing the columns below, and I use the csv filter to index them into ES.
1. Test Case ID
2. Author
3. Feature
4. Function
5. Verification

Step 2 (the issue is here) - after one month, my colleagues send me a second CSV file containing newly created test cases as well as updates to existing ones (for example, Verification and Function were updated).

I want Logstash to check whether a Test Case ID already exists in ES.

If the Test Case ID already exists in ES, overwrite the existing event.
If the Test Case ID is not found in ES, create a new event in ES.

How should I write this check as an if condition in Logstash?

filter {
  csv {
    columns     => [ "Test Case ID",
                     "Author",
                     "Feature",
                     "Function",
                     "Verification" ]
    separator   => ","
    skip_header => "true"
  }

  # pseudocode for what I want:
  # if the Test Case ID from the CSV already exists in ES
  #   { overwrite the existing event in ES }
  # else
  #   { create a new event in ES }
}

Don't do this with if-else in the filter section. Use an elasticsearch output, set document_id to your test case ID (or a hash of it using a fingerprint filter), and set the doc_as_upsert option on the output.
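A minimal sketch of what that looks like (the hosts and index values here are taken from the config later in this thread, not from this reply; doc_as_upsert takes effect with the update action):

    output {
      elasticsearch {
        hosts         => "localhost:9200"
        index         => "testcase"
        action        => "update"             # doc_as_upsert applies to update actions
        doc_as_upsert => true                 # create the document if the _id does not exist yet
        document_id   => "%{[Test Case ID]}"  # reuse the test case ID as the ES _id
      }
    }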


Hi Badger,

Per the official user guide, the type of document_id is a string, so I added document_id => "TID". But in Kibana the result is not what I expected:
1. The _id field value is the literal string TID, instead of 1 or 2.
2. Only the 2nd record is indexed into ES. Where is the first record?

  **Data:** 
  TID,Author,Module,Feature,Function,Verification,Creation Date
  1,Cherie Zhou,SCM,SOC,Nomination,successfully,2019-04-10
  2,Cherie Zhou,CAL,Analyzer,Analyzer,successfully,2019-04-09

  **Configuration:**
    output {
      elasticsearch {
        action             => "index"
        hosts              => "localhost:9200"
        index              => "testcase"
        manage_template    => true
        template           => "C:/elkstack/elasticsearch-6.5.1/indextemp/dtemplate.json"
        template_name      => "dtemplate"
        template_overwrite => true
        document_id        => "Test Case ID"
        doc_as_upsert      => true
      }
      stdout { codec => rubydebug { metadata => true } }
    }
Use a sprintf reference, so that the value of the field is used rather than the literal string:

document_id => "%{[Test Case ID]}"

With a literal document_id every row gets the same _id, which is why the second document overwrote the first.

Figured it out. It should be document_id => "%{TID}", since my column is actually named TID.
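For reference, the corrected output section then looks something like this (a sketch; the template options from the earlier config are omitted):

    output {
      elasticsearch {
        hosts         => "localhost:9200"
        index         => "testcase"
        action        => "update"    # paired with doc_as_upsert for insert-or-overwrite
        doc_as_upsert => true
        document_id   => "%{TID}"    # sprintf reference: the value of the TID field
      }
    }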

Why is there a "[" and "]"? It seems to work without the square brackets as well.

Does that mean a hash? If it's a hash, multiple fields could be put in it, for example [TID, UID, EID]. How would it know which field the value of _id should come from?

The square brackets are optional in your case. If you are referencing a field inside an object they are not optional, so if your event had a beat field that contains a hostname field, a sprintf reference to it would be %{[beat][hostname]}.
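A quick illustration (the beat/hostname fields are hypothetical, as in the example above):

    filter {
      # copy the nested [beat][hostname] field into a top-level field
      mutate {
        add_field => { "source_host" => "%{[beat][hostname]}" }
      }
    }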

OK, thank you. So it's the syntax for referencing a nested field.

One more question. If I want the value of _id to come from the combination of TID and Author, is the syntax like this? document_id => "%{TID}+%{Author}"

Yes, that will work.
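For the sample data above, that would produce _id values like 1+Cherie Zhou and 2+Cherie Zhou; the + is just a literal separator character in the resulting id.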

If you need to combine more than those two fields, switch to a fingerprint filter...

fingerprint { source => [ "TID", "Author" ] target => "[@metadata][docid]" method => "MURMUR3" }

Then you can use document_id => "%{[@metadata][docid]}"
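Putting it together, a sketch of the fingerprint approach (hosts and index reused from the earlier config; [@metadata] fields are not sent to ES, which is why the id is stored there). Depending on your plugin version you may also want concatenate_sources => true, so that all source fields are hashed together rather than one after another:

    filter {
      fingerprint {
        source => [ "TID", "Author" ]
        target => "[@metadata][docid]"
        method => "MURMUR3"
        concatenate_sources => true   # hash TID and Author together
      }
    }
    output {
      elasticsearch {
        hosts         => "localhost:9200"
        index         => "testcase"
        action        => "update"
        doc_as_upsert => true
        document_id   => "%{[@metadata][docid]}"
      }
    }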
