couchdb_changes: index only the doc field from the event

Hello Logstash Community!
I want to upgrade Elasticsearch to a newer version and replace couchdb_river with the couchdb_changes plugin.
While working, this input plugin creates events with the following fields: ["doc", "doc_as_upsert", "@version", "@timestamp"], and Elasticsearch creates documents with these fields. Is there any possibility to replace the whole event with the doc field, or to treat the doc field as the event root?
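For illustration, the _source of the indexed documents ends up looking roughly like this (all field values here are made up):

```ruby
# Hypothetical example of what ends up in the Elasticsearch _source
# for each CouchDB change event (values are invented for illustration).
indexed_source = {
  'doc' => { 'name' => 'example', 'value' => 42 },  # the actual CouchDB document
  'doc_as_upsert' => true,                          # upsert flag, leaks into _source
  '@version' => '1',                                # added by Logstash
  '@timestamp' => '2016-01-20T10:00:00.000Z'        # added by Logstash
}

# The desired _source would be just the document itself:
desired_source = indexed_source['doc']
```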
Thanks in advance!

Unfortunately, there is no way to remove those fields when sending through Logstash.

Logstash will always send @timestamp and @version. And without doc_as_upsert, elasticsearch won't create any "new" docs, as the plugin only sends updates.

When I query Elasticsearch, the _source is {doc: {...}, doc_as_upsert: true, ...}. With couchdb_river, the _source contained only the contents of the doc field.

The problem is that I don't need those fields; I want the elasticsearch output to index only the doc contents.

That's correct. doc_as_upsert leaves Logstash with the event, but it is only a flag to Elasticsearch. It wouldn't be in the final, indexed document.

You'll have to roll your own solution if you want something different from what Logstash offers. Logstash cannot avoid sending @timestamp and @version.

For what, Elasticsearch?
For the Logstash output plugin, or for ES?
logstash-output-elasticsearch receives an event => {doc, doc_as_upsert, @timestamp, @version} and passes it as-is to ES for indexing! So in the ES index there are documents with _source: {doc: {...}, doc_as_upsert: true, @version: "...", @timestamp: "..."}.
The question is: is this correct behaviour, or am I doing something wrong?
If this is correct behaviour, why is it so?

All I was trying to do is this:

filter {
  ruby {
    code => "event['doc'].to_hash.each { |k, v| event[k] = v }"
    remove_field => ["doc"]
  }
}
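Outside of Logstash, the hash manipulation this filter performs can be sketched with a plain Ruby hash standing in for the event:

```ruby
# A plain Ruby hash standing in for the Logstash event: the filter
# promotes every key under 'doc' to the top level, then drops 'doc'.
event = {
  'doc' => { 'title' => 'my document', 'count' => 3 },
  'doc_as_upsert' => true
}

event['doc'].each { |k, v| event[k] = v }
event.delete('doc')
```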
Thanks, you can close this.

I am about to make the same change after updating to Elasticsearch 2.1.1 (moving from using the CouchDB river to using Logstash to ship data from CouchDB to Elasticsearch). Is there really no way (excluding the above filter) to omit the @version, @timestamp, and doc_as_upsert fields and index what is contained in the 'doc' field into Elasticsearch? The goal is to have the document in Elasticsearch exactly reflect its CouchDB counterpart.

I was struggling with this behaviour for two days. Searching around the internet didn't help; all I could come up with is the above. Speaking of @version, @timestamp, and doc_as_upsert, I think the elasticsearch output plugin should omit them while indexing the document, or you could possibly think of using Elasticsearch scripting to remove those fields.


@theuntergeek , do you have any suggestions on where I can go from here?

This is what I get for commenting at 5am when my head is still fuzzy.

I am traveling right now. I'll take a look this evening after I land.

Looking forward to it. Safe travels.

@theuntergeek sorry if I offended you. Maybe we didn't understand each other. I really appreciate your help, and I am looking forward to your expertise.

@alexey, I took a (very) quick look into this last night. The block below from 'logstash-input-couchdb_changes' builds the event, assigns the document to the doc field, and deletes the _id and _rev fields from the document. This could easily be changed to achieve the desired result; you could add a flag to toggle it on or off.

if line['doc']['_deleted']
  hash['@metadata']['action'] = 'delete'
else
  hash['doc'] = line['doc']
  hash['@metadata']['action'] = 'update'
  hash['doc'].delete('_id')
  hash['doc_as_upsert'] = true
  hash['doc'].delete('_rev') unless @keep_revision
end

Alternatively, building on your example above, you could alter the event to resemble the CouchDB document by adding add_field => { "_id" => "%{[@metadata][_id]}" } to restore the _id field, and by using keep_revision on the input plugin. E.g.:

input { 
    couchdb_changes {
        ...
        keep_revision => true
    }
}
filter {
  ruby {
    code => "event['doc'].to_hash.each { |k, v| event[k] = v }"
    remove_field => ["doc", "doc_as_upsert", "@version", "@timestamp"]
    add_field => { "_id" => "%{[@metadata][_id]}" }
  }
}

As I understand it, you will use this solution to migrate to ES 2.1?

Possibly. I'm just investigating what's required in updating to ES 2.x from 1.7.3. If there aren't any big gains from updating, we will most likely stick with our current configuration (ES 1.7.3 using river-couchdb).

Sorry for not responding last night. I think this is actually a regression in the Elasticsearch output since Logstash 1.5.0. It's not using the "doc" field when doing the upsert. More to follow when I'm out of training and can chase it down.

I do not think there is a regression in the elasticsearch output plugin: it indexes what you pass to it, so why should it care that the event has a doc field, a doc_as_upsert field, and everything else? In my opinion the problem is within the couchdb_changes input plugin. From a developer's (e.g. my) point of view, the event is the CouchDB document, and everything else should go into the @metadata field. So from this build_event definition

private
  def build_event(line)
    # In lieu of a codec, build the event here
    line = LogStash::Json.load(line)
    return nil if line.has_key?("last_seq")
    hash = Hash.new
    hash['@metadata'] = { '_id' => line['doc']['_id'] }
    if line['doc']['_deleted']
      hash['@metadata']['action'] = 'delete'
    else
      hash['doc'] = line['doc']
      hash['@metadata']['action'] = 'update'
      hash['doc'].delete('_id')
      hash['doc_as_upsert'] = true
      hash['doc'].delete('_rev') unless @keep_revision
    end
    hash['@metadata']['seq'] = line['seq']
    event = LogStash::Event.new(hash)
    @logger.debug("event", :event => event.to_hash_with_metadata) if @logger.debug?
    event
  end

we should move to something like this:

private
  def build_event(line)
    # In lieu of a codec, build the event here
    line = LogStash::Json.load(line)
    return nil if line.has_key?("last_seq")
    doc = line['doc']
    doc['@metadata'] =
      if doc['_deleted']
        { 'action' => 'delete', '_id' => doc['_id'], 'seq' => line['seq'] }
      else
        { 'action' => 'update', '_id' => doc['_id'], 'doc_as_upsert' => true, 'seq' => line['seq'] }
      end
    event = LogStash::Event.new(doc)
    @logger.debug("event", :event => event.to_hash_with_metadata) if @logger.debug?
    event
  end
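The intent of the snippet above can be checked outside of Logstash; here is a sketch with LogStash::Json and LogStash::Event stubbed out by the standard json library and a plain hash (the sample change line is made up):

```ruby
require 'json'

# Stand-alone sketch of the proposed build_event: the CouchDB document
# itself becomes the event, and the bookkeeping goes under '@metadata'.
def build_event(json_line)
  line = JSON.parse(json_line)
  return nil if line.has_key?('last_seq')
  doc = line['doc']
  doc['@metadata'] =
    if doc['_deleted']
      { 'action' => 'delete', '_id' => doc['_id'], 'seq' => line['seq'] }
    else
      { 'action' => 'update', '_id' => doc['_id'], 'doc_as_upsert' => true, 'seq' => line['seq'] }
    end
  doc # the real plugin would return LogStash::Event.new(doc)
end

change = '{"seq":7,"doc":{"_id":"abc","_rev":"1-x","field":"value"}}'
event = build_event(change)
```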

Changing build_event like this, we store the CouchDB document as-is in the Elasticsearch index. We don't even need the doc_as_upsert and _id fields in @metadata: if the action is not delete, we have to set doc_as_upsert => true in the elasticsearch output plugin config anyway, and for the _id we can use event['_id'], since every CouchDB document has an _id. With these changes, the output section of the Logstash config could look like this:

output {
  if "delete" in [@metadata][action] {
    elasticsearch {
      ...
      document_id => "%{_id}"
      action => "delete"
    }
  } else {
    elasticsearch {
      ...
      document_id => "%{_id}"
      action => "update"
      doc_as_upsert => true
    }
  }
}

Even more, build_event could be as simple as this:

private
  def build_event(line)
    # In lieu of a codec, build the event here
    line = LogStash::Json.load(line)
    return nil if line.has_key?("last_seq")
    event = LogStash::Event.new(line['doc'])
    @logger.debug("event", :event => event.to_hash_with_metadata) if @logger.debug?
    event
  end
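With this minimal variant the event is exactly the parsed CouchDB document, so _id and (for tombstones) _deleted stay on the event and can drive the output conditional directly. A quick sketch, again with the Logstash classes stubbed out:

```ruby
require 'json'

# Minimal build_event: the event is just the parsed CouchDB document.
def build_event(json_line)
  line = JSON.parse(json_line)
  return nil if line.has_key?('last_seq')
  line['doc'] # the real plugin would return LogStash::Event.new(line['doc'])
end

update    = build_event('{"seq":8,"doc":{"_id":"abc","_rev":"1-x","field":"value"}}')
tombstone = build_event('{"seq":9,"doc":{"_id":"abc","_rev":"2-y","_deleted":true}}')
```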

and the config:

output {
  if [_deleted] {
    elasticsearch {
      ...
      document_id => "%{_id}"
      action => "delete"
    }
  } else {
    elasticsearch {
      ...
      document_id => "%{_id}"
      action => "update"
      doc_as_upsert => true
    }
  }
}