CouchDB plugin: set dynamic type for Elasticsearch

Hello

I'm trying to set up CouchDB with Logstash and Elasticsearch, and I already have the CouchDB input working.
My documents in CouchDB have a type attribute, and I want this type to be the index type in Elasticsearch.

So my CouchDB doc looks like this:

{
    "_id": "9fa2682844a77faaa9d6c4add0000f2f",
    "_rev": "1-1df53d53cf80a563d0e9b0adbb50c916",
    "type": "mytype",
    ...
}

My logstash config:

input {
    couchdb_changes {
        db => "mydbname"
        host => "localhost"
        port => 5984
        codec => "json"
        type => "%{type}"
    }
}
output {
    elasticsearch {
        action => "%{[@metadata][action]}"
        document_id => "%{[@metadata][_id]}"
        host => "localhost"
        index => "indexname"
        protocol => "http"
        port => 9200
    }
}

However, the type in Elasticsearch ends up being just the literal string from my type config.
Elasticsearch doc: { ..., "_type": "%{type}", ... }

Is it possible to get a dynamic type based on the type attribute of my CouchDB documents?
If yes, can you help me please? I have already tried many different configs but couldn't accomplish it.

This configuration:

type => "%{type}"

...will not work in an input block.

If the type field comes in alongside _id and _rev, that field is quite likely in the @metadata field as [@metadata][type]. If so, you'd need to overwrite the Logstash event's type with a mutate replace:

mutate {
  replace => { "type" => "%{[@metadata][type]}" }
}

Turning on the debug log showed me that the type does not seem to be in the @metadata field. The type field is not something from CouchDB; it's just a custom data attribute I add to my documents myself.

Actually, the log showed something like @data={ "doc" => { "type" .... But I don't know how to access that. I already tried to configure the elasticsearch output plugin like this:

{
    ...
    document_type => "%{[@data][doc][type]}"
    ...
}

That actually wrote the literal type %{[@data][doc][type]}, so it seems that didn't work with the event's sprintf function. I already looked into the sources, but sadly I am a real Ruby noob.

If it's a part of your document already, and not part of the metadata, then is it in the event already?

The @data={ "doc" part is in the input plugin. Everything in the "doc" should become part of the event. The type field, if you are adding it yourself, may be getting stomped on by the input plugin if you have a "type" directive in your input block.

Sorry, I don't get your answer clearly. Of course I removed the type from the input when I defined it in the output. The output type would take precedence with the elasticsearch output plugin anyway: document_type > index_type (deprecated) > type from input > "log".

The type is part of the document. So in the event it is in @data={ "doc" => { "type" => "mySeekedType", ... }}. How can I access it to set my Elasticsearch type? None of my approaches has worked.

It seems to me that I can't access it because of the nested structure, unlike the @metadata field.
Maybe I can do a filter like add_field => { "doc" => "%{[@data][doc]}" }? I will try this tomorrow. Thank you for helping me out, by the way.

I was already considering forking the logstash-input-couchdb_changes project and adding a "type" field to @metadata, but that would be my last resort.

The @data = { "doc" snippet is from within the couchdb_changes plugin, rather than the core Logstash code. "doc" comes from CouchDB, but everything that is in "doc" should become part of the base Logstash event.

What I am looking for, in order to help troubleshoot this, is event output (stdout { codec => rubydebug }) so I can see all of the fields. I'm guessing that "type" is not being passed through for some reason. If you are assigning type => "couchdb" or something like that in the input block, that might stomp on whatever you have in your document. If it doesn't get stomped on but gets renamed, we'd need to see which field it ends up in (hence the request for event output).
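For reference, the event output being asked for here can be captured by temporarily adding a stdout output block to the pipeline, leaving the input as-is:

output {
    stdout { codec => rubydebug }
}

Each event will then be printed to the console with all of its fields, which makes it easy to see whether and where "type" survives.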


Your answer finally gave me the right direction.
The rubydebug output was:

{
    "doc" => {
        "type" => "mytype",
        ...
    },
    "doc_as_upsert" => true,
    "@version" => "1",
    "@timestamp" => "2015-06-11T07:40:24.920Z"
}

So I changed my configuration to:

input {
  couchdb_changes {
    db => "mydbname"
    host => "localhost"
    port => 5984
  }
}
output {
  elasticsearch {
    action => "%{[@metadata][action]}"
    document_id => "%{[@metadata][_id]}"
    document_type => "%{[doc][type]}"
    host => "localhost"
    index => "indexname"
    protocol => "http"
    port => 9200
  }
}

And it finally worked. I don't think I tried this exact config in the output; I lost track with the many different things I tried. Thank you very much for your help!

OK, it's still not solved completely, even now that I've got the dynamic type.

The delete action doesn't delete the document in the Elasticsearch index.
Without the document_type config, it does work.

UPDATE 1
I found this in the input plugin:

if line['doc']['_deleted']
  hash['@metadata']['action'] = 'delete'
else
  hash['doc'] = line['doc']
  hash['@metadata']['action'] = 'update'
  hash['doc'].delete('_id')
  hash['doc_as_upsert'] = true
  hash['doc'].delete('_rev') unless @keep_revision
end

Could the delete fail because the output can't resolve the type for a delete action? Since doc is not set in that case, the elasticsearch output would try to delete the doc with the unresolved type ("%{[doc][type]}").

UPDATE 2
I don't have access to the document type anymore when it is deleted, even when I patch the input plugin. :confused:

line['doc'] does not contain data anymore; only _id, _rev, and the _deleted flag are present.
Is there another/better solution than working with an id prefix for each type and then mutating the event?
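For completeness, the id-prefix idea could look roughly like this, assuming you give your documents ids of the form mytype-<uuid> yourself (the prefix scheme and the grok pattern here are assumptions, not something the plugin provides):

filter {
  if [@metadata][action] == "delete" {
    # Recover the type from an id like "mytype-9fa26828...".
    grok {
      match => { "[@metadata][_id]" => "^%{WORD:type}-" }
    }
  }
}

The obvious downside is that it couples your CouchDB id scheme to your Elasticsearch mapping.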

I suggest using mutate because it alters the event before the output block. Since document_type => "%{[doc][type]}" doesn't alter the event, but only assigns the type at output time, that may be why it's not working correctly with the document_type flag.

I'm not sure what you altered in the input plugin, but mutate is the correct way to achieve the result you want. Conditionally testing for the presence of a "type" field in the "doc" would likely hinder performance, but a feature request can be made at the plugin's GitHub repository: https://github.com/logstash-plugins/logstash-input-couchdb_changes

Are you sure I can access the type of the document when the doc is deleted? I altered the above code with just one single line, and it seemed like the input plugin has no access to the deleted document's attributes.

My quick 'n' dirty tryout was:

if line['doc']['_deleted']
  hash['@metadata']['action'] = 'delete'
  hash['doc'] = line['doc']
else
  hash['doc'] = line['doc']
  hash['@metadata']['action'] = 'update'
  hash['doc'].delete('_id')
  hash['doc_as_upsert'] = true
  hash['doc'].delete('_rev') unless @keep_revision
end

The rubydebug output showed me that the doc I added to the event only contained the _id, _rev, and _deleted attributes. If no document data is available on delete in the input, how would it be available in a mutate?

You're right. I misunderstood what you had said.

The input plugin currently acts under the assumption that the type will be the same for all documents coming from CouchDB. This is also because the changes feed doesn't send the document when it sends a "delete", only the id. Dynamic typing with the Elasticsearch output breaks this assumption. I don't know whether there is a workaround that doesn't involve some rather elaborate checks.

For instance, you could (maybe, I'm not sure it will work) use the elasticsearch filter to query for a document matching that id in the expected index, extract the type from it, add the type with a mutate, and do this only for delete calls.


Thank you very much! I would never have found this solution, since I'm very new to the Elastic ecosystem.
Querying the type from Elasticsearch works like a charm for now. I hope this won't slow the system down too much once we reach a stage with many millions of documents.
My final config looks like this:

input { 
  couchdb_changes {
      db => "mydbname"
      host => "127.0.0.1"
      port => 5984 	
      initial_sequence => 0 # this is only required for an initial indexing
  }
}
filter {
  mutate {
    add_field => { "action" => "%{[@metadata][action]}" }
  }
  if [action] == 'delete' {
    elasticsearch {
      hosts => ["127.0.0.1"]
      query => "_id:%{[@metadata][_id]}"
      fields => ["type", "type"]
      sort => ""
    }
  } else {
    mutate {
      add_field => { "type" => "%{[doc][type]}" }
    }
  }
}
output {
  elasticsearch { 
    action => "%{[@metadata][action]}"
    document_id => "%{[@metadata][_id]}"
    host => "127.0.0.1"
    index => "myindexname"
    protocol => "http"
    port => 9200   	  	
  }  
  #stdout { codec => rubydebug } #enable this option for debugging purpose
}

Is there something suboptimal in the config? Something that could slow us down unnecessarily?


It looks good, but there's no reason to do the initial add_field of "action":

if [@metadata][action] == 'delete' {

That saves you from needlessly having to create a field.
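Putting that together with the config above, the filter block would then shrink to something like:

filter {
  if [@metadata][action] == "delete" {
    elasticsearch {
      hosts => ["127.0.0.1"]
      query => "_id:%{[@metadata][_id]}"
      fields => ["type", "type"]
      sort => ""
    }
  } else {
    mutate {
      add_field => { "type" => "%{[doc][type]}" }
    }
  }
}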


And as far as performance goes, the Elasticsearch filter might slow down the initial ingest a tiny bit when deletes are encountered. Otherwise it should be able to do many updates per second (I don't know how busy your CouchDB is). There might be other ways to optimize the query (which is only executed for deletes), but this should do exactly what you need with no problems.


I use the same config file, but I get this error:

  failed action with response of 404, dropping action: ["update", {:_id=>"a5fdc4d3cc602546b1d28cf75500101b", :_index=>"vendors", :_type=>"%{[doc][type]}", :_routing=>nil}, #<LogStash::Event:0x1038dea7 @metadata_accessors=#<LogStash::Util::Accessors:0x5033d9b0 @store={"_id"=>"a5fdc4d3cc602546b1d28cf75500101b", "action"=>"update", "seq"=>4, "retry_count"=>0}, @lut={"[action]"=>[{"_id"=>"a5fdc4d3cc602546b1d28cf75500101b", "action"=>"update", "seq"=>4, "retry_count"=>0}, "action"], "[_id]"=>[{"_id"=>"a5fdc4d3cc602546b1d28cf75500101b", "action"=>"update", "seq"=>4, "retry_count"=>0}, "_id"]}>, @cancelled=false, @data={"doc"=>{"name"=>"mts", "type"=>nil, "unnamed"=>nil}, "doc_as_upsert"=>true, "@version"=>"1", "@timestamp"=>"2015-11-04T16:07:02.763Z", "action"=>"update", "type"=>"%{[doc][type]}"}, @metadata={"_id"=>"a5fdc4d3cc602546b1d28cf75500101b", "action"=>"update", "seq"=>4, "retry_count"=>0}, @accessors=#<LogStash::Util::Accessors:0x19c32c1b @store={"doc"=>{"name"=>"mts", "type"=>nil, "unnamed"=>nil}, "doc_as_upsert"=>true, "@version"=>"1", "@timestamp"=>"2015-11-04T16:07:02.763Z", "action"=>"update", "type"=>"%{[doc][type]}"}, @lut={"action"=>[{"doc"=>{"name"=>"mts", "type"=>nil, "unnamed"=>nil}, "doc_as_upsert"=>true, "@version"=>"1", "@timestamp"=>"2015-11-04T16:07:02.763Z", "action"=>"update", "type"=>"%{[doc][type]}"}, "action"], "[action]"=>[{"doc"=>{"name"=>"mts", "type"=>nil, "unnamed"=>nil}, "doc_as_upsert"=>true, "@version"=>"1", "@timestamp"=>"2015-11-04T16:07:02.763Z", "action"=>"update", "type"=>"%{[doc][type]}"}, "action"], "[doc][type]"=>[{"name"=>"mts", "type"=>nil, "unnamed"=>nil}, "type"], "type"=>[{"doc"=>{"name"=>"mts", "type"=>nil, "unnamed"=>nil}, "doc_as_upsert"=>true, "@version"=>"1", "@timestamp"=>"2015-11-04T16:07:02.763Z", "action"=>"update", "type"=>"%{[doc][type]}"}, "type"]}>>] {:level=>:warn}
    failed action with response of 404, dropping action: ["update", {:_id=>"a5fdc4d3cc602546b1d28cf7550010ac", :_index=>"vendors", :_type=>"%{[doc][type]}", :_routing=>nil}, #<LogStash::Event:0x4bebf429 @metadata_accessors=#<LogStash::Util::Accessors:0x6e11c7b7 @store={"_id"=>"a5fdc4d3cc602546b1d28cf7550010ac", "action"=>"update", "seq"=>6, "retry_count"=>0}, @lut={"[action]"=>[{"_id"=>"a5fdc4d3cc602546b1d28cf7550010ac", "action"=>"update", "seq"=>6, "retry_count"=>0}, "action"], "[_id]"=>[{"_id"=>"a5fdc4d3cc602546b1d28cf7550010ac", "action"=>"update", "seq"=>6, "retry_count"=>0}, "_id"]}>, @cancelled=false, @data={"doc"=>{}, "doc_as_upsert"=>true, "@version"=>"1", "@timestamp"=>"2015-11-04T16:07:02.766Z", "action"=>"update", "type"=>"%{[doc][type]}"}, @metadata={"_id"=>"a5fdc4d3cc602546b1d28cf7550010ac", "action"=>"update", "seq"=>6, "retry_count"=>0}, @accessors=#<LogStash::Util::Accessors:0x71c10994 @store={"doc"=>{}, "doc_as_upsert"=>true, "@version"=>"1", "@timestamp"=>"2015-11-04T16:07:02.766Z", "action"=>"update", "type"=>"%{[doc][type]}"}, @lut={"action"=>[{"doc"=>{}, "doc_as_upsert"=>true, "@version"=>"1", "@timestamp"=>"2015-11-04T16:07:02.766Z", "action"=>"update", "type"=>"%{[doc][type]}"}, "action"], "[action]"=>[{"doc"=>{}, "doc_as_upsert"=>true, "@version"=>"1", "@timestamp"=>"2015-11-04T16:07:02.766Z", "action"=>"update", "type"=>"%{[doc][type]}"}, "action"], "[doc][type]"=>[{}, "type"], "type"=>[{"doc"=>{}, "doc_as_upsert"=>true, "@version"=>"1", "@timestamp"=>"2015-11-04T16:07:02.766Z", "action"=>"update", "type"=>"%{[doc][type]}"}, "type"]}>>] {:level=>:warn}

This happens when I create a document in CouchDB.

This is concerning. It seems that there is no type subfield of doc, which is the likely cause of the 404 error. You must supply a valid type or the document cannot be created or updated.
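One way to guard against documents that lack the field, sketched here with a hypothetical fallback type name, is to only use [doc][type] when it is actually set:

filter {
  if [doc][type] {
    mutate {
      add_field => { "type" => "%{[doc][type]}" }
    }
  } else {
    mutate {
      # "default_type" is a placeholder; substitute a type you actually want.
      add_field => { "type" => "default_type" }
    }
  }
}

Whether a fallback type makes sense for your data, or whether such documents should be dropped instead, depends on your mapping.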

Thanks, I changed my config, and now it works :slight_smile:

I get the below error when using the above config file. Any thoughts?

ā†[31mInvalid setting for elasticsearch output plugin:

output {
elasticsearch {
# This setting must be a ["index", "delete", "create", "update"]
# Expected one of ["index", "delete", "create", "update"], got ["%{[@metadata][action]}"]
action => "%{[@metadata][action]}"
...
}
} {:level=>:error}ā†[0m

The option validation code runs on the raw, non-interpolated value. This seems like a bug: the code does perform %{name} interpolation on the action option (so I guess the intention is that your configuration should work), but it never gets that far because the value is rejected by the validation.

Is it possible to skip the validation?
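Not that I'm aware of, but one way to sidestep the validation entirely (a sketch, untested against this Logstash version) is to use literal action values inside output conditionals instead of the sprintf reference:

output {
  if [@metadata][action] == "delete" {
    elasticsearch {
      action => "delete"
      document_id => "%{[@metadata][_id]}"
      host => "127.0.0.1"
      index => "myindexname"
      protocol => "http"
      port => 9200
    }
  } else {
    elasticsearch {
      action => "update"
      document_id => "%{[@metadata][_id]}"
      host => "127.0.0.1"
      index => "myindexname"
      protocol => "http"
      port => 9200
    }
  }
}

Since each action option is now a plain string, the validation has nothing to complain about.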