Mutate join filter

Is it possible to use the mutate join filter to query across indexes in Kibana, like joining tables in SQL?

What is the “mutate join filter” you are referring to? What are you looking to achieve?

I'm trying to get information from two different indexes. I've got a log file that contains different messages about a phone call, and I want to see the information from all of those messages in, e.g., one table in Kibana.

Can you provide an example of data from the two sources?

"Index" = "message1":

"sessionID" = "12345"
"Response" = "answered"

"Index" = "message2":

"sessionID" = "12345"
"Caller" = "09876543"
"Callee" = "01234567"

What I want:

"Index" = "message"

"sessionID" = "12345"
"Response" = "answered"
"Caller" = "09876543"
"Callee" = "01234567"

You will need to join these up before indexing; you might be able to use the translate filter for that.
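To make "join these up before indexing" concrete, here is a minimal Python sketch (not Logstash code) that merges already-parsed events sharing a sessionID into one document. The function name `join_by_session` and the assumption that every event is a flat dict with a `sessionID` key are illustrative only; inside Logstash itself, this kind of grouping needs a filter that keeps state across events.

```python
def join_by_session(events):
    """Merge flat event dicts that share the same sessionID.

    Later events add their fields to the document built so far; if two
    events carry the same field, the later value wins.
    """
    joined = {}  # sessionID -> merged document (dicts keep insertion order)
    for event in events:
        session_id = event["sessionID"]
        joined.setdefault(session_id, {}).update(event)
    return list(joined.values())


# The two messages from the example above.
message1 = {"sessionID": "12345", "Response": "answered"}
message2 = {"sessionID": "12345", "Caller": "09876543", "Callee": "01234567"}

print(join_by_session([message1, message2]))
# [{'sessionID': '12345', 'Response': 'answered', 'Caller': '09876543', 'Callee': '01234567'}]
```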

Can you give me an example of how to do this?

Another possible approach is doc_as_upsert, as mentioned here.

I tried that, but I don't quite understand how it works, so I didn't manage to make it work. Logstash just shuts down.

output {
    if [Type] == "adapter" {
        file { path => "/some/path/out.txt" codec => plain { format => '{ "update" : {"_id" : "%{sessionID}", "_type" : "doc", "_index" : "someindex"} }
{ "doc": "sessionID": true, "doc_as_upsert" : true }
' } }
    }
}

So now the configuration file runs, but the out.txt file only looks like this:

{"update" : {"_id": "%{sessionid}", "_type" : "doc", "_index" : "someindex"}}
{"doc" : "sessionid": true, "doc_as_upsert" : true}
{"update" : {"_id": "%{sessionid}", "_type" : "doc", "_index" : "someindex"}}
{"doc" : "sessionid": true, "doc_as_upsert" : true}
(… the same two lines repeated for every event)

This was not exactly the result I was hoping for.
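Both problems in that output can be checked outside Logstash. A reference such as %{sessionid} is written out literally when the event has no field with exactly that name (field references are case-sensitive, so sessionid does not match sessionID), and the second line of each pair is not valid JSON because "doc" has to be followed by an object. A small Python check using lines copied from the output above; `is_valid_json` is just an illustrative helper:

```python
import json

action_line = '{"update" : {"_id": "%{sessionid}", "_type" : "doc", "_index" : "someindex"}}'
doc_line = '{"doc" : "sessionid": true, "doc_as_upsert" : true}'

# The action line parses, but its _id is the literal, unsubstituted reference.
action = json.loads(action_line)
print(action["update"]["_id"])  # %{sessionid}


def is_valid_json(line):
    """Return True if the line parses as a single JSON value."""
    try:
        json.loads(line)
        return True
    except json.JSONDecodeError:
        return False


# "doc" is followed by a string and then another colon, which is not valid JSON.
print(is_valid_json(doc_line))  # False

# The shape the body line needs: "doc" wrapping an object of fields.
print(is_valid_json('{"doc": {"sessionID": "12345"}, "doc_as_upsert": true}'))  # True
```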

Let me expand upon that approach. Suppose we have two sources of data. First, we have a file that contains a session identifier and a response:

{ "sessionID": "12345", "Response": "answered" }
{ "sessionID": "23456", "Response": "dropped" }

Then we have another file that contains the caller and callee:

{ "sessionID": "12345", "Caller": "09876543", "Callee": "01234567" }
{ "sessionID": "23456", "Caller": "09871234", "Callee": "76543210" }

We can use file inputs to read these and tag them, so that we know which format each event has:

input {
    file { path => "/home/foo.txt" tags => [ "foo" ] sincedb_path => "/dev/null" start_position => beginning }
    file { path => "/home/bar.txt" tags => [ "bar" ] sincedb_path => "/dev/null" start_position => beginning }
}

We use a json filter to parse them:

filter { json { source => "message" } }

In the output section, we write out the appropriate fields for each type:

output {
    if "foo" in [tags] {
        file { path => "/tmp/out.txt" codec => plain { format => '{ "update" : {"_id" : "%{sessionID}", "_type" : "doc", "_index" : "someindex"} }
{ "doc": { "Response": "%{Response}" }, "doc_as_upsert" : true }
' } }
    }
    if "bar" in [tags] {
        file { path => "/tmp/out.txt" codec => plain { format => '{ "update" : {"_id" : "%{sessionID}", "_type" : "doc", "_index" : "someindex"} }
{ "doc": { "Caller": "%{Caller}", "Callee": "%{Callee}" }, "doc_as_upsert" : true }
' } }
    }
}
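For anyone who wants to see what those two format strings produce without running Logstash, here is a rough Python equivalent that builds the same pair of bulk lines for each tagged event. The single `tag` key standing in for Logstash's [tags] array, and the names `FIELDS_BY_TAG` and `bulk_lines`, are assumptions for illustration. It uses json.dumps so that field values are escaped properly, which, as far as I know, plain %{field} substitution in a format string does not do.

```python
import json

INDEX = "someindex"

# Which fields each kind of event contributes, mirroring the two output blocks.
FIELDS_BY_TAG = {
    "foo": ["Response"],
    "bar": ["Caller", "Callee"],
}


def bulk_lines(events):
    """Yield an update action line and a partial-document line per event."""
    for event in events:
        fields = FIELDS_BY_TAG[event["tag"]]
        action = {"update": {"_id": event["sessionID"], "_type": "doc", "_index": INDEX}}
        body = {"doc": {name: event[name] for name in fields}, "doc_as_upsert": True}
        yield json.dumps(action)
        yield json.dumps(body)


# The four parsed events from foo.txt and bar.txt above.
events = [
    {"tag": "foo", "sessionID": "12345", "Response": "answered"},
    {"tag": "foo", "sessionID": "23456", "Response": "dropped"},
    {"tag": "bar", "sessionID": "12345", "Caller": "09876543", "Callee": "01234567"},
    {"tag": "bar", "sessionID": "23456", "Caller": "09871234", "Callee": "76543210"},
]

for line in bulk_lines(events):
    print(line)
```

The spacing differs from the Logstash output, but each line parses to the same JSON.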

This results in the following output:

{ "update" : {"_id" : "12345", "_type" : "doc", "_index" : "someindex"} }
{ "doc": { "Response": "answered" }, "doc_as_upsert" : true }
{ "update" : {"_id" : "23456", "_type" : "doc", "_index" : "someindex"} }
{ "doc": { "Response": "dropped" }, "doc_as_upsert" : true }
{ "update" : {"_id" : "12345", "_type" : "doc", "_index" : "someindex"} }
{ "doc": { "Caller": "09876543", "Callee": "01234567" }, "doc_as_upsert" : true }
{ "update" : {"_id" : "23456", "_type" : "doc", "_index" : "someindex"} }
{ "doc": { "Caller": "09871234", "Callee": "76543210" }, "doc_as_upsert" : true }

If you load that into Elasticsearch using curl, the first update with a given _id will insert the document with the associated fields, and subsequent updates will add their fields to that document.
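That merge behaviour can be modelled with an in-memory dict standing in for the index. This is only a sketch of the documented semantics of a partial `doc` update with `doc_as_upsert: true` (insert when the _id is missing, otherwise merge the partial document into the existing one); it does not talk to Elasticsearch, and `apply_update` is a hypothetical name.

```python
def apply_update(index, doc_id, partial_doc, doc_as_upsert=True):
    """Model one bulk 'update' action against a dict acting as the index.

    Only flat documents are handled; Elasticsearch merges nested objects
    recursively, which this sketch does not attempt.
    """
    if doc_id in index:
        index[doc_id].update(partial_doc)  # merge new fields into the existing doc
    elif doc_as_upsert:
        index[doc_id] = dict(partial_doc)  # first update for this _id inserts it
    else:
        # Without doc_as_upsert, updating a missing document fails; modelled as an error.
        raise KeyError(f"document {doc_id} missing and doc_as_upsert is false")


index = {}
apply_update(index, "12345", {"Response": "answered"})
apply_update(index, "23456", {"Response": "dropped"})
apply_update(index, "12345", {"Caller": "09876543", "Callee": "01234567"})
apply_update(index, "23456", {"Caller": "09871234", "Callee": "76543210"})

print(index["12345"])
# {'Response': 'answered', 'Caller': '09876543', 'Callee': '01234567'}
```

Note that with the format strings above, sessionID ends up only in _id, not as a field of the document; you may want to add it to each "doc" object if you want to search or display it in Kibana.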

This is probably a stupid question, but how do I do this when all of the information is in the same file? I have used several grok filters to extract the information that I need from the lines in the file. Do I need to save them in different files first?
Thank you so much for your help!

You can use any conditional that tells you whether the event contains [Response] or [Callee]. It might be as simple as:

output {
    if [Response] {
        file { path => "/tmp/out.txt" codec => plain { format => '{ "update" : {"_id" : "%{sessionID}", "_type" : "doc", "_index" : "someindex"} }
{ "doc": { "Response": "%{Response}" }, "doc_as_upsert" : true }
' } }
    }
    if [Callee] {
        file { path => "/tmp/out.txt" codec => plain { format => '{ "update" : {"_id" : "%{sessionID}", "_type" : "doc", "_index" : "someindex"} }
{ "doc": { "Caller": "%{Caller}", "Callee": "%{Callee}" }, "doc_as_upsert" : true }
' } }
    }
}
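The same routing can be sketched in Python for an event that carries no tags, deciding what to write purely from which fields grok produced. The `ROUTES` table and `partial_docs` helper are illustrative assumptions. The sketch checks whether a key is present, which is close to, but not necessarily identical with, how Logstash evaluates `if [Response]`.

```python
# Trigger field -> fields written to the partial document, mirroring the
# two independent conditionals in the output section above.
ROUTES = {
    "Response": ["Response"],
    "Callee": ["Caller", "Callee"],
}


def partial_docs(event):
    """Return one partial document per route whose trigger field is present.

    Both routes are checked independently (like two separate `if` blocks),
    so an event carrying both fields yields two documents. A missing field
    such as Caller becomes None here, whereas Logstash would write the
    literal %{Caller}.
    """
    docs = []
    for trigger, fields in ROUTES.items():
        if trigger in event:
            docs.append({name: event.get(name) for name in fields})
    return docs


print(partial_docs({"sessionID": "12345", "Response": "answered"}))
# [{'Response': 'answered'}]
print(partial_docs({"sessionID": "12345", "Caller": "09876543", "Callee": "01234567"}))
# [{'Caller': '09876543', 'Callee': '01234567'}]
print(partial_docs({"sessionID": "12345"}))
# []
```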

Now my problem is that this command:

curl -XPOST 'http://localhost:9200/someindex/doc/_bulk' -H "Content-Type: application/json" --data-binary @/home/summer19/dipper_prod_logs/out.txt

won't run unless I replace "_id" with "id", "_type" with "type" and "_index" with "index". But when I do that, nothing happens to the docs in the "someindex" index.
Any suggestions for how I can fix this?

You can probably get rid of type/_type altogether, but the current documentation says you should be using _index and _id. What error do you get?

{"error":{"root_cause":[{"type":"x_content_parse_exception","reason":"[1:852] [UpdateRequest] failed to parse field [doc]"}],"type":"x_content_parse_exception","reason":"[1:852] [UpdateRequest] failed to parse field [doc]","caused_by":{"type":"json_parse_exception","reason":"Unexpected character ('_' (code 95)): was expecting comma to separate Object entries\n at ...

This is the error message I get.

That suggests the _id is not inside double quotes. Can you post the first 2 lines of what you are posting using curl?
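One way to find the offending line before sending the file with curl is to parse it line by line. This Python sketch is a rough check, not the full bulk API validation; the `check_bulk_body` name and its rules are assumptions. It reports lines that are not valid JSON and update actions without an _id, and it checks that the body ends with a newline, which the bulk API requires.

```python
import json


def check_bulk_body(text):
    """Return a list of (line_number, problem) tuples for an NDJSON bulk body.

    Assumes every action is an 'update' followed by exactly one body line,
    as in the out.txt files discussed in this thread. Line number 0 is used
    for problems that concern the body as a whole.
    """
    problems = []
    if text and not text.endswith("\n"):
        problems.append((0, "body does not end with a newline"))
    expect_action = True
    for number, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue  # this check skips blank lines but keeps original numbering
        try:
            parsed = json.loads(line)
        except json.JSONDecodeError as err:
            problems.append((number, f"invalid JSON: {err.msg} (column {err.colno})"))
        else:
            update = parsed.get("update") if isinstance(parsed, dict) else None
            if expect_action and not (isinstance(update, dict) and "_id" in update):
                problems.append((number, "update action without _id"))
        expect_action = not expect_action  # actions and bodies alternate
    return problems


# Line 3 has an unquoted reference, so it is not valid JSON.
sample = (
    '{ "update" : {"_id" : "12345", "_index" : "someindex"} }\n'
    '{ "doc": { "Response": "answered" }, "doc_as_upsert" : true }\n'
    '{ "update" : {"_id" : %{sessionID}, "_index" : "someindex"} }\n'
    '{ "doc": { "Response": "dropped" }, "doc_as_upsert" : true }\n'
)
for problem in check_bulk_body(sample):
    print(problem)
```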

Thank you for the help! I tried using the aggregate filter plugin instead. Not working perfectly yet, but I managed to get all the information from one sessionid in the same document.
