How to recover streaming data when the connection is lost, using the Python Elasticsearch client?

I stream RESTful API data from https://www.n2yo.com/api/, which tracks satellite positions. I use the Python client for Elasticsearch, save the streamed data to ES every 10 seconds, and visualize it with Kibana. My ES version is 6.4.3.

My code is: 

URL = "https://www.n2yo.com/rest/v1/satellite/positions/25544/41.702/-76.014/0/2/&apiKey= your key"
es = Elasticsearch('http://ip:port',timeout=600)

settings = { "settings": {
                 "number_of_shards":1,
                  'number_of_replicas':0
                 },
      "mappings" : { 
           "document" : {
                "properties":{
                    "geo": {
                       "type": "geo_point"
                            }
                          }
                        } 
                     } 
                  }
try:
 es.indices.create(index = "spacestation", body=settings)
except RequestError as es1:
 print('Index already exists!!')
 sys.exit(1)

def collect_data():
  data = requests.get(url = URL).json() 
  del data['positions'][1]
  new_data = {'geo':{'lat':data['positions'][0]['satlatitude'],
               'lon':data['positions'][0]['satlongitude']}, 
                'satname': data['info']['satname'], 'satid': data['info']['satid'], 
                  'timestamp':datetime.fromtimestamp(data['positions'][0]['timestamp']).isoformat()        
              }

  es.index(index='spacestation', doc_type='document', body=new_data)

schedule.every(10).seconds.do(collect_data)

while True:
  schedule.run_pending()
 time.sleep(1)

My question is: yesterday I lost the connection. The error is below:

> requests.exceptions.ConnectionError: HTTPSConnectionPool(host='www.n2yo.com', port=443): Max retries exceeded with url: /rest/v1/satellite/positions/25544/41.702/-76.014/0/2/&apiKey= (Caused by NewConnectionError(': Failed to establish a new connection: [Errno -3] Temporary failure in name resolution',))

When I re-run my code, I can't, because the index already exists. If I delete the index, I will lose the data that is already in ES. What can I do? I need to keep my saved data and run the job from now on. Any solutions, please?

Hi @New_user

Your problem is here:

try:
    es.indices.create(index="spacestation", body=settings)
except RequestError:
    print('Index already exists!!')
    sys.exit(1)

You use es.indices.create, which stops the script if the index already exists... If the index already exists, you can just pass, i.e. replace sys.exit(1) with pass. This way your script can continue collecting data.

Here is the updated code:

try:
    es.indices.create(index="spacestation", body=settings)
except RequestError:
    print('Index already exists!!')
    pass
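
As a side note, the Python client can also swallow the "already exists" error for you: its API calls accept an ignore parameter listing HTTP status codes to treat as success, so the try/except is not even needed. A minimal sketch, reusing the es and settings objects from above:

# Ignore HTTP 400 (resource_already_exists_exception) so re-runs are harmless.
es.indices.create(index="spacestation", body=settings, ignore=400)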

I think your solution can be improved using Filebeat; let me explain.
Your Python script would only copy the data to a log file, which you can rotate with logrotate (if you are on Linux).
Then Filebeat sends the data to Elasticsearch, handling retries and a lot of other things out of the box. This way your Python script only has to focus on reading the satellite position data.
It's just an alternative: here you lost the connection to n2yo.com, but you could also lose the connection to Elasticsearch, and with Filebeat you don't need to handle that yourself...
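
For illustration, here is a minimal sketch of that approach (the log path is an assumption; rotate it with logrotate). The script only appends one JSON document per line; Filebeat's log input with JSON decoding would then tail the file, ship each line to Elasticsearch, and retry on its own if the connection drops:

import json
import time
from datetime import datetime

import requests
import schedule

URL = "https://www.n2yo.com/rest/v1/satellite/positions/25544/41.702/-76.014/0/2/&apiKey= your key"
LOG_PATH = "/var/log/spacestation/positions.log"  # hypothetical path, rotated by logrotate

def collect_data():
    data = requests.get(url=URL).json()
    pos = data['positions'][0]
    doc = {
        'geo': {'lat': pos['satlatitude'], 'lon': pos['satlongitude']},
        'satname': data['info']['satname'],
        'satid': data['info']['satid'],
        'timestamp': datetime.fromtimestamp(pos['timestamp']).isoformat(),
    }
    # One JSON document per line: a format Filebeat can decode and forward.
    with open(LOG_PATH, 'a') as f:
        f.write(json.dumps(doc) + '\n')

schedule.every(10).seconds.do(collect_data)

while True:
    schedule.run_pending()
    time.sleep(1)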

Thanks for the reply, but I need to handle this within the Python script. I need to re-run the code if the connection is lost and save the data to the same index in Elasticsearch. Please can you help me out with this?

Yes, I can help.

If you read the previous message, there are some code changes. Did you try them?

If you replace sys.exit(1) with pass, your script will not stop and you can re-run it.
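
Going one step further, you could also catch the request error inside collect_data itself, so a lost connection to n2yo.com only skips one run instead of crashing the script; the scheduler then retries 10 seconds later. A minimal sketch, showing only the changed part of the function:

def collect_data():
    try:
        data = requests.get(url=URL, timeout=30).json()
    except requests.exceptions.RequestException as err:
        # DNS failure, timeout, refused connection, ...: skip this run and
        # let the scheduler retry 10 seconds later instead of crashing.
        print('Request failed, retrying on next run:', err)
        return
    # ... build new_data and call es.index() as before ...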

Hello,
Yes, I did as you said. Here is my new code:

URL = "https://www.n2yo.com/rest/v1/satellite/positions/25544/41.702/-76.014/0/2/&apiKey= my key"

es = Elasticsearch('http://ip:port',timeout=600)

def create_index():
  settings = { "settings": {
                 "number_of_shards":1,
                  'number_of_replicas':0
                 },
      "mappings" : { 
           "document" : {
                "properties":{
                    "geo": {
                       "type": "geo_point"
                            }
                          }
                        } 
                     } 
                  }

  if not es.indices.exists(index = "spacestation"):
    es.indices.create(index = 'spacestation', body=settings)
  else:
    pass

def collect_data():
  try:
    data = requests.get(url = URL).json()
    create_index()
    del data['positions'][1]
    new_data = {'geo':{'lat':data['positions'][0]['satlatitude'],
               'lon':data['positions'][0]['satlongitude']}, 
                'satname': data['info']['satname'], 'satid': data['info']['satid'], 
                  'timestamp':datetime.fromtimestamp(data['positions'][0]['timestamp']).isoformat()}
    es.index(index='spacestation', doc_type='document', body=new_data)
  except:
    collect_data()

schedule.every(10).seconds.do(collect_data)

while True:
  schedule.run_pending()
  time.sleep(1) 

I'm getting the data into ES, but when I build a Kibana dashboard I get strange results, and I don't know why. At one moment the total number of positions is 60, then after 10 seconds it becomes 59, then 60 again, then 59, and so on. I think the total number of positions should increase over time.
