Hi @stephenb,
Thanks for the confirmation!
The normal way to do this would be on ingest. I tried enriching the data during ingest in the Logstash pipeline but was unable to figure it out.
So instead I write to two indices at the output of the Logstash pipeline: "type1", the main index, holds the "Sender_id" and "Mobile_Number" fields, and "type2" holds the "STAT" field. Both indices share the "MSGID" field.
Logstash output:
output {
  stdout { codec => rubydebug }
  if [Sender_id] {
    elasticsearch {
      hosts    => ["<>"]
      index    => "type1"   # type1 log
      user     => "<>"
      password => "<>"
    }
  } else if [Mobile_Number] {
    elasticsearch {
      hosts    => ["<>"]
      index    => "type1"   # type1 log (quote was missing here)
      user     => "<>"
      password => "<>"
    }
  } else {
    elasticsearch {
      hosts    => ["<>"]
      index    => "type2"   # type2 log
      user     => "<>"
      password => "<>"
    }
  }
}
The Python script enriches the "type1" index by matching on the "MSGID" field and copying the "STAT" field over from the "type2" index.
Python script:
from elasticsearch import Elasticsearch

es_host = "<>"
es_port = "<>"
es_uname = "<>"
es_pwd = "<>"
es_type1 = "type1"
es_type2 = "type2"


def make_connection():
    return Elasticsearch(
        [{'host': es_host, 'port': es_port}],
        http_auth=(es_uname, es_pwd),
        scheme="https",
        verify_certs=False)


elastic = make_connection()
def enrichment_data(message_id):
    # Look up the newest type2 doc for this MSGID and return its STAT.
    # Only the first (latest) hit is used, so size 1 is enough.
    data = elastic.search(index=es_type2, body={
        "size": 1,
        "sort": {"event_timestamp": "desc"},
        "query": {
            "match": {
                "MSGID.keyword": str(message_id)
            }
        }})
    if data['hits']['total']['value'] != 0:
        return {"STAT": data['hits']['hits'][0]['_source']['STAT']}
    return {}
def enrich_data():
    get_data = elastic.search(index=es_type1, body={
        "size": 10000,
        "query": {
            "bool": {
                "must": [
                    {"exists": {"field": "MSGID"}}
                ]
            }
        }
    })
    for data in get_data['hits']['hits']:
        message_id = data['_source']['MSGID']
        metadata = enrichment_data(message_id)
        if 'STAT' in metadata:
            data['_source']['STAT'] = metadata['STAT']
        elastic.index(index=data['_index'], doc_type=data['_type'],
                      id=data['_id'], body=data['_source'])
        elastic.indices.refresh(index=data['_index'])


if __name__ == '__main__':
    enrich_data()
Pros:
- The "STAT" field is enriched into the type1 index successfully, and the script can be scheduled so the enrichment runs automatically.
- The pie chart visualization is created and working fine.
Cons:
- Each "MSGID" is looked up in both indices and enriched one document at a time, so the enrichment speed is only about 400 docs/min. We have ~1M logs ingested every hour, so building visualizations on the enriched data in real time is not possible, and neither is alerting, since our alerts on the type1 index are based on the "STAT" and "Sender_id" fields.
I tried a bulk (batch) update using the scroll API and the Bulk API helper, but I couldn't get it working with the above script.
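For reference, this is roughly the batched flow I was attempting: scan over type1 with the scroll helper, resolve all the MSGIDs of a page with a single terms query against type2, and push the updates with the bulk helper. It is only a sketch, not code I have verified against our cluster; the index and field names match the script above, and the build_update_actions / flush_page helpers are my own names.

```python
# Sketch of batched enrichment (assumptions: index/field names as above,
# roughly one type2 doc per MSGID).
try:
    from elasticsearch.helpers import scan, bulk  # elasticsearch-py helpers
except ImportError:
    scan = bulk = None  # the pure helper below still works without them


def build_update_actions(hits, stat_by_msgid):
    """Turn one page of type1 hits into bulk "update" actions, using a
    MSGID -> STAT mapping resolved by one terms query per page."""
    actions = []
    for hit in hits:
        stat = stat_by_msgid.get(hit["_source"]["MSGID"])
        if stat is None:
            continue  # no matching type2 doc yet; skip instead of rewriting
        actions.append({
            "_op_type": "update",
            "_index": hit["_index"],
            "_id": hit["_id"],
            "doc": {"STAT": stat},
        })
    return actions


def flush_page(es, hits):
    # One terms query resolves every MSGID in the page at once.
    msgids = [h["_source"]["MSGID"] for h in hits]
    resp = es.search(index="type2", body={
        "size": len(msgids),  # assumes ~one type2 doc per MSGID
        "sort": [{"event_timestamp": "desc"}],
        "query": {"terms": {"MSGID.keyword": msgids}},
    })
    stat_by_msgid = {}
    for h in resp["hits"]["hits"]:
        # hits are newest-first, so the first STAT seen per MSGID wins
        stat_by_msgid.setdefault(h["_source"]["MSGID"], h["_source"]["STAT"])
    bulk(es, build_update_actions(hits, stat_by_msgid))


def enrich_in_batches(es, page_size=1000):
    page = []
    for hit in scan(es, index="type1",
                    query={"query": {"exists": {"field": "MSGID"}}}):
        page.append(hit)
        if len(page) >= page_size:
            flush_page(es, page)
            page = []
    if page:
        flush_page(es, page)
```

This replaces one search plus one index call per document with one terms query plus one bulk request per page, which is where I expected the speed-up to come from.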
Any suggestions on the above would be highly appreciated.
Note: I have worked with the enrich processor in the past, but it requires reindexing to get the enriched data, which is not an option for our use case.
Thank you!