I have an index that contains 1 document. This document has a field1 with value "B" and a field2 with value "C, D, E" (that is, the value in field2 is comma-separated and can have variable length).
I want to create a new index, that contains the following 3 documents:
field1: "B" and field2:"C"
field1:"B" and field2:"D"
field1:"B" and field2:"E"
I was thinking about using a watcher to reindex the already existing documents and create the new field at the same time. But I'm not sure how to do this, or whether this is the correct approach.
Thanks in advance for sharing your knowledge
In case someone might find it useful, I managed to solve this with the following Python script.
import logging
from elasticsearch import Elasticsearch, helpers
import schedule
import time
import sys
# Append log records to parse.log, each prefixed with a timestamp and level
logging.basicConfig(
    level=logging.INFO,
    filename='parse.log',
    filemode='a',
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Elasticsearch connection (host and credentials are placeholders to replace)
es = Elasticsearch(
    hosts=["https://elasticsearch.com:111"],
    basic_auth=("elasticsearch_user", "elasticsearch_password"),
)

# Index the documents are read from
source_index = "source_index"
# Index the split documents are written to. Elasticsearch rejects index names
# that contain spaces, so the name uses an underscore.
dest_index = "dest_index"

# Search body that selects the documents to copy from the source index:
# documents whose @timestamp lies between "now-1d/d" and "now/d"
# (Elasticsearch date math, rounded to the day).
query = {
    "size": 5000,  # Adjust the size based on your requirements
    "query": {
        "range": {
            "@timestamp": {
                "gte": "now-1d/d",
                "lte": "now/d",
            }
        }
    },
}
# Use the helpers.bulk API for efficient indexing
def _split_nodes(raw_value):
    """Return the trimmed, non-empty items of a comma separated value.

    ``None`` and blank values give an empty list. Any other value is converted
    with ``str()`` before it is split on commas.
    """
    if raw_value is None:
        return []
    trimmed = (item.strip() for item in str(raw_value).split(","))
    return [item for item in trimmed if item]


def _expand_hit(hit, index):
    """Build the bulk actions for one search hit.

    Args:
        hit: A search hit with ``_id`` and ``_source`` keys.
        index: Name of the index the actions are written to.

    Returns:
        A list of bulk actions: one per item in ``field2``; a single action
        that keeps the hit's own id when ``field2`` has no items; or an empty
        list when ``field2`` has items but ``field1`` is missing.
    """
    source = hit["_source"]
    nodes = _split_nodes(source.get("field2"))

    if not nodes:
        # Nothing to split: copy the document unchanged under its own id.
        # Same field name as the per-node documents so the destination
        # documents stay uniform.
        document = source.copy()
        document["host"] = ""
        return [{"_index": index, "_id": hit["_id"], "_source": document}]

    if "field1" not in source:
        # The per-node ids are built from field1, so they cannot be formed.
        logger.warning("Skipping document %s: field1 is missing.", hit["_id"])
        return []

    id_prefix = str(source["field1"])
    actions = []
    for node in nodes:
        # Shallow copy so each destination document gets its own host value.
        document = source.copy()
        document["host"] = node
        actions.append({
            "_index": index,
            # The id is field1 followed by the node, so indexing the same
            # field1/node pair again overwrites the earlier document.
            "_id": id_prefix + node,
            "_source": document,
        })
    return actions


def job():
    """Copy the matching source documents, split per node, to the destination.

    Reads every document matching ``query`` from ``source_index``, splits the
    comma separated ``field2`` value and bulk-indexes one copy of the document
    per item into ``dest_index``, with the item stored in the ``host`` field.

    Errors raised by the bulk request are logged and not re-raised; errors
    raised while reading the source index propagate to the caller.
    """
    actions = []
    # helpers.scan pages through the complete result set; a single search
    # request returns at most "size" hits and silently drops the rest.
    for hit in helpers.scan(es, index=source_index, query=query):
        actions.extend(_expand_hit(hit, dest_index))

    # Send all the documents to the destination index using the bulk API
    try:
        helpers.bulk(es, actions)
    except Exception as e:
        logger.exception("Error during bulk indexing: %s", e)
    else:
        logger.info(
            "Bulk indexing successful. Processed %d documents.", len(actions)
        )
def main():
    """Register the daily 07:00 run of ``job`` and poll the scheduler.

    Loops until a ``KeyboardInterrupt`` arrives, then logs the shutdown and
    exits with status 0.
    """
    daily_at_seven = schedule.every().day.at("07:00")
    daily_at_seven.do(job)

    try:
        # Check for due jobs, then pause one second before checking again.
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl+C: record the shutdown and leave with a success status.
        logger.info("Received KeyboardInterrupt. Exiting gracefully.")
        raise SystemExit(0)
# Start the scheduler only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.