Hi,
I am new to the ELK stack workflow. I am currently working on a project that pulls a live feed from Twitter every time a user makes a request, and I want to display the results in Kibana. Instead of using Logstash to collect the data, I use the tweepy library in Python and send the data to Elasticsearch, where I created an index to store the incoming documents.
The problem:
The code works, but the incoming documents are stored only if I give a new index name each time a new set of live feeds comes in. If I reuse the same index name so that all incoming documents are added to the existing ones, the index is not updated with the new documents.
I have attached my code snippet below:
from elasticsearch import Elasticsearch
from elasticsearch import helpers

es_client = Elasticsearch(http_compress=True)

# Keep only the selected fields (use_these_keys is not shown in this snippet)
def filterKeys(document):
    return {key: document[key] for key in use_these_keys}

# Yield one bulk action per DataFrame row
def doc_generator(df):
    df_iter = df.iterrows()
    #index, document = next(df_iter)
    for index, document in df_iter:
        yield {
            "_index": 'twitter_req',
            "_type": "twitter_twp",
            "_id": f"{document['ID']}",
            "_source": filterKeys(document),
        }
    #raise StopIteration

helpers.bulk(es_client, doc_generator(twitter_feeds))
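
To make the behaviour easier to inspect without a running cluster, here is a minimal sketch of the same action generator working on plain dictionaries instead of a DataFrame. The names `build_actions` and `USE_THESE_KEYS` and the sample rows are made up for illustration and are not part of my real script. It relies on one fact about Elasticsearch: an "index" action (the default for `helpers.bulk`) whose `_id` already exists in the index replaces the stored document instead of adding a new one, so the document count grows only if the IDs differ between runs.

```python
from typing import Dict, Iterable, Iterator, List

# Hypothetical stand-in for use_these_keys from the snippet above
USE_THESE_KEYS: List[str] = ["ID", "text"]


def filter_keys(document: Dict, keys: List[str]) -> Dict:
    """Keep only the selected fields of one document."""
    return {key: document[key] for key in keys}


def build_actions(rows: Iterable[Dict], index_name: str = "twitter_req") -> Iterator[Dict]:
    """Yield one bulk action per row, mirroring doc_generator above."""
    for row in rows:
        yield {
            "_op_type": "index",  # default bulk operation, written out explicitly
            "_index": index_name,
            "_type": "twitter_twp",
            "_id": str(row["ID"]),
            "_source": filter_keys(row, USE_THESE_KEYS),
        }


# Made-up sample data: two runs that share the tweet with ID 2
first_run = [{"ID": 1, "text": "a", "lang": "en"}, {"ID": 2, "text": "b", "lang": "en"}]
second_run = [{"ID": 2, "text": "b (again)", "lang": "en"}, {"ID": 3, "text": "c", "lang": "en"}]

actions = list(build_actions(first_run)) + list(build_actions(second_run))
unique_ids = {action["_id"] for action in actions}

# Four actions are sent, but only three distinct _id values exist,
# so the index would hold three documents after both runs.
print(len(actions), len(unique_ids))
```

Comparing the `_id` values produced by two consecutive runs in this way would show whether the second run really contains new IDs or only IDs that are already in the index.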
Can anyone kindly tell me where I am going wrong, and how to make sure the index is updated automatically with the incoming documents every time the code runs?