Need help syncing between a local instance and a remote one

Hello, I'm working on a project that collects temperature and humidity data from 47 sensors every 10 seconds and puts the data from each sensor into its own index (a0, a1, a2, etc.) with a timestamp.
I wrote a Python script that sends the data to a local Elasticsearch index.
The local instance doesn't have a stable internet connection, which means there could be downtime.
Now my question is: how do I sync between the local instance and a remote instance?
I also have to make sure it retries sending the data in case of failure (internet loss, etc.).
*The local instance needs to send data to the remote instance only every x minutes and must not be reachable from the WWW.
Thanks in advance. I don't have much experience with the Elastic Stack, so there might be a built-in option for this that I don't know about.

I'm using ES 6.2, by the way.
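One way to approach the retry requirement is exponential backoff around whatever function sends a batch. The following is a minimal sketch and is not tied to Elasticsearch: `send_with_retry`, its parameters, and the injected `send` callable are hypothetical names introduced for illustration.

```python
import time


def send_with_retry(send, batch, max_attempts=5, base_delay=1.0,
                    retry_on=(OSError,), sleep=time.sleep):
    """Call send(batch) until it succeeds or max_attempts is reached.

    Returns True on success and False if every attempt failed, so the
    caller can keep the batch and try again on the next cycle.
    """
    for attempt in range(max_attempts):
        try:
            send(batch)
            return True
        except retry_on:
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            # No wait after the final failed attempt.
            if attempt < max_attempts - 1:
                sleep(base_delay * 2 ** attempt)
    return False
```

If the function returns `False`, the batch should stay on disk and be retried at the next scheduled run. With the elasticsearch-py client you would probably pass its own connection exception classes as `retry_on`, since they are not built-in `OSError` subclasses.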

For now, I'd say: take a snapshot, ship it to the main cluster on a shared FS, and restore the data there.
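As a rough illustration of that snapshot and restore flow, the sketch below only builds the three REST requests involved and sends nothing. The repository name, snapshot name, and location are hypothetical, and the shared filesystem location is assumed to be listed under `path.repo` in `elasticsearch.yml` on both sides.

```python
import json


def snapshot_requests(repo, snapshot, indices, location):
    """Build the REST calls for a filesystem snapshot and restore.

    Returns a list of (method, path, body) tuples; nothing is sent.
    """
    index_list = ",".join(indices)
    return [
        # 1. Register a shared-filesystem repository (needed on both clusters).
        ("PUT", "/_snapshot/" + repo,
         {"type": "fs", "settings": {"location": location}}),
        # 2. On the local cluster: snapshot only the sensor indices.
        ("PUT", "/_snapshot/{}/{}?wait_for_completion=true".format(repo, snapshot),
         {"indices": index_list, "include_global_state": False}),
        # 3. On the remote cluster: restore those indices from the snapshot.
        ("POST", "/_snapshot/{}/{}/_restore".format(repo, snapshot),
         {"indices": index_list}),
    ]


# Hypothetical names, for illustration only.
for method, path, body in snapshot_requests(
        "sensor_backup", "snap_1", ["a0", "a1"], "/mnt/es_backups"):
    print(method, path, json.dumps(body))
```

Note that a restore will not overwrite an open index with the same name, so the target indices would likely need to be closed or the restored ones renamed first.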

Cross-datacenter replication (XDCR) will happen at some point, but it's not there yet.

In general, avoid splitting nodes across multiple data centers where the connection is bad.

But why do you have a local instance BTW? For local analysis of the data?


Is there any specific reason to have a local Elasticsearch instance? Can you place a message broker in between?


@snowleopard, I've tried using RabbitMQ (Pika on Python 3, to be exact), but I had difficulty making it work the way I want and getting it to interact with Elasticsearch.

Okay, so I've dropped the idea of the local instance.
@dadoonet, @snowleopard
Now I'm kind of stuck on how to send data to the DB in a reliable way.
Let's say I output to 47 local files named after the date (YYYY-MM-dd.log), each located in its own directory (/a1, /a15, /b1, /b15, etc.).
What is the best way to ship these logs every 30 minutes if each log should go to an index named after its parent directory?
I've included my Python code for reference in case it helps. I couldn't find a way to implement RabbitMQ, or any other broker for that matter, in it, since it's already pretty complicated for me and I don't usually code.
Note that I don't want any duplicate data in Elasticsearch, and I want to be sure the data is delivered even if the internet connection drops for a while and then reconnects.
Thanks in advance.
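For the 30-minute shipping described above, one possible approach is to remember how far into each log file the last successful send got, and to read only what was appended since. This is a minimal sketch under that assumption: `read_new_lines`, `commit_offset`, and the JSON state file are hypothetical names, not part of any library.

```python
import json
import os


def read_new_lines(log_path, state_path):
    """Return (lines, new_offset) for data appended since the last commit."""
    offset = 0
    if os.path.exists(state_path):
        with open(state_path) as f:
            offset = json.load(f).get(log_path, 0)
    with open(log_path, "rb") as f:
        f.seek(offset)
        chunk = f.read()
    # Only consume complete lines; a partial last line waits for the next run.
    end = chunk.rfind(b"\n") + 1
    lines = chunk[:end].decode("utf-8").splitlines()
    return lines, offset + end


def commit_offset(log_path, state_path, new_offset):
    """Record the offset; call this only after a successful send."""
    state = {}
    if os.path.exists(state_path):
        with open(state_path) as f:
            state = json.load(f)
    state[log_path] = new_offset
    with open(state_path, "w") as f:
        json.dump(state, f)
```

Because the offset is committed only after the send succeeds, a dropped connection just means the same lines are read again on the next run.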

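# ----- Imports ----- #
import datetime
import sys
import threading
import time
from queue import Queue
from time import strftime

import serial  # pyserial

# ----- Configuration ----- #
print_lock = threading.Lock() # So prints won't get interleaved
ps = []
repeatTime = input("Sleep time between samples(Sec):")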

def config():
	
	for i in range(howmany):
		print("board No #", i + 1," Configuration:")
		portInput = input("COM Port:")
		ps.append(portInput)
	print(ps)
	return ps

if len(sys.argv) > 1:
	print("Arguments passed")
	argus = sys.argv
	argus.pop(0)
	howmany = len(argus)
	for i in range(len(argus)):
		print(argus[i])
		ps.append(argus[i])
else:
	howmany = int(input("How many boards?:"))
	config()
# /*----- Configuration -----*\ #

# ----- Save&Export ----- #
def saveData(board, th, s, now):
	today = datetime.date.today()
	path = str(board)+"/"+str(s)+"/"+str(today)+".log"
	tr = th.split(':')
	temp = tr[0]
	hum = tr[1]
	# ti = tr[2] # Temp Index(Removed)
	data = now + "| Temp:" + temp + " Humidity:" + hum + "\n"
	saveFile = open(path, "ab")
	saveFile.write(data.encode("utf-8"))
	saveFile.close()
	# todo Add error handlers in case there is a problem saving the file.
# /*----- Save&Export -----*\ #


# ----- Main Loop ----- #
def blowJob(worker, board):
	# Poll one board forever: request a reading, parse the reply,
	# save each sensor value, then sleep until the next sample.
	while True:#    This is linear from now on:
		ser = serial.Serial(board,9600)
		ser.close() # In case the serial port is already open elsewhere; mandatory for the first run
		ser.open()
		ser.write(('AT'+'\r\n').encode('utf-8')) # Send a request to the board
		value = ser.readline().decode("utf-8").strip('\n\r').split(';') # Split the response into a list
		# now = strftime("%Y-%m-%d %H:%M:%S", gmtime())
		local = time.localtime()
		now = strftime("%Y-%m-%d %H:%M:%S", local)
		b = value[0] # Get the first value as the board identifier
		value.pop(0) # Remove the first value
		with print_lock:
			print("Received from Board ", b,': ')


		for i in range(len(value)):
			data = value[i]
			with print_lock:
				print(data)
			if data == "0.00:0.00":
				saveData(b, data, i, now)
				with print_lock:
					print("Error: Reading from Board: "+ board + ", Sensor: " + str(i+1) +". Has Failed!" )
			else:
				
				saveData(b, data, i, now)
				with print_lock:
					print("Data from Board: "+ board + ", Sensor: " + str(i+1) +". Has Saved")

		ser.close()
		with print_lock:
			print(threading.current_thread().name, worker, board)
		time.sleep(int(repeatTime))
# /*----- Main Loop-----*\  #


# ----- Threads setup ---- #
def threader(port): 
	while True:
		worker = q.get()
		blowJob(worker, port)
		q.task_done()

q = Queue()

for i in range(howmany): 
	port = ps[i]
	t = threading.Thread(target = threader, args =(port,))
	t.daemon = True # Dies when the main thread dies
	t.start()

for worker in range(howmany): #Threads put to work
	q.put(worker)

q.join() 
# /*----- Threads setup ----*\ #
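To address the no-duplicates requirement, one option is to derive each document's `_id` from the index name and the timestamp, so that re-sending a line overwrites the same document instead of creating a new one. The sketch below assumes the line format written by `saveData` above and one log directory per index; `to_actions`, `LINE_RE`, the `doc` type name, and the field names are hypothetical choices.

```python
import hashlib
import os
import re

# Matches lines such as "2018-03-01 12:00:00| Temp:23.50 Humidity:40.10".
LINE_RE = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\| "
    r"Temp:(?P<temp>-?\d+(?:\.\d+)?) Humidity:(?P<hum>-?\d+(?:\.\d+)?)$"
)


def to_actions(log_path, lines):
    """Turn log lines into bulk actions with deterministic document IDs."""
    # Index name = name of the directory holding the log, e.g. "a15".
    index = os.path.basename(os.path.dirname(os.path.abspath(log_path))).lower()
    for line in lines:
        match = LINE_RE.match(line.strip())
        if not match:
            continue  # skip malformed lines
        ts = match.group("ts")
        # Same index + timestamp always gives the same _id, so a re-sent
        # line overwrites the existing document instead of duplicating it.
        doc_id = hashlib.sha1((index + "|" + ts).encode("utf-8")).hexdigest()
        yield {
            "_index": index,
            "_type": "doc",  # ES 6.x still expects a type name
            "_id": doc_id,
            "_source": {
                "timestamp": ts,
                "temperature": float(match.group("temp")),
                "humidity": float(match.group("hum")),
            },
        }
```

The resulting dictionaries have the shape that `elasticsearch.helpers.bulk(es, actions)` accepts. The timestamp is passed through as written, so the index mapping would need a matching date format such as `yyyy-MM-dd HH:mm:ss`.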

So this is no longer an Elasticsearch-related question. I believe you will get more accurate help on the RabbitMQ forums.
