Forwarding my bulk records from Http Input Plugin to Elasticsearch Output Plugin - Discuss the Elastic Stack

Eureka! Yes! It works!

With this request:

{
    "index_name": "journaling_insert",
    "payload": [
        {
            "REMOTEIP": "1.111.1.11",
            "CHAINCODE": "8971",
            "EVENTID": "16",
            "STOREATTRIBUTE3": "Passed Value",
            "DATETIME": "2025-03-14T19:14:58.600",
            "STOREATTRIBUTE2": "StoreDB Value",
            "STOREATTRIBUTE4": "StoreDB Value",
            "STOREATTRIBUTE5": "StoreDB Value",
            "FLSECURITY": {
                "SID": "1111"
            },
            "FLCUSTOMER": {
                "FIRSTNAME": "Gandalf",
                "LASTNAME": "the Grey"
            }
        },
        {
            "REMOTEIP": "1.111.1.11",
            "CHAINCODE": "8971",
            "EVENTID": "17",
            "DRAWERIDENT": "test",
            "DATETIME": "2025-03-14T19:14:58.600",
            "STOREATTRIBUTE": "StoreDB Value",
            "STOREATTRIBUTE3": "StoreDB Value",
            "STOREATTRIBUTE4": "StoreDB Value",
            "STOREATTRIBUTE2": "StoreDB Value",
            "FLTRANSACTIONATTRIBUTES": {
                "INVOICENUMBER": "1111"
            },
            "FLCUSTOMER": {
                "FIRSTNAME": "Gandalf",
                "LASTNAME": "the Grey"
            }
        }
    ]
}

This is the final config file:

input {
	http {
		port => 5043
		codec => "json"
	}
}
filter {
	split {
		field => "[payload]"
	} 
	ruby {
		code => 'event.get("payload").each { 
			|k, v| event.set(k, v) 
		}'
	}
	mutate {
		add_field => { "[@metadata][index_name]" => "%{index_name}" }
		remove_field => [ "payload", "headers", "host", "index_name", "@version" ]
	}
}
output {
    file {
        path => "/log_streaming/my_app/log-%{+yyyy-MM-dd_HH.mm.ss.SSS}.log"	
    }
	elasticsearch {
		hosts => "localhost:9200"
		index => "%{[@metadata][index_name]}"
	}
}
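
For anyone who wants to see what the split and ruby filters do to that request, here is a rough plain-Ruby sketch that runs outside Logstash. `flatten_payload` is a hypothetical helper, the request is a trimmed-down version of the one above, and none of this uses the real Logstash event API; it only mimics the idea of one event per payload element with the keys promoted to the top level.

```ruby
require "json"

# Hypothetical helper, not part of Logstash: approximates split + ruby + mutate.
def flatten_payload(request)
  request.fetch("payload").map do |record|
    # split: one event per payload element.
    # ruby filter: every key of that element becomes a top-level field.
    document = record.dup
    # mutate: index_name is moved to metadata, so it stays out of the document.
    { "index" => request["index_name"], "document" => document }
  end
end

# Trimmed-down request, for illustration only.
request = JSON.parse(<<~BODY)
  {"index_name": "journaling_insert",
   "payload": [{"EVENTID": "16", "FLSECURITY": {"SID": "1111"}},
               {"EVENTID": "17", "DRAWERIDENT": "test"}]}
BODY

events = flatten_payload(request)
events.each { |e| puts "#{e["index"]} <- #{e["document"].to_json}" }
```

Each element of the payload ends up as its own document, and the index name travels next to it instead of inside it, which is what the `[@metadata][index_name]` field achieves in the config.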

Why can't I remove @timestamp?

Hey @Badger, I tried injecting your configuration into mine:

input {
	http {
		port => 5043
	}
}
filter {
    mutate { gsub => [ "message", "\r", "" ] }
    mutate { split => { "message" => "
" } }
    ruby {
        code => '
            msg = event.get("message")
            if msg.is_a? Array
                while msg.length > 1 do
                    clone = event.clone
                    clone.set("message", msg.shift(2))

                    new_event_block.call(clone)
                end
                if msg.length > 0
                    logger.warn("Content is uneven")
                    event.set("message", msg)
                else
                    event.cancel
                end
            end
        '
    }
    if [message][1] {
        json { source => "[message][1]" }
        json { source => "[message][0]" target => "[@metadata][operation]" }
    }
	mutate {
		remove_field => [ "headers", "host", "message", "@version" ]
	}
}
output {
    file {
        path => "/log_streaming/my_app/log-%{+yyyy-MM-dd_HH.mm.ss.SSS}.log"	
    }
	if [@metadata][index] {
		elasticsearch {
			hosts => "localhost:9200"
			index => "%{[@metadata][operation][index][_index]}"
		}
	}
}

and I can see that the file created as output is correct:

{"STOREATTRIBUTE5":"StoreDB Value","CHAINCODE":"8971","REMOTEIP":"1.111.1.11","@timestamp":"2025-03-17T18:07:27.424Z","DATETIME":"2025-03-17T18:07:27.420","STOREATTRIBUTE3":"Passed Value","EVENTID":"16","STOREATTRIBUTE2":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","FLSECURITY":{"SID":"1111"},"FLCUSTOMER":{"LASTNAME":"the Grey","FIRSTNAME":"Gandalf"}}
{"STOREATTRIBUTE5":"StoreDB Value","FLTRANSACTIONATTRIBUTES":{"INVOICENUMBER":"1111"},"CHAINCODE":"8971","REMOTEIP":"1.111.1.11","@timestamp":"2025-03-17T18:07:27.424Z","DATETIME":"2025-03-17T18:07:27.420","STOREATTRIBUTE2":"StoreDB Value","EVENTID":"17","DRAWERIDENT":"test","STOREATTRIBUTE3":"StoreDB Value","STOREATTRIBUTE4":"StoreDB Value","FLCUSTOMER":{"LASTNAME":"the Grey","FIRSTNAME":"Gandalf"}}

But it's not sending anything to Elasticsearch. I don't get an error, but the data is not in Elasticsearch and no new index has been created. Any suggestions?

My mistake. That should have been

if [@metadata][operation][index] {
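
To illustrate why the condition has to go through `[operation]`, here is a small plain-Ruby sketch of the pairing that `msg.shift(2)` performs. `pair_bulk_lines` is a hypothetical helper and the bulk body is an assumed example (the actual request is not shown in this thread). Unlike the filter, the sketch simply drops an unpaired trailing line instead of logging a warning.

```ruby
require "json"

# Hypothetical helper: pairs each action line with the source line after it,
# the same two-at-a-time grouping that msg.shift(2) does in the ruby filter.
def pair_bulk_lines(body)
  lines = body.delete("\r").split("\n")
  lines.each_slice(2).select { |pair| pair.length == 2 }.map do |action, source|
    { "operation" => JSON.parse(action), "document" => JSON.parse(source) }
  end
end

# Assumed _bulk style body, for illustration only.
body = <<~BULK
  {"index":{"_index":"journaling_insert"}}
  {"EVENTID":"16"}
  {"index":{"_index":"journaling_insert"}}
  {"EVENTID":"17"}
BULK

pair_bulk_lines(body).each do |event|
  # The parsed action line lives under "operation", so "index" is only
  # reachable through that key. A check on event["index"] would find nothing.
  op = event["operation"]["index"]
  puts "#{op["_index"]} <- #{event["document"].to_json}" if op
end
```

In the config the parsed action line is stored under `[@metadata][operation]`, so a test on `[@metadata][index]` is never true and the elasticsearch output is skipped without any error.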

Eureka, it works!

I am giving the solution check to @Badger because his approach lets me leave the input unchanged, and he had already suggested it in the other thread. But @leandrojmp, your solution works great too, and I could use it to unify all the requests I was sending to my Elasticsearch endpoints, myelasticendpoint:9200/myindex/_doc and myelasticendpoint:9200/_bulk.

Thanks all for your help!
