Datastream gets no data

Hi,

I am new to data streams. I set one up following the documentation at https://www.elastic.co/guide/en/elasticsearch/reference/current/set-up-a-data-stream.html and tried to index into it.

However, my Logstash (7.9.1) is not able to deliver any data, even though I can see that the backing index for the data stream has been created. What am I doing wrong?

The message in the Logstash log:

[2020-09-30T15:42:30,681][WARN ][logstash.outputs.elasticsearch][main][fa72e9983cda258dabc20ca0c215f0dab291c9be0d9e9affbd052bfc5d89e274] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>"7620e77ee0e88c1f10558a0fe06cd42fc0d28044", :_index=>"agl-api-ds", :routing=>nil, :_type=>"_doc"}, #<LogStash::Event:0x3347c5b>], :response=>{"index"=>{"_index"=>"agl-api-ds", "_type"=>"_doc", "_id"=>"7620e77ee0e88c1f10558a0fe06cd42fc0d28044", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"only write ops with an op_type of create are allowed in data streams"}}}}

Logstash output ([@metadata][indexbasename] resolves to 'agl-api'):

output {
        if "api-log" in [tags] or "access-log" in [tags] or "tls-proxy" in [tags] {
                elasticsearch {
                        hosts => ["esserver1.servers.local:9200","esserver2.servers.local:9200"]
                        document_id => "%{[@metadata][fingerprint]}"
                        index => "%{[@metadata][indexbasename]}-ds"
                        sniffing => false
                }
        }
}

I can see that a hidden index was created for the data stream.

I have this index template for the data stream. The request is:

PUT _index_template/agl-api-datastream-tmpl
{
  "version": 1,
  "priority": 200,
  "template": {
    "settings": {
      "index": {
        "lifecycle": {
          "name": "agl-data-stream-policy"
        },
        "refresh_interval": "30s",
        "number_of_shards": "5",
        "number_of_replicas": "1"
      }
    },
    "mappings": {
      "_routing": {
        "required": false
      },
      "_source": {
        "excludes": [],
        "includes": [],
        "enabled": true
      },
      "dynamic": false,
      "properties": {
        "ms-host": {
          "norms": false,
          "type": "keyword"
        },
        "api-version": {
          "norms": false,
          "type": "keyword"
        },
        "agent": {
          "dynamic": true,
          "type": "object",
          "enabled": true,
          "properties": {
            "hostname": {
              "norms": false,
              "type": "keyword"
            },
            "name": {
              "norms": false,
              "type": "text"
            },
            "version": {
              "norms": false,
              "type": "text"
            }
          }
        },
        "api-platform": {
          "norms": false,
          "type": "keyword"
        },
        "api-url": {
          "norms": false,
          "type": "text"
        },
        "ms-error-message": {
          "norms": false,
          "type": "text"
        },
        "api-nanoservice": {
          "norms": false,
          "type": "keyword"
        },
        "source": {
          "norms": false,
          "type": "text"
        },
        "platform": {
          "norms": false,
          "type": "keyword"
        },
        "ms-result-code": {
          "norms": false,
          "type": "text"
        },
        "api-brand": {
          "norms": false,
          "type": "keyword"
        },
        "transaction-id": {
          "norms": false,
          "type": "text"
        },
        "@version": {
          "norms": false,
          "type": "keyword"
        },
        "host": {
          "norms": false,
          "type": "keyword"
        },
        "ms-request-body": {
          "norms": false,
          "type": "text"
        },
        "thread-id": {
          "type": "integer"
        },
        "additional-data": {
          "norms": false,
          "type": "text"
        },
        "log-message": {
          "norms": false,
          "type": "text"
        },
        "ms-url": {
          "norms": false,
          "type": "text"
        },
        "api-type": {
          "norms": false,
          "type": "keyword"
        },
        "app-id": {
          "norms": false,
          "type": "keyword"
        },
        "offset": {
          "type": "long"
        },
        "event-type": {
          "norms": false,
          "type": "keyword"
        },
        "user-id": {
          "norms": false,
          "type": "text"
        },
        "client-ip": {
          "type": "ip"
        },
        "prospector": {
          "type": "object",
          "properties": {
            "type": {
              "norms": false,
              "type": "text"
            }
          }
        },
        "api-query-string": {
          "norms": false,
          "type": "text"
        },
        "message": {
          "norms": false,
          "type": "text"
        },
        "tags": {
          "norms": false,
          "type": "text"
        },
        "api-tenant": {
          "norms": false,
          "type": "keyword"
        },
        "@timestamp": {
          "type": "date"
        },
        "ms-result": {
          "norms": false,
          "type": "keyword"
        },
        "log-level": {
          "norms": false,
          "type": "keyword"
        },
        "execution-time": {
          "type": "integer"
        },
        "fields": {
          "type": "object",
          "properties": {
            "environment": {
              "norms": false,
              "type": "keyword"
            }
          }
        },
        "http-verb": {
          "norms": false,
          "type": "keyword"
        },
        "session-id": {
          "norms": false,
          "type": "text"
        },
        "username": {
          "norms": false,
          "type": "text"
        }
      }
    }
  },
  "index_patterns": [
    "agl-api-ds"
  ],
  "data_stream": {},
  "composed_of": []
}

There is also one lifecycle policy, and it is attached to the hidden index.

So everything looks OK to me, and yet I cannot get data into it.

What am I doing wrong?
For the record, when I index to a different, non-existing index name, that index is created and fills with data, so the connection is OK. The problem is specific to the data stream.

Any help is appreciated.

I learned that I actually should use a different output plugin in Logstash, so I changed the output conf to:

output {
        if "api-log" in [tags] or "access-log" in [tags] or "tls-proxy" in [tags] {
                 elasticsearch_data_stream{
                        hosts => ["esserver1.servers.local:9200","esserver2.servers.local:9200"]
                        document_id => "%{[@metadata][fingerprint]}"
                        index => "%{[@metadata][indexbasename]}-ds"
                        sniffing => false
                }
        }
}

However, this gave:

[2020-09-30T16:02:52,175][ERROR][logstash.agent           ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"Java::JavaLang::IllegalStateException", :message=>"Unable to configure plugins: (PluginLoadingError) Couldn't find any output plugin named 'elasticsearch_data_stream'. Are you sure this is correct? Trying to load the elasticsearch_data_stream output plugin resulted in this error: no such file to load -- logstash/outputs/elasticsearch_data_stream", :backtrace=>["org.logstash.config.ir.CompiledPipeline.<init>(CompiledPipeline.java:119)", "org.logstash.execution.JavaBasePipelineExt.initialize(JavaBasePipelineExt.java:82)", "org.logstash.execution.JavaBasePipelineExt$INVOKER$i$1$0$initialize.call(JavaBasePipelineExt$INVOKER$i$1$0$initialize.gen)", "org.jruby.internal.runtime.methods.JavaMethod$JavaMethodN.call(JavaMethod.java:837)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuper(IRRuntimeHelpers.java:1169)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuperSplatArgs(IRRuntimeHelpers.java:1156)", "org.jruby.ir.targets.InstanceSuperInvokeSite.invoke(InstanceSuperInvokeSite.java:39)", "usr.share.logstash.logstash_minus_core.lib.logstash.java_pipeline.RUBY$method$initialize$0(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:44)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:80)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:70)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:332)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:86)", "org.jruby.RubyClass.newInstance(RubyClass.java:939)", "org.jruby.RubyClass$INVOKER$i$newInstance.call(RubyClass$INVOKER$i$newInstance.gen)", "org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207)", 
"usr.share.logstash.logstash_minus_core.lib.logstash.pipeline_action.create.RUBY$method$execute$0(/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52)", "usr.share.logstash.logstash_minus_core.lib.logstash.pipeline_action.create.RUBY$method$execute$0$__VARARGS__(/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:80)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:70)", "org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207)", "usr.share.logstash.logstash_minus_core.lib.logstash.agent.RUBY$block$converge_state$2(/usr/share/logstash/logstash-core/lib/logstash/agent.rb:357)", "org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:138)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:52)", "org.jruby.runtime.Block.call(Block.java:139)", "org.jruby.RubyProc.call(RubyProc.java:318)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:105)", "java.base/java.lang.Thread.run(Thread.java:834)"]}
[2020-09-30T16:02:52,183][ERROR][logstash.agent           ] An exception happened when converging configuration {:exception=>LogStash::Error, :message=>"Don't know how to handle `Java::JavaLang::IllegalStateException` for `PipelineAction::Create<main>`"}
[2020-09-30T16:02:52,223][FATAL][logstash.runner          ] An unexpected error occurred! {:error=>#<LogStash::Error: Don't know how to handle `Java::JavaLang::IllegalStateException` for `PipelineAction::Create<main>`>, :backtrace=>["org/logstash/execution/ConvergeResultExt.java:129:in `create'", "org/logstash/execution/ConvergeResultExt.java:57:in `add'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:370:in `block in converge_state'"]}
[2020-09-30T16:02:52,241][ERROR][org.logstash.Logstash    ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit

Got the idea from here:

But apparently this is not a standard part of Logstash?


OK.

It seems to partly work now.
I reverted to the elasticsearch output plugin and set 'action' to "create" (instead of the default 'index').
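
For reference, a sketch of what the output section looks like after this change. It is reconstructed from the first configuration in this thread with only the action setting added; the hosts, tags and metadata fields are the ones shown earlier, and the rest is assumed to be unchanged.

```
output {
        if "api-log" in [tags] or "access-log" in [tags] or "tls-proxy" in [tags] {
                elasticsearch {
                        hosts => ["esserver1.servers.local:9200","esserver2.servers.local:9200"]
                        # Data streams only accept writes with op_type "create";
                        # the plugin default, "index", is rejected with a 400.
                        action => "create"
                        # Assumed unchanged from the first configuration.
                        document_id => "%{[@metadata][fingerprint]}"
                        index => "%{[@metadata][indexbasename]}-ds"
                        sniffing => false
                }
        }
}
```

Note that with "create" and an explicit document_id, an event whose ID already exists in the data stream is rejected instead of overwriting the existing document.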

I receive data now.
However, I see these two lines in the Logstash log:

[2020-09-30T16:19:25,834][WARN ][logstash.outputs.elasticsearch][main][ef8f8fd38026ee6b4cb399a9d9dcfe8440065e0aca25750d4cb1f8bb8192b208] Failed action. {:status=>409, :action=>["create", {:_id=>"19319400d8fbb7c0069c70df9b178ccc3860dc2d", :_index=>"agl-api-ds", :routing=>nil, :_type=>"_doc"}, #<LogStash::Event:0x13738519>], :response=>{"create"=>{"_index"=>".ds-agl-api-ds-000001", "_type"=>"_doc", "_id"=>"19319400d8fbb7c0069c70df9b178ccc3860dc2d", "status"=>409, "error"=>{"type"=>"version_conflict_engine_exception", "reason"=>"[19319400d8fbb7c0069c70df9b178ccc3860dc2d]: version conflict, document already exists (current version [1])", "index_uuid"=>"h9zN5pZvSgm8ltTfdXesRg", "shard"=>"0", "index"=>".ds-agl-api-ds-000001"}}}}
[2020-09-30T16:19:33,327][WARN ][logstash.outputs.elasticsearch][main][ef8f8fd38026ee6b4cb399a9d9dcfe8440065e0aca25750d4cb1f8bb8192b208] Failed action. {:status=>409, :action=>["create", {:_id=>"0645733540c1642af821aa78a270114dbe1603e1", :_index=>"agl-api-ds", :routing=>nil, :_type=>"_doc"}, #<LogStash::Event:0x16b63ce6>], :response=>{"create"=>{"_index"=>".ds-agl-api-ds-000001", "_type"=>"_doc", "_id"=>"0645733540c1642af821aa78a270114dbe1603e1", "status"=>409, "error"=>{"type"=>"version_conflict_engine_exception", "reason"=>"[0645733540c1642af821aa78a270114dbe1603e1]: version conflict, document already exists (current version [1])", "index_uuid"=>"h9zN5pZvSgm8ltTfdXesRg", "shard"=>"2", "index"=>".ds-agl-api-ds-000001"}}}}


How can I prevent these?
