Preprocess data with Python

Hi Everyone,
I'm quite new to the Elastic suite. My current situation is:

Data --> Logstash (with filters) --> Elasticsearch --> Kibana

I would like to process data with a Python script (an ML model) before storing it in Elasticsearch.
What is the best way to do this? I am currently using Elastic Stack version 7.5.0.

Thanks

Look at the bulk helper in the Python elasticsearch module (elasticsearch.helpers.bulk).

I just build a list of events. (This is inside an outer loop that uses elasticsearch_dsl to read another index and merge items.)

   es_row = {  "_index": "rollup-ironport-spf-"+datetime.today().strftime('%Y.%m.%d'),
               "_type": "_doc",   # mapping types are deprecated in 7.x; "_doc" is the only accepted value
               "_id": mid_id,
               "@timestamp": spf_timestamp,
               "event_timestamp": hit["syslog_timestamp"],
               "mid": mid_id,
               "icid": icid_id,
            }

   s = Search(using=client, index="syslog-ironport-*") \
       .query("match", message=icid_id).extra(size=100) \
       .filter("range", **{'@timestamp': {'gte': 'now-1d', 'lt': 'now'}})

   icid_results = s.execute()

   for icid in icid_results.hits:
       if "From:" in icid["cisco_message"] and mid_id in icid["cisco_message"]:
           match = from_grok.match(icid["cisco_message"])
           if match:  # .match() returns None when the pattern does not match
               es_row.update(match)

       if "To:" in icid["cisco_message"] and mid_id in icid["cisco_message"]:
           match = to_grok.match(icid["cisco_message"])
           if match:
               es_row.update(match)

       if "dns host" in icid["cisco_message"]:
           match = icid_grok.match(icid["cisco_message"])
           if match:
               es_row.update(match)

   es_out.append(dict(es_row))
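For reference, the *_grok.match calls above presumably return a dict of named captures (or None on no match). A stdlib stand-in using re named groups shows the idea; the pattern and message format here are assumptions, not the actual grok patterns:

```python
import re

# Hypothetical stand-in for the from_grok matcher: a regex with named groups
# plays the role of a grok pattern, yielding a dict of captured fields.
FROM_PATTERN = re.compile(r"MID (?P<mid>\d+) .* From: <(?P<mail_from>[^>]+)>")

def grok_like_match(pattern, message):
    """Return a dict of named captures, or None when the pattern does not match."""
    m = pattern.search(message)
    return m.groupdict() if m else None

fields = grok_like_match(FROM_PATTERN, "MID 12345 ICID 678 From: <alice@example.com>")
# fields == {'mid': '12345', 'mail_from': 'alice@example.com'}
```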

Then send it to Elasticsearch:

spf_index = bulk(client, es_out)
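For completeness, here is a minimal, self-contained sketch of preparing actions for helpers.bulk; the index prefix, _id field, and host are assumptions, and bulk returns a (success_count, errors) tuple:

```python
from datetime import datetime

def build_actions(docs, index_prefix="rollup-ironport-spf-"):
    """Turn plain dicts into bulk index actions targeting a dated index.
    The prefix and the use of 'mid' as _id are assumptions for illustration."""
    index = index_prefix + datetime.today().strftime("%Y.%m.%d")
    return [{"_index": index, "_id": doc["mid"], **doc} for doc in docs]

actions = build_actions([{"mid": "12345", "icid": "678"}])

# Sending requires a reachable cluster (elasticsearch-py must be installed):
# from elasticsearch import Elasticsearch
# from elasticsearch.helpers import bulk
# client = Elasticsearch("http://localhost:9200")
# success_count, errors = bulk(client, actions)  # returns (count, list of errors)
```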

Thanks Len, this is the solution I was looking for.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.