Logstash: update doc fields if the doc exists

Logstash: update doc fields if the doc exists, otherwise insert.
I am new to the ELK stack, so sorry if anything here does not follow forum conventions.
I am sending a Kafka stream of event logs. There are two types of logs: open-event logs and close-event logs. I need help with a Logstash configuration that updates a doc if it exists, or inserts it otherwise. Both logs have the same fields; the open-event log has eventstatus 'Open' and the close-event log has 'Close', and the two share the same value in one field ('CAM_ID').

OPEN EVENT fields

           "Severity" : "PS_MAJOR",
          "LSNExt_AckStatus" : "ALM_UNACKNOWLEDGED",
          "@timestamp" : "2021-12-16T17:08:21.479Z",
          "nativeProbableCause" : "Disconnection",
          "LSNExt_AssignedPortLabel" : "",
          "emsTime" : "2021-12-16 22:00:05",
          "l" : "",
          "CAM_ID" : "9195026",
          "PTP" : "",
          "probableCause" : "EMS",
          "CTP" : "",
          "EMS_DN" : "LSN/EMS_BG-40_70",
          "LINKSTATUS" : "Down",
          "j" : "",
          "LSNExt_MEName" : "SONATA SOFTWARE PVT LTD",
          "k" : "",
          "slot" : "",
          "EMS_notifId" : "Not Relevant",
          "SYSTEM_CLEARTIME" : "",
          "CLEARTIME" : "",
          "notificationId" : "5203582",
          "m" : "",
          "NativeEMSName" : "SONATA SOFTWARE PVT LTD",
          "serviceAffecting" : "SA_UNKNOWN",
          "EMSname" : "LSN/EMS_BG-40_70/5833",
          "SYSTEM_OPENTIME" : "2021-12-16 22:00:05",
          "OPENTIME" : "2021-12-16 22:00:05",
          "ObjectType" : "",
          "entity" : "ISP",
          "eventstatus" : "Open",
          "circle" : "KA",
          "neTime" : "2021-12-16 22:00:05",
          "@version" : "1"

CLOSE EVENT fields

           "Severity" : "PS_CLEARED", 
          "LSNExt_AckStatus" : "ALM_UNACKNOWLEDGED", 
          "@timestamp" : "2021-12-16T17:30:11.094Z",
          "nativeProbableCause" : "Disconnection",
          "LSNExt_AssignedPortLabel" : "",
          "emsTime" : "2021-12-16 22:23:45",
          "l" : "",
          "CAM_ID" : "9195026",
          "PTP" : "",
          "probableCause" : "EMS",
          "CTP" : "",
          "EMS_DN" : "LSN/EMS_BG-40_70",
          "LINKSTATUS" : "Up",
          "j" : "",
          "LSNExt_MEName" : "SONATA SOFTWARE PVT LTD",
          "k" : "",
          "slot" : "",
          "EMS_notifId" : "",
          "SYSTEM_CLEARTIME" : "2021-12-16 22:23:45",
          "CLEARTIME" : "2021-12-16 22:23:45",
          "notificationId" : "5209303",
          "m" : "",
          "NativeEMSName" : "SONATA SOFTWARE PVT LTD",
          "serviceAffecting" : "SA_UNKNOWN",
          "EMSname" : "LSN/EMS_BG-40_70/5833",
          "SYSTEM_OPENTIME" : "",
          "OPENTIME" : "",
          "ObjectType" : "",
          "entity" : "ISP",
          "eventstatus" : "Close",
          "circle" : "KA",
          "neTime" : "2021-12-16 22:23:45",

When a close-event arrives, I want to update the already existing doc with the same 'CAM_ID'.

Fields to be updated in the open-event doc:

  1. eventstatus (becomes 'Close')
  2. SYSTEM_CLEARTIME
  3. CLEARTIME

logstash.conf

input {
    kafka {
        group_id => "2129"
        topics => ["ecievents"]
        bootstrap_servers => "localhost:9092"
        codec => json
        tags => ["ecievents"]
    }
    kafka {
        group_id => "2129"
        topics => ["tejasevents"]
        bootstrap_servers => "localhost:9092"
        codec => json
        tags => ["tejasevents"]
    }
}
output {
    if "ecievents" in [tags] {
        elasticsearch {
            hosts => ["localhost:9200"]
            document_type => "_doc"
            index => "ecievents"
        }
        stdout { codec => rubydebug }
    } else if "tejasevents" in [tags] {
        elasticsearch {
            hosts => ["localhost:9200"]
            document_type => "_doc"
            index => "tejasevents"
        }
        stdout { codec => rubydebug }
    }
}

In the elasticsearch output, change the action to "update" and set the document _id yourself (via document_id) instead of letting Elasticsearch generate it.

FYI: updates in Elasticsearch cost about 3x the I/O, so if this happens a lot, make sure it is something your system can handle.

elasticsearch {
    hosts => ["localhost:9200"]
    document_type => "_doc"
    index => "ecievents"
    action => "update"
    doc_as_upsert => true
    document_id => "%{CAM_ID}"
    #CLEARTIME => "%{CLEARTIME}"
    #LINKSTATUS => "%{LINKSTATUS}"
}

Thanks Andreas,

Is this correct? When I specify CLEARTIME and LINKSTATUS, it shows an error.

[ERROR][logstash.outputs.elasticsearch] Unknown setting 'CLEARTIME' for elasticsearch
[ERROR][logstash.outputs.elasticsearch] Unknown setting 'LINKSTATUS' for elasticsearch

I don't know what CLEARTIME or LINKSTATUS are supposed to be. Are those fields in your data?
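
For context on the error: the elasticsearch output has no per-field settings, which is why CLEARTIME and LINKSTATUS are rejected as unknown. With action => "update" and doc_as_upsert => true, the whole event is sent as the partial doc, so every field present in the close-event (including empty ones such as OPENTIME) overwrites the stored value. One way to limit the update is to trim the close-event in a filter before it reaches the output. Below is a minimal sketch, assuming the prune filter plugin is installed and that only the three fields listed in the question should change; the whitelist is an assumption based on that list, not something confirmed in this thread.

```
filter {
    # Only trim close events; open events keep all their fields.
    if [eventstatus] == "Close" {
        prune {
            # Keep CAM_ID (used as document_id), the fields to update,
            # and "tags", which the output conditionals depend on.
            whitelist_names => ["^CAM_ID$", "^eventstatus$", "^SYSTEM_CLEARTIME$", "^CLEARTIME$", "^tags$"]
        }
    }
}
```

If LINKSTATUS or Severity should also follow the close-event, add them to the whitelist in the same way. As far as I know, prune also drops @timestamp and @version unless they match the whitelist, so add "^@timestamp$" if the close time should replace the timestamp on the stored doc.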
