Logstash not writing data to Elasticsearch data streams

We were using Logstash version 7.4.2. To mitigate the Log4j vulnerability, I upgraded Logstash to 7.16.1, as advised by the Elastic team.
I also changed the Elasticsearch output plugin in the Logstash pipelines so that they write the processed logs to a data stream in Elasticsearch.

output{
        elasticsearch {
                hosts => ["${ELASTICSEARCH_HOST}:${ELASTICSEARCH_PORT}"]
                data_stream => "true"
                data_stream_type => "logs"
                data_stream_dataset => "${APP_STREAM_INDEX}"
                data_stream_namespace => "default"
                action => "create"
                user => "${ELASTICSEARCH_INSERT_USERNAME}"
                password => "${ELASTICSEARCH_INSERT_PASSWORD}"
                ssl => "true"
                ssl_certificate_verification => "true"
                cacert => "${LOGSTASH_BASE}/certificates/ABC-CA-DE_pem.cer"
        }
}

But after restarting Logstash, all the Logstash pipelines fail with the errors below.

bidos-logstash_1  | [2021-12-16T20:02:47,358][WARN ][logstash.outputs.elasticsearch][vasapi_stream][0ea35b21eb8496261db65f0592c7b832e7f25293a630aa471c3012b6581fa3d5] Could not index event to Elasticsearch. {:status=>400, :action=>["create", {:_id=>nil, :_index=>"logs-vasapi_stream-default", :routing=>nil}, {"logger_name"=>"org.apache.spark.sql.execution.streaming.MicroBatchExecution", "port"=>53802, "host"=>"si-01.de.abc.com", "message"=>"Committed offsets for batch 60052. Metadata OffsetSeqMetadata(0,1639683435003,Map(spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider, spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion -> 2, spark.sql.streaming.multipleWatermarkPolicy -> min, spark.sql.streaming.aggregation.stateFormatVersion -> 2, spark.sql.shuffle.partitions -> 5))", "thread_name"=>"stream execution thread for [id = 2c69de27-aef6-4748-ab0a-d814aeb011c9, runId = 6292ab5f-1b21-470c-8fce-00e3361ad561]", "@version"=>"1", "@timestamp"=>2021-12-16T19:37:15.065Z, "level_value"=>20000, "applicationname"=>"vasapi.SI01.proc.main.mduss6.application", "applicationid"=>"application_1639646017711_0253", "tags"=>["_grokparsefailure"], "level"=>"INFO", "data_stream"=>{"type"=>"logs", "dataset"=>"vasapi_stream", "namespace"=>"default"}}], :response=>{"create"=>{"_index"=>".ds-logs-vasapi_stream-default-2021.12.16-000001", "_type"=>"_doc", "_id"=>"2k_XxH0BfV2hHkp_Cw6P", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"object mapping for [host] tried to parse field [host] as object, but found a concrete value"}}}}
bidos-logstash_1  | [2021-12-16T20:02:47,358][WARN ][logstash.outputs.elasticsearch][vasapi_stream][0ea35b21eb8496261db65f0592c7b832e7f25293a630aa471c3012b6581fa3d5] Could not index event to Elasticsearch. {:status=>400, :action=>["create", {:_id=>nil, :_index=>"logs-vasapi_stream-default", :routing=>nil}, {"logger_name"=>"org.apache.spark.sql.execution.streaming.CheckpointFileManager", "port"=>53802, "host"=>"si-101l.de.bosch.com", "message"=>"Renamed temp file hdfs://aesi01dev/proc/vasapi/mes/parsing_20210226-backup_20210226/checkpoint/source-data-json_20210226/offsets/.87363.d5da0a58-0c08-4f41-a301-ff8bd6553f24.tmp to hdfs://aesi01dev/proc/vasapi/mes/parsing_20210226-backup_20210226/checkpoint/source-data-json_20210226/offsets/87363", "thread_name"=>"stream execution thread for [id = 01f58740-ef5e-4ccd-9f00-4d6ff5a4f387, runId = db842470-6c59-46d4-8cad-29892ab73dc1]", "@version"=>"1", "@timestamp"=>2021-12-16T19:37:15.067Z, "level_value"=>20000, "applicationname"=>"vasapi.SI01.proc.main.mduss6.application", "applicationid"=>"application_1639646017711_0253", "tags"=>["_grokparsefailure"], "level"=>"INFO", "data_stream"=>{"type"=>"logs", "dataset"=>"vasapi_stream", "namespace"=>"default"}}], :response=>{"create"=>{"_index"=>".ds-logs-vasapi_stream-default-2021.12.16-000001", "_type"=>"_doc", "_id"=>"20_XxH0BfV2hHkp_Cw6P", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"object mapping for [host] tried to parse field [host] as object, but found a concrete value"}}}}
bidos-logstash_1  | [2021-12-16T20:02:47,358][WARN ][logstash.outputs.elasticsearch][vasapi_stream][0ea35b21eb8496261db65f0592c7b832e7f25293a630aa471c3012b6581fa3d5] Could not index event to Elasticsearch. {:status=>400, :action=>["create", {:_id=>nil, :_index=>"logs-vasapi_stream-default", :routing=>nil}, {"logger_name"=>"org.apache.spark.sql.execution.streaming.MicroBatchExecution", "port"=>53802, "host"=>"si-101l.de.bosch.com", "message"=>"Committed offsets for batch 87363. Metadata OffsetSeqMetadata(0,1639683435003,Map(spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider, spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion -> 2, spark.sql.streaming.multipleWatermarkPolicy -> min, spark.sql.streaming.aggregation.stateFormatVersion -> 2, spark.sql.shuffle.partitions -> 5))", "thread_name"=>"stream execution thread for [id = 01f58740-ef5e-4ccd-9f00-4d6ff5a4f387, runId = db842470-6c59-46d4-8cad-29892ab73dc1]", "@version"=>"1", "@timestamp"=>2021-12-16T19:37:15.067Z, "level_value"=>20000, "applicationname"=>"vasapi.SI01.proc.main.mduss6.application", "applicationid"=>"application_1639646017711_0253", "tags"=>["_grokparsefailure"], "level"=>"INFO", "data_stream"=>{"type"=>"logs", "dataset"=>"vasapi_stream", "namespace"=>"default"}}], :response=>{"create"=>{"_index"=>".ds-logs-vasapi_stream-default-2021.12.16-000001", "_type"=>"_doc", "_id"=>"3E_XxH0BfV2hHkp_Cw6P", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"object mapping for [host] tried to parse field [host] as object, but found a concrete value"}}}}
bidos-logstash_1  | [2021-12-16T20:02:47,358][WARN ][logstash.outputs.elasticsearch][vasapi_stream][0ea35b21eb8496261db65f0592c7b832e7f25293a630aa471c3012b6581fa3d5] Could not index event to Elasticsearch. {:status=>400, :action=>["create", {:_id=>nil, :_index=>"logs-vasapi_stream-default", :routing=>nil}, {"logger_name"=>"org.apache.spark.sql.execution.streaming.CheckpointFileManager", "port"=>53802, "host"=>"si-101l.de.bosch.com", "message"=>"Renamed temp file hdfs://aesi01dev/proc/vasapi/mes/parsing_20210226/checkpoint/mes-results_20210226/offsets/.59079.ac422bb4-ab6a-4356-821a-cfbe148b1d78.tmp to hdfs://aesi01dev/proc/vasapi/mes/parsing_20210226/checkpoint/mes-results_20210226/offsets/59079", "thread_name"=>"stream execution thread for [id = 29a022a7-08ad-4d6a-ab96-e99a4d1e8b2c, runId = c1a45931-7b2e-413d-af63-3a6ed50590de]", "@version"=>"1", "@timestamp"=>2021-12-16T19:37:15.076Z, "level_value"=>20000, "applicationname"=>"vasapi.SI01.proc.main.mduss6.application", "applicationid"=>"application_1639646017711_0253", "tags"=>["_grokparsefailure"], "level"=>"INFO", "data_stream"=>{"type"=>"logs", "dataset"=>"vasapi_stream", "namespace"=>"default"}}], :response=>{"create"=>{"_index"=>".ds-logs-vasapi_stream-default-2021.12.16-000001", "_type"=>"_doc", "_id"=>"3U_XxH0BfV2hHkp_Cw6P", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"object mapping for [host] tried to parse field [host] as object, but found a concrete value"}}}}
bidos-logstash_1  | [2021-12-16T20:02:47,358][WARN ][logstash.outputs.elasticsearch][vasapi_stream][0ea35b21eb8496261db65f0592c7b832e7f25293a630aa471c3012b6581fa3d5] Could not index event to Elasticsearch. {:status=>400, :action=>["create", {:_id=>nil, :_index=>"logs-vasapi_stream-default", :routing=>nil}, {"logger_name"=>"org.apache.spark.sql.execution.streaming.MicroBatchExecution", "port"=>53802, "host"=>"si-101l.de.bosch.com", "message"=>"Committed offsets for batch 59079. Metadata OffsetSeqMetadata(0,1639683435003,Map(spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider, spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion -> 2, spark.sql.streaming.multipleWatermarkPolicy -> min, spark.sql.streaming.aggregation.stateFormatVersion -> 2, spark.sql.shuffle.partitions -> 5))", "thread_name"=>"stream execution thread for [id = 29a022a7-08ad-4d6a-ab96-e99a4d1e8b2c, runId = c1a45931-7b2e-413d-af63-3a6ed50590de]", "@version"=>"1", "@timestamp"=>2021-12-16T19:37:15.076Z, "level_value"=>20000, "applicationname"=>"vasapi.SI01.proc.main.mduss6.application", "applicationid"=>"application_1639646017711_0253", "tags"=>["_grokparsefailure"], "level"=>"INFO", "data_stream"=>{"type"=>"logs", "dataset"=>"vasapi_stream", "namespace"=>"default"}}], :response=>{"create"=>{"_index"=>".ds-logs-vasapi_stream-default-2021.12.16-000001", "_type"=>"_doc", "_id"=>"3k_XxH0BfV2hHkp_Cw6P", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"object mapping for [host] tried to parse field [host] as object, but found a concrete value"}}}}
bidos-logstash_1  | [2021-12-16T20:02:47,359][WARN ][logstash.outputs.elasticsearch][vasapi_stream][0ea35b21eb8496261db65f0592c7b832e7f25293a630aa471c3012b6581fa3d5] Could not index event to Elasticsearch. {:status=>400, :action=>["create", {:_id=>nil, :_index=>"logs-vasapi_stream-default", :routing=>nil}, {"logger_name"=>"org.apache.spark.sql.execution.streaming.IncrementalExecution", "port"=>53802, "host"=>"si-101l.de.bosch.com", "message"=>"Current batch timestamp = 1639683435003", "thread_name"=>"stream execution thread for [id = 01f58740-ef5e-4ccd-9f00-4d6ff5a4f387, runId = db842470-6c59-46d4-8cad-29892ab73dc1]", "@version"=>"1", "@timestamp"=>2021-12-16T19:37:15.147Z, "level_value"=>20000, "applicationname"=>"vasapi.SI01.proc.main.mduss6.application", "applicationid"=>"application_1639646017711_0253", "tags"=>["_grokparsefailure"], "level"=>"INFO", "data_stream"=>{"type"=>"logs", "dataset"=>"vasapi_stream", "namespace"=>"default"}}], :response=>{"create"=>{"_index"=>".ds-logs-vasapi_stream-default-2021.12.16-000001", "_type"=>"_doc", "_id"=>"30_XxH0BfV2hHkp_Cw6P", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"object mapping for [host] tried to parse field [host] as object, but found a concrete value"}}}}
bidos-logstash_1  | [2021-12-16T20:02:47,359][WARN ][logstash.outputs.elasticsearch][vasapi_stream][0ea35b21eb8496261db65f0592c7b832e7f25293a630aa471c3012b6581fa3d5] Could not index event to Elasticsearch. {:status=>400, :action=>["create", {:_id=>nil, :_index=>"logs-vasapi_stream-default", :routing=>nil}, {"logger_name"=>"org.apache.spark.sql.execution.streaming.CheckpointFileManager", "port"=>53802, "host"=>"si-101l.de.bosch.com", "message"=>"Writing atomically to /proc/vasapi/mes/parsing_20210226-backup_20210226/source-data-json_20210226/_spark_metadata/87363 using temp file /proc/vasapi/mes/parsing_20210226-backup_20210226/source-data-json_20210226/_spark_metadata/.87363.aab81a9b-22a9-4e4d-bcae-38438a8a373c.tmp", "thread_name"=>"stream execution thread for [id = 01f58740-ef5e-4ccd-9f00-4d6ff5a4f387, runId = db842470-6c59-46d4-8cad-29892ab73dc1]", "@version"=>"1", "@timestamp"=>2021-12-16T19:37:15.587Z, "level_value"=>20000, "applicationname"=>"vasapi.SI01.proc.main.mduss6.application", "applicationid"=>"application_1639646017711_0253", "tags"=>["_grokparsefailure"], "level"=>"INFO", "data_stream"=>{"type"=>"logs", "dataset"=>"vasapi_stream", "namespace"=>"default"}}], :response=>{"create"=>{"_index"=>".ds-logs-vasapi_stream-default-2021.12.16-000001", "_type"=>"_doc", "_id"=>"4E_XxH0BfV2hHkp_Cw6P", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"object mapping for [host] tried to parse field [host] as object, but found a concrete value"}}}}
bidos-logstash_1  | [2021-12-16T20:02:47,359][WARN ][logstash.outputs.elasticsearch][vasapi_stream][0ea35b21eb8496261db65f0592c7b832e7f25293a630aa471c3012b6581fa3d5] Could not index event to Elasticsearch. {:status=>400, :action=>["create", {:_id=>nil, :_index=>"logs-vasapi_stream-default", :routing=>nil}, {"logger_name"=>"org.apache.spark.sql.execution.streaming.CheckpointFileManager", "port"=>53802, "host"=>"si-101l.de.bosch.com", "message"=>"Renamed temp file /proc/vasapi/mes/parsing_20210226-backup_20210226/source-data-json_20210226/_spark_metadata/.87363.aab81a9b-22a9-4e4d-bcae-38438a8a373c.tmp to /proc/vasapi/mes/parsing_20210226-backup_20210226/source-data-json_20210226/_spark_metadata/87363", "thread_name"=>"stream execution thread for [id = 01f58740-ef5e-4ccd-9f00-4d6ff5a4f387, runId = db842470-6c59-46d4-8cad-29892ab73dc1]", "@version"=>"1", "@timestamp"=>2021-12-16T19:37:15.635Z, "level_value"=>20000, "applicationname"=>"vasapi.SI01.proc.main.mduss6.application", "applicationid"=>"application_1639646017711_0253", "tags"=>["_grokparsefailure"], "level"=>"INFO", "data_stream"=>{"type"=>"logs", "dataset"=>"vasapi_stream", "namespace"=>"default"}}], :response=>{"create"=>{"_index"=>".ds-logs-vasapi_stream-default-2021.12.16-000001", "_type"=>"_doc", "_id"=>"4U_XxH0BfV2hHkp_Cw6P", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"object mapping for [host] tried to parse field [host] as object, but found a concrete value"}}}}
bidos-logstash_1  | [2021-12-16T20:02:47,359][WARN ][logstash.outputs.elasticsearch][vasapi_stream][0ea35b21eb8496261db65f0592c7b832e7f25293a630aa471c3012b6581fa3d5] Could not index event to Elasticsearch. {:status=>400, :action=>["create", {:_id=>nil, :_index=>"logs-vasapi_stream-default", :routing=>nil}, {"logger_name"=>"org.apache.spark.sql.execution.streaming.FileStreamSinkLog", "port"=>53802, "host"=>"si-101l.de.bosch.com", "message"=>"Current compact batch id = 87363 min compaction batch id to delete = 87259", "thread_name"=>"stream execution thread for [id = 01f58740-ef5e-4ccd-9f00-4d6ff5a4f387, runId = db842470-6c59-46d4-8cad-29892ab73dc1]", "@version"=>"1", "@timestamp"=>2021-12-16T19:37:15.635Z, "level_value"=>20000, "applicationname"=>"vasapi.SI01.proc.main.mduss6.application", "applicationid"=>"application_1639646017711_0253", "tags"=>["_grokparsefailure"], "level"=>"INFO", "data_stream"=>{"type"=>"logs", "dataset"=>"vasapi_stream", "namespace"=>"default"}}], :response=>{"create"=>{"_index"=>".ds-logs-vasapi_stream-default-2021.12.16-000001", "_type"=>"_doc", "_id"=>"4k_XxH0BfV2hHkp_Cw6P", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"object mapping for [host] tried to parse field [host] as object, but found a concrete value"}}}}
bidos-logstash_1  | [2021-12-16T20:02:47,359][WARN ][logstash.outputs.elasticsearch][vasapi_stream][0ea35b21eb8496261db65f0592c7b832e7f25293a630aa471c3012b6581fa3d5] Could not index event to Elasticsearch. {:status=>400, :action=>["create", {:_id=>nil, :_index=>"logs-vasapi_stream-default", :routing=>nil}, {"logger_name"=>"org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol", "port"=>53802, "host"=>"si-101l.de.bosch.com", "message"=>"Committed batch 87363", "thread_name"=>"stream execution thread for [id = 01f58740-ef5e-4ccd-9f00-4d6ff5a4f387, runId = db842470-6c59-46d4-8cad-29892ab73dc1]", "@version"=>"1", "@timestamp"=>2021-12-16T19:37:15.643Z, "level_value"=>20000, "applicationname"=>"vasapi.SI01.proc.main.mduss6.application", "applicationid"=>"application_1639646017711_0253", "tags"=>["_grokparsefailure"], "level"=>"INFO", "data_stream"=>{"type"=>"logs", "dataset"=>"vasapi_stream", "namespace"=>"default"}}], :response=>{"create"=>{"_index"=>".ds-logs-vasapi_stream-default-2021.12.16-000001", "_type"=>"_doc", "_id"=>"40_XxH0BfV2hHkp_Cw6P", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"object mapping for [host] tried to parse field [host] as object, but found a concrete value"}}}}

Can anybody please help? All our Logstash pipelines in production have been failing since we upgraded to Logstash version 7.16.1.

This is not really a Logstash issue; it looks like a mapping conflict in Elasticsearch. According to the error, the mapping of the data stream defines host as an object (host.*), but the events Logstash is sending contain host as a plain string field.

However, you can use Logstash to work around this in a couple of ways:

  1. Remove the host field
  2. Rename the host field to something like host.name
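
As a rough sketch of those two options (not tested against your pipeline; it assumes the incoming host field is always a plain string, as in the events in your log), the filter section could contain one of the following mutate blocks. Use only one of the two.

filter {
        # Option 1: drop the string host field so it cannot clash with the host object mapping
        # mutate {
        #         remove_field => ["host"]
        # }

        # Option 2: move the string value under the host object as host.name
        mutate {
                rename => { "host" => "[host][name]" }
        }
}

In Logstash field-reference syntax, the nested field host.name is written as [host][name].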

Hi @AquaX,
I have renamed the field by adding the following lines to the filter section:

mutate {
         rename => { "host" => "[host][name]" }
}

And now I am getting the warning below:

[WARN ][deprecation.logstash.codecs.jsonlines] Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.

I have set the following property in the logstash.yml file:

pipeline.ecs_compatibility: disabled

But the pipeline does not progress beyond the lines below:

bidos-logstash_1  | [2021-12-27T02:58:49,492][INFO ][logstash.javapipeline    ][.monitoring-logstash] Pipeline started {"pipeline.id"=>".monitoring-logstash"}
bidos-logstash_1  | [2021-12-27T02:58:49,665][INFO ][logstash.javapipeline    ][vasapi_stream] Pipeline Java execution initialization time {"seconds"=>0.75}
bidos-logstash_1  | [2021-12-27T02:58:49,838][INFO ][logstash.javapipeline    ][vasapi_stream] Pipeline started {"pipeline.id"=>"vasapi_stream"}
bidos-logstash_1  | [2021-12-27T02:58:49,858][INFO ][logstash.inputs.tcp      ][vasapi_stream][b14eb8d9e41a7b7c6a22f1d12a127fca5d104a428505d67aee0732dd3d544c42] Starting tcp input listener {:address=>"0.0.0.0:5959", :ssl_enable=>false}
bidos-logstash_1  | [2021-12-27T02:58:49,957][INFO ][logstash.agent           ] Pipelines running {:count=>2, :running_pipelines=>[:".monitoring-logstash", :vasapi_stream], :non_running_pipelines=>[]}

What should be done here?
FYI, my pipeline looks like this:

input {
        tcp {
                port => "${VASAPI_TCP_PORT}"
                codec => json_lines
        }
}
filter {
        if ![applicationid] { drop {} }
        # extract duration of finished tasks
        grok {
                match => { "message" => "Finished task %{GREEDYDATA} in %{INT:task_duration_ms:int} ms%{GREEDYDATA}" }
        }
        mutate {
                rename => { "host" => "[host][name]" }
        }
}
output{
        elasticsearch {
                hosts => ["${ELASTICSEARCH_HOST}:${ELASTICSEARCH_PORT}"]
                data_stream => "true"
                data_stream_type => "logs"
                data_stream_dataset => "${VASAPI_STREAM_INDEX}"
                data_stream_namespace => "default"
                action => "create"
                user => "${ELASTICSEARCH_INSERT_USERNAME}"
                password => "${ELASTICSEARCH_INSERT_PASSWORD}"
                ssl => "true"
                ssl_certificate_verification => "true"
                cacert => "${LOGSTASH_BASE}/certificates/BOSCH-CA-DE_pem.cer"
        }
}

Logstash version -> 7.16.1
Elasticsearch version -> 7.14.1

@AquaX, I have changed the input plugin as shown below.

input {
    tcp {
        port => "${VASAPI_TCP_PORT}"
        ecs_compatibility => v1
        codec => json_lines {
                     ecs_compatibility => v1
                 }
    }
}

which gives me the message below:

[INFO ][logstash.codecs.jsonlines][vasapi_stream] ECS compatibility is enabled but `target` option was not specified. This may cause fields to be set at the top-level of the event where they are likely to clash with the Elastic Common Schema. It is recommended to set the `target` option to avoid potential schema conflicts (if your data is ECS compliant or non-conflicting, feel free to ignore this message)

and data is still not being written to the Elasticsearch data stream.
I have also added a stdout output to the Logstash pipeline configuration, and I can see that the logs are being parsed:

bidos-logstash_1  | {
bidos-logstash_1  |          "@timestamp" => 2021-12-27T06:45:22.924Z,
bidos-logstash_1  |               "level" => "INFO",
bidos-logstash_1  |     "applicationname" => "vasapi.SI01.proc.main.mduss6.application",
bidos-logstash_1  |                "tags" => [
bidos-logstash_1  |         [0] "_grokparsefailure"
bidos-logstash_1  |     ],
bidos-logstash_1  |         "logger_name" => "org.apache.spark.sql.execution.streaming.CheckpointFileManager",
bidos-logstash_1  |         "level_value" => 20000,
bidos-logstash_1  |             "message" => "Renamed temp file /proc/vasapi/mes/parsing_20210226/mes-locations_20210226/_spark_metadata/.161710.f5d81cdb-55c3-4a5d-add1-5770a53f559f.tmp to /proc/vasapi/mes/parsing_20210226/mes-locations_20210226/_spark_metadata/161710",
bidos-logstash_1  |       "applicationid" => "application_1639646017711_0253",
bidos-logstash_1  |            "@version" => "1",
bidos-logstash_1  |         "thread_name" => "stream execution thread for [id = 2c69de27-aef6-4748-ab0a-d814aeb011c9, runId = 6292ab5f-1b21-470c-8fce-00e3361ad561]"
bidos-logstash_1  | }
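
A stdout output of roughly the following form, placed next to the elasticsearch output, prints each event in the format shown above. This is only a sketch and not the exact block used in this pipeline; the rubydebug codec and its metadata => true setting are assumptions added here, the latter to also print the [@metadata] fields, which are hidden by default.

output {
        # print every processed event to the Logstash log for debugging
        stdout {
                codec => rubydebug { metadata => true }
        }
        # the existing elasticsearch output stays alongside this block
}

Printing the metadata can help when checking whether a field that is missing from the event, such as host, was stored under [@metadata] instead.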

So the only problem here is that Logstash is not writing the data to the Elasticsearch data stream. Is anything missing from the configuration, or does something need to be added?