Elastic Agent Logstash data streams not working

This is my Logstash pipeline configuration (a pipeline `.conf` file — filter/output blocks do not go in `logstash.yml`):

  # NOTE(review): events shipped by Elastic Agent/Filebeat already carry
  # [data_stream][type]/[dataset]/[namespace]. mutate's add_field APPENDS when
  # the field exists, turning each into a two-element array — that is where the
  # malformed index name ["logs","logs"]-["filestream.generic","applications"]-
  # ["default","test"] in the 403 error below comes from. `replace` is needed
  # here instead of `add_field`.
  else if "apps" in [tags] {
    mutate {
      # replace => { "[@metadata][index_prefix]" => "applications-%{+YYYY.MM}" }
      add_field => {
        "[data_stream][type]" => "logs"
        "[data_stream][dataset]" => "applications"
        "[data_stream][namespace]" => "test"
        "[@metadata][use_data_stream]" => "true"
      }
    }
  }

# Routes events either to a data stream or to a classic index, depending on
# whether the filter stage set the [@metadata][use_data_stream] flag.
output {
  if [@metadata][use_data_stream] {
    elasticsearch {
      hosts => ["https://data01tst:9200", "https://data02tst:9200", "https://data03tst:9200", "https://data04tst:9200"]
      ssl_enabled => true
      ssl_certificate_authorities => '/etc/logstash/certs/ca.crt'
      user => "${ES_USER}"
      password => "${ES_PASS}"

      # Data-stream mode: the target stream name is composed from the event's
      # [data_stream][type]-[dataset]-[namespace] fields, so if those fields
      # hold arrays (see the add_field problem in the filter) the resulting
      # index name is malformed and will not match any role's index patterns.
      data_stream => "true"
    }
  }
  else {
    elasticsearch {
      hosts => ["https://data01tst:9200", "https://data02tst:9200", "https://data03tst:9200", "https://data04tst:9200"]
      ssl_enabled => true
      ssl_certificate_authorities => '/etc/logstash/certs/ca.crt'
      user => "${ES_USER}"
      password => "${ES_PASS}"
      ilm_enabled => false
      # NOTE(review): if [@metadata][_id] is not set by an upstream filter, the
      # literal string "%{[@metadata][_id]}" becomes the document id — verify
      # every path into this branch populates it.
      document_id => "%{[@metadata][_id]}"
      index => "%{[@metadata][index_prefix]}"
    }
  }
}

user logstash is assigned to role logstash_writer

{
  "logstash_writer": {
    "cluster": [
      "manage_index_templates",
      "manage_ilm",
      "monitor"
    ],
    "indices": [
      {
        "names": [
          "ap01-*",
          "app02-*",
          "other-*",
          "apm-*",
          "kafka-*",
          "ec-*",
          "handler-*",
          "applications-*",
          "broker-*",
          "logs-*-test"
        ],
        "privileges": [
          "create",
          "create_index",
          "write",
          "manage",
          "delete",
          "manage_ilm",
          "create_doc",
          "view_index_metadata"
        ],
        "allow_restricted_indices": false
      }
    ],
    "applications": [],
    "run_as": [],
    "metadata": {},
    "transient_metadata": {
      "enabled": true
    }
  }
}

I don't understand why Logstash tries to send data to a different (malformed) data stream:

[2026-04-17T07:12:01,253][INFO ][logstash.outputs.elasticsearch][main][dd697e36b01b93f470c81b3498002d8e2c583c822b637ccddc60c4d9f7a0f5f2] Retrying failed action {:status=>403, :action=>["create", {:_id=>nil, :_index=>"[\"logs\", \"logs\"]-[\"filestream.generic\", \"applications\"]-[\"default\", \"test\"]", :routing=>nil}, {"log"=>{"file"=>{"path"=>"/appuser/app01/log/application.log", "inode"=>"6364", "device_id"=>"64768", "fingerprint"=>"e8507fe1ffa904e5b11ade12f6636621cc6b2a3e8b9ebb94f53f626b4677640e"}, "offset"=>308194}, "logtime"=>"2026-04-17 07:03:59.758", "tags"=>["app01", "apps", "app01", "beats_input_codec_plain_applied"], "day"=>"17", "app_msg"=>"   trace_id= span_id= [https-jsse-nio-8720-exec-9] INFO  c.v.w.c.i.t.AccessLogger - A.B.237.3 - - [17/Apr/2026:07:03:59 +0200] \"-\" HTTP400 0B 0ms (-ms)", "@version"=>"1", "input"=>{"type"=>"filestream"}, "data_stream"=>{"type"=>["logs", "logs"], "namespace"=>["default", "test"], "dataset"=>["filestream.generic", "applications"]}, "event"=>{"dataset"=>"filestream.generic", "original"=>"2026.04.17 07:03:59.758    trace_id= span_id= [https-jsse-nio-8720-exec-9] INFO  c.v.w.c.i.t.AccessLogger - A.B.237.3 - - [17/Apr/2026:07:03:59 +0200] \"-\" HTTP400 0B 0ms (-ms)"}, "agent"=>{"type"=>"filebeat", "ephemeral_id"=>"e6ecfced-04f2-4efd-8b8f-e72bfb15afab", "id"=>"8bf89371-a845-405c-9136-2ffe6f8be5a6", "version"=>"9.3.0", "name"=>"appsrv01"}, "@timestamp"=>2026-04-17T05:03:59.758Z, "month"=>"04", "year"=>"2026", "time"=>"07:03:59.758", "host"=>{"mac"=>["00-50-56-93-32-C7"], "containerized"=>false, "hostname"=>"appsrv01", "id"=>"73c99579400044adb0a3fe01931c97b0", "ip"=>["A.B.50.138"], "name"=>"appsrv01", "architecture"=>"x86_64", "os"=>{"codename"=>"Ootpa", "type"=>"linux", "kernel"=>"4.18.0-553.100.1.el8_10.x86_64", "platform"=>"rhel", "family"=>"redhat", "version"=>"8.10 (Ootpa)", "name"=>"Red Hat Enterprise Linux"}}, "ecs"=>{"version"=>"8.0.0"}, "message"=>"2026.04.17 07:03:59.758    trace_id= span_id= 
[https-jsse-nio-8720-exec-9] INFO  c.v.w.c.i.t.AccessLogger - A.B.237.3 - - [17/Apr/2026:07:03:59 +0200] \"-\" HTTP400 0B 0ms (-ms)", "elastic_agent"=>{"snapshot"=>false, "id"=>"8bf89371-a845-405c-9136-2ffe6f8be5a6", "version"=>"9.3.0"}}], :error=>{"type"=>"security_exception", "reason"=>"action [indices:admin/auto_create] is unauthorized for user [logstash_internal] with effective roles [logstash_writer] on indices [[\"logs\", \"logs\"]-[\"filestream.generic\", \"applications\"]-[\"default\", \"test\"]], this action is granted by the index privileges [auto_configure,create_index,manage,all]"}}
[2026-04-17T07:12:01,254][INFO ][logstash.outputs.elasticsearch][main][dd697e36b01b93f470c81b3498002d8e2c583c822b637ccddc60c4d9f7a0f5f2] Retrying individual bulk actions that failed or were rejected by the previous bulk request {:count=>2}

What is wrong in this configuration? Any ideas?

One parameter was missing in the output plugin:

data_stream_auto_routing => true

and, crucially, the filter plugin needed a small change: `replace` instead of `add_field`. The events from Elastic Agent already contain the `data_stream.*` fields, so `add_field` appended to them and produced arrays — that is exactly the malformed `["logs", "logs"]-["filestream.generic", "applications"]-["default", "test"]` index name in the error above. The proper Logstash conf should be:

  # Fixed version: `replace` OVERWRITES the data_stream fields that Elastic
  # Agent already placed on the event, where add_field would append and turn
  # each field into an array (producing a malformed data-stream name).
  else if "apps" in [tags] {
    mutate {
      # replace => { "[@metadata][index_prefix]" => "applications-%{+YYYY.MM}" }
      replace => {
        "[data_stream][type]" => "logs"
        "[data_stream][dataset]" => "applications"
        "[data_stream][namespace]" => "test"
      }
      # @metadata flag for the output stage; safe to add_field because no
      # upstream component sets it.
      add_field => { "[@metadata][use_data_stream]" => "true" }
    }
  }
...
# Same routing as before: data-stream output when the filter stage flagged the
# event, classic index output otherwise.
output {
  if [@metadata][use_data_stream] {
    elasticsearch {
      hosts => ["https://data01tst:9200", "https://data02tst:9200", "https://data03tst:9200", "https://data04tst:9200"]
      ssl_enabled => true
      ssl_certificate_authorities => '/etc/logstash/certs/ca.crt'
      user => "${ES_USER}"
      password => "${ES_PASS}"

      data_stream => true
      # NOTE(review): per the elasticsearch output plugin docs this option
      # defaults to true, so the line documents intent rather than changing
      # behavior — the functional fix is the replace in the filter above.
      # Confirm the default for the plugin version in use.
      data_stream_auto_routing => true # critical for index rerouting
    }
  }
  else {
    elasticsearch {
      hosts => ["https://data01tst:9200", "https://data02tst:9200", "https://data03tst:9200", "https://data04tst:9200"]
      ssl_enabled => true
      ssl_certificate_authorities => '/etc/logstash/certs/ca.crt'
      user => "${ES_USER}"
      password => "${ES_PASS}"
      ilm_enabled => false
      # NOTE(review): same caveat as the original — an unset [@metadata][_id]
      # yields the literal sprintf string as document id.
      document_id => "%{[@metadata][_id]}"
      index => "%{[@metadata][index_prefix]}"
    }
  }
}