Hello everyone,
I import values from a CSV into Elasticsearch via Logstash. The timestamp is correctly derived from the name of the CSV and written to Elasticsearch in the correct format, e.g. 2024-03-29T10:40:09.324934244Z.
Strangely, in Kibana I see the wrong value for @timestamp: it shows the time the import ran rather than the timestamp from the CSV. It worked fine until a few weeks ago with the same script. Does anyone have an idea what the problem could be?
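To see the mismatch directly, the following pulls the newest document and prints the raw Timestamp field next to @timestamp (a minimal sketch, assuming the elasticsearch-py client and the index from the import script below):

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")
hit = es.search(index="moderationen", size=1,
                sort=[{"@timestamp": "desc"}])["hits"]["hits"][0]
print("raw Timestamp:", hit["_source"].get("Timestamp"))
print("@timestamp:", hit["_source"].get("@timestamp"))
print("tags:", hit["_source"].get("tags"))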
Logstash Script - date part:
# Extract date and time from the CSV timestamp and store them in @timestamp
date {
  match => ["Timestamp", "EEE dd MMM yyyy hh:mm:ss a 'CEST'", "EEE dd MMM yyyy hh:mm:ss a 'CET'"]
  timezone => "Europe/Berlin" # set this to your relevant timezone
  #remove_field => ["Timestamp"]
}
# Re-parse @timestamp via Ruby and write it back
ruby {
  code => "
    # Convert @timestamp into a Ruby Time object
    timestamp_str = event.get('@timestamp').to_s
    new_time = Time.at(DateTime.parse(timestamp_str).to_time.to_f)
    # Write the updated value back into @timestamp
    event.set('@timestamp', LogStash::Timestamp.new(new_time))
  "
}
}
output {
  stdout { codec => rubydebug }
  file {
    path => "/home/ai-upload/logstash.log"
    codec => "rubydebug"
  }
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "moderationen"
  }
}
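To check whether the raw Timestamp strings still match the pattern the date filter expects, the Joda pattern can be mirrored with Python's strptime (a minimal sketch; the sample value is an assumption based on the pattern above, not taken from the CSV):

from datetime import datetime

# Assumed sample matching "EEE dd MMM yyyy hh:mm:ss a 'CEST'"; replace it
# with a real value from the CSV's Timestamp column.
sample = "Fri 29 Mar 2024 10:40:09 AM CEST"

# %a=EEE, %d=dd, %b=MMM, %Y=yyyy, %I=hh, %M=mm, %S=ss, %p=a, literal CEST
print(datetime.strptime(sample, "%a %d %b %Y %I:%M:%S %p CEST"))

If a real value raises ValueError here (for example because the day or month names are suddenly localized), that would explain why the date filter tags the events with _dateparsefailure and @timestamp falls back to the import time.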
Elasticsearch import, Python part:
from elasticsearch import NotFoundError

# Find the most recent timestamp already stored in the "topics" index
try:
    last_indexed = es.search(index="topics", size=1, sort={"@timestamp": "desc"})
    last_timestamp = last_indexed['hits']['hits'][0]['_source']['@timestamp']
except (NotFoundError, IndexError, KeyError):  # index missing or still empty
    last_timestamp = "1900-01-01T00:00:00.000Z"
# Fetch messages that are newer than the last stored timestamp
response = es.search(index="moderationen", size=10000,
                     _source=["text", "@timestamp"],  # _id comes back as hit metadata anyway
                     query={
                         "range": {
                             "@timestamp": {
                                 "gt": last_timestamp
                             }
                         }
                     })
(....)
timestamps = [hit["_source"].get("@timestamp", None) for hit in response["hits"]["hits"]]
print("Anzahl der Timestamps:", len(timestamps))
print(timestamps[:10])
(...)
# Process each document with OpenAI
for idx, document in enumerate(documents):
    word_count = len(document.split())
    char_count = len(document)
    # Store the results in Elasticsearch
    es.index(index="topics", document={
        "@timestamp": response['hits']['hits'][idx]['_source']['@timestamp'],
        "id": ids[idx],
        "text": documents[idx],
        "topics": extracted_topics
    })
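As a side note on the loop above: indexing one document per call works, but the bulk helper is usually faster for larger batches. A sketch, assuming a hypothetical extract_topics() wrapper around the OpenAI call that produces extracted_topics above:

from elasticsearch.helpers import bulk

def actions():
    for idx, document in enumerate(documents):
        yield {
            "_index": "topics",
            "_source": {
                "@timestamp": response["hits"]["hits"][idx]["_source"]["@timestamp"],
                "id": ids[idx],
                "text": document,
                "topics": extract_topics(document),  # hypothetical per-document OpenAI call
            },
        }

bulk(es, actions())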
This is one record from my index:
I think the _dateparsefailure tag indicates a problem, but where can I find the correct log file?
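Since the file output in the config above already writes every event to /home/ai-upload/logstash.log, the failing events should show up there (the Logstash service log itself usually lives under /var/log/logstash/ on package installs). A minimal sketch for scanning the rubydebug output:

# Print every event from the rubydebug log that mentions the failure tag.
with open("/home/ai-upload/logstash.log") as f:
    for line in f:
        if "_dateparsefailure" in line:
            print(line.rstrip())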