I have the following situation: I am uploading time sensitive data to Elastic. The folder which gets opened by the parsing and upload script contains all the files which I want to upload.
However, in this folder there are files stored which I have already uploaded to Elastic.
So I still parse these files locally, but I do not want to upload them. I am adding to every document a MD5 hashvalue as an extra field to make sure that it is unique.
I know it is not the best performance solution, but is there a way to tell Elastic that it should search through the whole index if such a hashvalue or id on the basis of a hashvalue is present? If it is present just update the document and if not upload the new document?
I have searched the forum and some Blogs like this or where the idea of a fingerprint plug-in for Logstach was suggested. But I want a solution before uploading.
This Blog entry also mentions the use of HTTP PUT so that you provide the ID and Elastic searches through the whole index to either update or newly upload the document.
This is exactly that I want. But I have no idea on how to change the upload script
This is the upload script I am using as a function at the moment
def elasticUpload(elasticServer, indexName, directory):
try:
client = Elasticsearch(elasticServer)
except Exception as e:
raise ConnectionError("Error creating client at " + elasticServer)
files = []
def getFiles(path=".", files=[]):
if os.path.isfile(path):
return files.append(path)
for item in os.listdir(path):
item = os.path.join(path, item)
if os.path.isfile(item):
files.append(item)
else:
files = getFiles(item, files)
return files
files = getFiles(path=directory)
for report in files:
if ".ndjson" not in report:
continue
with open(report, "r") as f:
elems = ndjson.load(f)
for el in elems:
res = client.index(
index=indexName,
document=el
)
logging.info("Indexed a document:" + str(res))
print("Indexed a document:", res)
os.remove(report)
Has anyone an idea on how to change this script so that it uses HTTP PUT that Elastic dose what I described above?
Any suggestions on improving the Python function is also welcomed.
No, that is not possible. If you have a unique hash that identifies the document, what you should do is use this as a document ID when you index or update. That will create it if it is new and replace it if it already exists. If you do not want to replace, use create rather than index and be prepared to handle any errors in yout script.
Ok, so if I understood you correctly, I define the hashvalue as the "_id" parameter and let it upload the document with client.index() as usual to Elastic. Then Elastic will take care of the rest? So either update or newly create?
Or should I define the hashvalue as "document_id" and give the client extra parameter that it knows what to do?
I have come across another possible solution which works partly for me.
When I define an index and a pipeline like this, the value "_id" gets updated by the value of "fingerprint" as soon as I upload a document to the index
PUT newCreatedIndex
{
"settings": {
"default_pipeline": "replaceIDbyFingerprint"
}
}
PUT _ingest/pipeline/bat_results-pipelineID
{
"description": "updates _id with fingerprint at the time of ingestion for bat_results",
"processors": [
{
"set": {
"field": "_id",
"value": "{{fingerprint}}",
"ignore_failure": true,
"ignore_empty_value": true
}
}
]
}
So far it satisfices all my requirements. But when I now want to upload exactly the same documents with the same hashcalue to the index "newCreatedIndex", I get the following logging error:
WARNING - Node <Urllib3HttpNode(http://smuc13084-nat.bmwgroup.net:9200)> has failed for 1 times in a row, putting on 1 second timeout
CRITICAL - ConflictError(409, 'version_conflict_engine_exception', '[8a75c3f8c5bf650d5621fbd0e68fbaee]: version conflict, document already exists (current version [1])')
CRITICAL - <class 'elasticsearch.ConflictError'> main.py 67
Dose anyone knows how to solve this error?
When I upload a document the second, third time or so, I just want the original document be replaced. So hat there are no duplicates but the uploading process continues without errors
I have solved the problem
The following steps I have implemented:
The pipeline to replace the value "_id" with the value "fingerprint" from the document:
PUT _ingest/pipeline/replaceIDwithFingerprint
{
"description": "replace the automatic generated value of id with the value fingerprint",
"processors": [
{
"set": {
"field": "_id",
"value": "{{fingerprint}}",
"ignore_failure": true,
"ignore_empty_value": true
}
}
]
}
This pipeline I have added to the index with the following
PUT indexName
{
"settings": {
"default_pipeline": "replaceIDwithFingerprint"
}
}
The Upload-Script I have changed in the following way:
fingerprintID = el['fingerprint']
res = client.update(
index=indexName,
doc=el,
id = fingerprintID,
doc_as_upsert = True
)
This s solved the problem that I can now upload multiple documents to the same index and ELK will take care of adding it new or updating the already existing document
Apache, Apache Lucene, Apache Hadoop, Hadoop, HDFS and the yellow elephant
logo are trademarks of the
Apache Software Foundation
in the United States and/or other countries.