Python to ingest data in to ES, reading from kafka

I want to read kafka topic and insert that data in to elasticsearch

anyone has basic python script for it.

I am newbee to python

connection to es is alredy established with this code

from elasticsearch import Elasticsearch
  es = Elasticsearch(
      http_auth=('xxx', 'xxx'),
  print "Connected",

python code for kafka connection is also ready. but do not know how to import that in loop to elasticsearch

my python output from kafka is something like this

Message key : None
Message value: {
"machine": "556c01",
"queue": "v556c01",
"command": 1319072427,
"action": "DELETE",
"timestamp": 1580139259825

How do I push this in to elasticsearch?

Anyone using python to do this?
May be I posted this in wrong category?

I highly doubt that no one is using python to ingest data in to elasticsearch.


I think there are some examples in the client documentation. Have a look at

I have come across on that document but can't figure out how to do this.
plus very new to python as well (which I know does not help)

This code prints message from kafka but don't know how to write to elasticsearch

   consumer = KafkaConsumer(TOPIC_NAME, bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, auto_offset_reset="latest", enable_auto_commit =False, group_id=None)

    reader_schema = avro_schema.Parse(schema_str)

    for message in consumer:
            writer_version = message.value[0]
        payload = message.value[1:]
         writer_schema = reader_schema
        decoded_value = DatumReader().read_data(writer_schema, reader_schema, BinaryDecoder(BytesIO(payload)))
        print(json.dumps(decoded_value, indent=0))

This is output
"machine": "556c01",
"queue": "q556c01",
"commandid": 1308095223,
"action": "DELETE",
"timestamp": 1579727117409

I can connect to elasticsearch
es = Elasticsearch( ['elk01'], http_auth=('xxxx', 'xxxx'), port=9200,)

but how to put this read messages back in to elk. can't get my head around.

I'm not a Python developer so I don't think I can help here. May be @Emanuil could help?

Maybe my answer to this post will help.

Thank you guys

I tested inserting one document manually and it works.
Still lot of testing needs to be done.
so far connection to es done, pulling out record from es done, insert to es done, update to es done.

es.create(index="sachin_quick_test", id=doc['num'], body=doc, doc_type='_doc')

I will post full result once all completed in week or so. that way someone else can use it.

Alright here is what I have so far working. incase someone else is looking for

es = Elasticsearch('elktst01', http_auth=('xxxx', 'xxxx'), port=9200)

res=es.index(index="sachin_quick_test", id=doc['num'], body=doc, doc_type='_doc')

res=es.delete(index="sachin_quick_test", id='2', doc_type='_doc')

res=es.create(index="sachin_quick_test", id=doc['num'], body=doc, doc_type='_doc')

res ="sachin_quick_test", body={"query": {"match_all": {}}})
print("Got %d Hits:" % res['hits']['total']['value'])
for hit in res['hits']['hits']:
    print("%(@timestamp)s:  %(field1)s: %(num)s" % hit["_source"])

problem is any date field I insert via python ELK thinks it is local time and converts it to UTC.
I have some date that I would like ELK to convert
but I have some date field that are already converted to UTC and would like to save it. but it does not do that and moves the time.


alright more update
figure out how to manipulate date and timezone

import pendulum
doc = {
'first': '2020-01-31 15:40:12',
'num': '2',
'timestamp': 1579624522874,

tz = pendulum.timezone('America/Chicago')
d = pendulum.parse(doc.get('first'))
d_timezone = pendulum.from_format(doc.get('first'), 'YYYY-MM-DD HH:mm:ss', tz=tz)

two different date d=as is, d_timezone with timezone

reassign that string will become date

create new field called second with new date field with timezone

now once I figure out how to push that kafka message back to elasticseach I will update in bulk

Tell me who is using python, i need help!

If you have a question, please open a new thread with your concerns.

inserting data is working as well with either es.index or es.create

closing this topic for now.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.