Logstash wrapping the data in a "document" field

I'm sending data with Python over both Kafka and an HTTP request; however, the Kafka pipeline wraps the data in a "document" field. The data at the top is coming from the Kafka pipeline and the data below it from the HTTP pipeline.

And these are my client scripts:

from confluent_kafka import Producer
import json

try:
    # producer = Producer({'bootstrap.servers': '192.168.:29092'})
    producer = Producer({'bootstrap.servers': 'localhost:9092'})
except Exception as e:
    print(e)

def send_message(topic, message):
    try:
        message = json.dumps(message)
        # print(message)
        producer.produce(topic, message)

        producer.flush()
        print("Message sent successfully")
    except Exception as e:
        print(e)


data = {
    "service_name": "kafka-service",
    "activity_type": "info",
    "error": {
        "component_id": "1234",
        "message": "dogru index girildi",
                
    },
    "additional_data": {
        "key": "value"
    }
}

# print(json.dumps(data.get('error')))
send_message('kafka-activity-log', data)

import requests
import json

# Logstash HTTP endpoint
LOGSTASH_URL = 'http://localhost:8080'  # Replace with your Logstash endpoint URL

# Data to be sent to Logstash
data = {
    "user_id": "user123",
    "service_name": "example_service",
    "activity_type": "info",
    "error": {
        "component_id": "1234",
        "message": "dogru index girildi",
                
    },
    "additional_data": {
        "key": "now1"
    }
}

# Convert data to JSON format
json_data = json.dumps(data)

# Set up HTTP headers
headers = {
    'Content-Type': 'application/json'
}

# Send POST request to Logstash
response = requests.post(LOGSTASH_URL, headers=headers, data=json_data)

# Check the response
if response.status_code == 200:
    print("Data sent to Logstash successfully!")
else:
    print(f"Failed to send data to Logstash. Status code: {response.status_code}")
    print(response.text)

This is my pipeline; both pipelines are the same, just with different inputs:

input {
  kafka {
    bootstrap_servers => "kafka:29092"
    topics => ["kafka-activity-log"]
    codec => json {
      target => "[document]"
    }
  }
}

filter {
  json {
    source => "message"
    target => "parsed_json"
  }

  if "_jsonparsefailure" not in [tags] {
    mutate {
      remove_field => ["message", "@version", "host", "url", "event", "user_agent"]
    }

    ruby {
      code => '
        empty_fields = 0
        ["user_id", "service_name", "activity_type"].each do |field|
          if event.get(field).to_s.empty?
            empty_fields += 1
          end
        end

        if empty_fields >= 2
          event.set("[@metadata][index]", "user_activity_log_missing_data")
        elsif event.get("[user_id]").to_s.empty?
          event.set("[@metadata][index]", "user_activity_log_empty_user_id")
        elsif event.get("[service_name]").to_s.empty?
          event.set("[@metadata][index]", "user_activity_log_empty_service_name")
        elsif event.get("[activity_type]").to_s.empty?
          event.set("[@metadata][index]", "user_activity_log_empty_activity_type")
        elsif event.get("[error][message]").to_s.empty?
          event.set("[@metadata][index]", "user_activity_log_empty_error_message")
        elsif event.get("[error][component_id]").to_s.empty?
          event.set("[@metadata][index]", "user_activity_log_empty_component_id")
        else
          event.set("[@metadata][index]", "user_activity_log")
        end
      '
    }
  } else {
    ruby {
      code => '
        event.set("[@metadata][index]", "user_activity_log_invalid_data")
      '
    }
  }
}

How can I prevent this so that it does not wrap the fields in "document"?

It is your Kafka input configuration that is putting the data from Kafka inside the document field:

codec => json {
  target => "[document]"
}

The target option does exactly what you do not want: it parses your JSON and puts the result as nested fields inside document. If you do not want that, remove the target option and use just codec => json.
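For example, the Kafka input from your pipeline would then look like this (same settings as before, just without the target option):

input {
  kafka {
    bootstrap_servers => "kafka:29092"
    topics => ["kafka-activity-log"]
    codec => json
  }
}

The parsed JSON keys (service_name, error, and so on) will then land at the top level of the event instead of nested under [document].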

Thanks a lot!
But my http pipeline is configured the same way:

codec => json {
  target => "[document]" # Parse the JSON into the [document] field
}

Why does it not do that?

It is a bug. The http input by default expects the messages to be JSON, so if your message is JSON it will basically ignore anything you put in the codec option.

Check this answer about it.
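For context, as far as I know this happens because the http input has an additional_codecs option whose default maps application/json to a plain json codec, and that mapping takes precedence over the codec option. Spelled out, the effective configuration is roughly this (port 8080 assumed from your client script):

input {
  http {
    port => 8080
    # Default value of additional_codecs: requests with
    # Content-Type: application/json are decoded by this codec,
    # not by the codec option below, so target is never applied.
    additional_codecs => { "application/json" => "json" }
    codec => json {
      target => "[document]"
    }
  }
}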


I did not know that. Again, thanks a lot for your help!
