I'm sending data with python on both kafka and http request however kafka pipeline wraps the data with "document". the data on the top is coming from kafka pipeline and the below from http pipeline
And these are my client codes
from confluent_kafka import Producer
import json
try:
# producer = Producer({'bootstrap.servers': '192.168.:29092'})
producer = Producer({'bootstrap.servers': 'localhost:9092'})
except Exception as e:
print(e)
def send_message(topic, message):
try:
message = json.dumps(message)
# print(message)
producer.produce(topic, message)
producer.flush()
print("Message sent successfully")
except Exception as e:
print(e)
data = {
"service_name": "kafka-service",
"activity_type": "info",
"error": {
"component_id": "1234",
"message": "dogru index girildi",
},
"additional_data": {
"key": "value"
}
}
# print(json.dumps(data.get('error')))
send_message('kafka-activity-log', data)
import requests
import json
# Logstash HTTP endpoint
LOGSTASH_URL = 'http://localhost:8080' # Replace with your Logstash endpoint URL
# Data to be sent to Logstash
data = {
"user_id": "user123",
"service_name": "example_service",
"activity_type": "info",
"error": {
"component_id": "1234",
"message": "dogru index girildi",
},
"additional_data": {
"key": "now1"
}
}
# Convert data to JSON format
json_data = json.dumps(data)
# Set up HTTP headers
headers = {
'Content-Type': 'application/json'
}
# Send POST request to Logstash
response = requests.post(LOGSTASH_URL, headers=headers, data=json_data)
# Check the response
if response.status_code == 200:
print("Data sent to Logstash successfully!")
else:
print(f"Failed to send data to Logstash. Status code: {response.status_code}")
print(response.text)
this is my pipeline they both are the same just with different inputs
input {
kafka {
bootstrap_servers => "kafka:29092"
topics => ["kafka-activity-log"]
codec => json {
target => "[document]"
}
}
}
filter{
json {
source => "message"
target => "parsed_json"
}
if "_jsonparsefailure" not in [tags] {
mutate {
remove_field => ["message", "@version", "host", "url", "event", "user_agent"]
}
ruby {
code => '
empty_fields = 0
["user_id", "service_name", "activity_type"].each do |field|
if event.get(field).to_s.empty?
empty_fields += 1
end
end
if empty_fields >= 2
event.set("[@metadata][index]", "user_activity_log_missing_data")
elsif event.get("[user_id]").to_s.empty?
event.set("[@metadata][index]", "user_activity_log_empty_user_id")
elsif event.get("[service_name]").to_s.empty?
event.set("[@metadata][index]", "user_activity_log_empty_service_name")
elsif event.get("[activity_type]").to_s.empty?
event.set("[@metadata][index]", "user_activity_log_empty_activity_type")
elsif event.get("[error][message]").to_s.empty?
event.set("[@metadata][index]", "user_activity_log_empty_error_message")
elsif event.get("[error][component_id]").to_s.empty?
event.set("[@metadata][index]", "user_activity_log_empty_component_id")
else
event.set("[@metadata][index]", "user_activity_log")
end
'
}
} else {
ruby {
code => '
event.set("[@metadata][index]", "user_activity_log_invalid_data")
'
}
}
}
how can i prevent this so it does not wrap the element with document