Keep data synchronized with indices

I'm using Elasticsearch 7.9.0 with ingest pipelines to process and enrich data.
I want to share the data in my Elasticsearch indices with another app. That app has an API for collecting data.

Is there any way to keep the data in my indices in sync with the other app? I mean, when a new document is inserted into an index, it should be pushed to that app (can Logstash be used for this?).
Alternatively, I could schedule a query against the indices and push any new data to the app.

My idea is to use a Logstash pipeline with an elasticsearch input and an http output, but I don't know how to detect new documents in an index with the elasticsearch input, or how to set a schedule.
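One possible shape for this (a minimal sketch, not a tested pipeline — the index name, app URL, and the 5-minute polling window are all assumptions) is to use the elasticsearch input's `schedule` option together with a time-range query, so each run only picks up documents newer than the last run:

```conf
input {
  elasticsearch {
    hosts    => ["localhost:9200"]
    index    => "formated-index"
    # Cron-style schedule: run the query every 5 minutes
    schedule => "*/5 * * * *"
    # Only fetch documents with a @timestamp in the last 5 minutes;
    # the window must match the schedule interval, and documents that
    # arrive right at the boundary can be missed or sent twice
    query    => '{ "query": { "range": { "@timestamp": { "gte": "now-5m" } } } }'
  }
}

output {
  http {
    url         => "http://other-app.example.com/api/collect"   # hypothetical endpoint
    http_method => "post"
    format      => "json"
  }
}
```

For exactly-once delivery you would need to track the last-seen timestamp or document ID yourself; the simple time-window approach above is only at-least/at-most once depending on how the window and schedule line up.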

Thanks!

Instead of pushing to Elasticsearch directly, could you perhaps push to something like Kafka, then consume that topic to feed both Elasticsearch and your other system?
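In Logstash terms that could look something like the following sketch (broker address and topic name are made up), with the other system consuming the same Kafka topic independently:

```conf
output {
  kafka {
    bootstrap_servers => "kafka:9092"        # hypothetical broker
    topic_id          => "enriched-events"   # hypothetical topic
    codec             => json
  }
}
```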

I believe you could also just use Logstash for this: start an http input where you send your documents, then define two outputs, one for Elasticsearch and one for your other system (not sure which output plugin to use for that, though).
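A minimal sketch of that fan-out (ports, hosts, and the app's endpoint are all assumptions):

```conf
input {
  http {
    port => 8080   # documents are POSTed here
  }
}

output {
  # Copy 1: into Elasticsearch
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "my-index"
  }
  # Copy 2: forwarded to the other system
  http {
    url         => "http://other-app.example.com/api/collect"   # hypothetical
    http_method => "post"
    format      => "json"
  }
}
```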

I considered that, but there are reasons it won't work for me.

Currently, I'm using both Logstash and an Elasticsearch ingest pipeline to process documents.
I have 4 log sources (each with a different data format) and 4 pipelines to process them. My goal is to take all of that data and push it into a new index with a new format (rename, remove, enrich, etc.) while still keeping the original data (as processed by Logstash). So my data flow is: raw data -> Logstash (grok, mutate, enrich with the elasticsearch filter, ...) -> Elasticsearch ingest pipeline (reformat, rename, remove some fields, ...).
What I need is the document as it exists after running through the ingest pipeline.
That's why I can't just send the event to another output from Logstash.

Or maybe there's a problem with my data flow :expressionless:

Sample:

input {
  beats {
    port => 5044
    host => "0.0.0.0"
    client_inactivity_timeout => 100
    id => "tcp5044"
  }
}

filter {
  grok {}
  mutate {}
  elasticsearch {}   # used to enrich the data
}

output {
  elasticsearch { index => "original" }
  if <condition> {
    elasticsearch {
      index    => "formated-index"
      pipeline => "format-cts"
    }
  }
}

ES ingest pipeline:

{
  "description" : "Format data and remove some field are not necessary",
  "processors" : [
    {
      "set": {
        "field": "newfield.type",
        "value": "SOFTWARE"
      }
    },
    {
      "set": {
        "field": "newfield.field_name",
        "value": "{{data.name}}"
      }
    },
    {
      "set": {
        "field": "newfield.field1",
        "value": "{{data.version}}"
      }
    },
    {
      "set": {
        "field": "newfield.field2",
        "value": "{{data.type}}"
      }
    },
    {
      "uppercase": {
        "field": "field_L",
        "ignore_missing": true
      }
    },
    {
      "date" : {
        "field" : "timestamp",
        "target_field" : "@timestamp",
        "formats" : ["dd/MMM/yyyy:HH:mm:ss Z", "ISO8601"],
        "timezone" : "Asia/Ho_Chi_Minh",
        "ignore_failure" : true
      }
    },
    {
      "dot_expander": {
        "field": "http.request.uri"
      }
    },
    {
      "dot_expander": {
        "field": "http.response.body.bytes"
      }
    },
    {
      "dot_expander": {
        "field": "http.response.status_code"
      }
    },
    {
      "remove" : {
        "field": ["useragent", "http", "log", "agent", "input", "ecs", "message", "type", "tags", "data", "tenant"],
        "ignore_missing": true
      }
    }
  ]
}
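To check what a document looks like after the ingest pipeline runs (which is the output you say you need), the pipeline can be exercised with the simulate API. A sketch, assuming the pipeline is stored as `format-cts` and using made-up sample values for the `data` fields:

```console
POST _ingest/pipeline/format-cts/_simulate
{
  "docs": [
    {
      "_source": {
        "data": { "name": "example-app", "version": "1.0", "type": "web" },
        "timestamp": "2020-09-01T00:00:00Z"
      }
    }
  ]
}
```

The response shows each document as it would be indexed, so you can confirm the `newfield.*` values and the removed fields before wiring anything downstream.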
