Thanks, @navya_k!
You can try something like this?
from elasticsearch import Elasticsearch
from getpass import getpass
from datetime import datetime
# Initialize Elasticsearch client
client = Elasticsearch(
getpass("Host: "),
api_key=getpass("Elastic API Key: "),
)
# Uploads logs to a specified Elasticsearch index.
def upload_logs(index_name, logs):
for log in logs:
client.index(index=index_name, document=log)
# Searches logs in a specified Elasticsearch index based on a time range provided by the user.
def search_logs(index_name):
# Get user input for start and end time
start_time = input("Enter start time (YYYY-MM-DDTHH:MM:SS): ")
end_time = input("Enter end time (YYYY-MM-DDTHH:MM:SS): ")
# Convert the input times to ISO format
start_time_iso = datetime.strptime(start_time, '%Y-%m-%dT%H:%M:%S').isoformat()
end_time_iso = datetime.strptime(end_time, '%Y-%m-%dT%H:%M:%S').isoformat()
# Construct the query
query = {
"range": {
"timestamp": {
"gte": start_time_iso,
"lte": end_time_iso
}
}
}
# Execute the search query
response = client.search(index=index_name, body={"query": query})
return response['hits']['hits']
# Example usage:
if __name__ == "__main__":
# Define the index name
index_name = "sample_logs"
# Example log entries
logs = [
{"timestamp": "2024-04-23T12:00:00", "level": "INFO", "message": "System start."},
{"timestamp": "2024-04-23T12:05:00", "level": "ERROR", "message": "Failed to connect to database."},
]
# Upload logs
upload_logs(index_name, logs)
# Search logs within a time range specified by the user
search_results = search_logs(index_name)
print(search_results)
Let me know if that works for you?