Cannot get the name of indices using Logstash

Hello everyone,

How can I get the name of indices using Logstash?
I have these indices, which come from a data stream (backing indices, if I'm correct):

.ds-my-neoada-stream-2023.09.14-000001
.ds-my-neoada-stream-2023.09.14-000002
.ds-my-neoada-stream-2023.09.14-000004
.ds-my-neoada-stream-2023.09.14-000005

I tried using this syntax:

index => ".ds-my-neoada-stream-%{+yyyy.MM.dd}"

but what happened was that it created a new index:

.ds-my-neoada-stream-2023.09.20

This is not what I want to happen. What I want is to update my latest indices. I am using the fingerprint filter:


  fingerprint {
    key => "1234ABCD"
    method => "SHA256"
    source => ["ticket_id", "tick_account" , "tick_teampips" , "tick_current", "tick_stages", "ticket_xagent"]
    target => "[@metadata][_id]"
    concatenate_sources => true
  }

I also added action => create, document_id => "[@metadata][generated_id]", and doc_as_upsert => true:

elasticsearch {
      hosts => ["myIpAddress"]
      user => "myuser"
      password => "mypassword!"
      cacert => "/etc/elasticsearch/certs/http_ca.crt"
      ssl => true
      ssl_certificate_verification => false
      document_id => "%{[@metadata][_id]}"
      doc_as_upsert => true
      index => ".ds-my-neoada-stream-%{+yyyy.MM.dd}"
#      template_overwrite => true
      action => "update"
}

My purpose for this is to prevent duplication of my logs.

Any solution for this?

Regards

I don't think you can do what you want with Logstash.

You seem to be using data streams, and data streams are append-only, meaning that you cannot update them with Logstash.

You will need to use normal indices without rollover to achieve this.
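As a rough sketch (the index name is just an example, and the fingerprint target is taken from your filter above), the output would write to a plain index and use the fingerprint as the document id, so reprocessing the same event updates the existing document instead of appending a new one:

elasticsearch {
  hosts => ["myIpAddress"]
  user => "myuser"
  password => "mypassword!"
  cacert => "/etc/elasticsearch/certs/http_ca.crt"
  ssl => true
  # a regular index, not a data stream
  data_stream => false
  index => "my-neoada"
  # the fingerprint becomes the document id, so duplicates overwrite instead of piling up
  document_id => "%{[@metadata][_id]}"
  action => "update"
  doc_as_upsert => true
}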


Is there any other solution for this, other than changing my index setup?
I have this approach in mind: if a document with the same ID already exists, don't create a new document, just update the existing fields.
Is this approach possible? If yes, how?

Here is a sample log. If this document exists, I only want to update fields like "ticket_xstat": "Open".
But when the user updates the ticket to closed ("ticket_xstat": "Close"),
it creates a new document again.

{
  "_index": ".ds-my-neoada-stream-2023.09.14-000005",
  "_id": "0txktYoB0eBbzdbC3gSJ",
  "_version": 1,
  "_score": 0,
  "_source": {
    "ticket_zdesc": "I cannot access the shared drives",
    "agent": {
      "name": "Neoada-Server",
      "id": "37d5bd3a-55c8-4b17-8934-6544ecee3353",
      "type": "filebeat",
      "ephemeral_id": "a83bd7a2-69b7-48a5-9674-fa38d43da5af",
      "version": "8.9.0"
    },
    "ecs": {
      "version": "8.0.0"
    },
    "ticket_xagent": "IT Helpdesk",
    "ticket_xdate": "2023-09-21 09:35:31",
    "ticket_xstat": "Open",

}

Best regards,
CRUZ

No, not really. Have a look at this thread, which discusses a quite similar problem.


There is not. If you want to keep updating documents you cannot use data streams or rollovers; you need to use normal indices.

You can still use ILM to move your data between data tiers, if you have different data tiers, and to delete your indices after some time, but your use case won't work with data streams or rollovers, because you would need Logstash to write directly to the backing indices.
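For example, a delete-only ILM policy with no rollover action still works on a plain index; something along these lines (the policy name and retention period are only placeholders):

PUT _ilm/policy/delete-after-90-days
{
  "policy": {
    "phases": {
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}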


Thank you so much @leandrojmp and @Christian_Dahlqvist for taking the time to answer my concern and for sharing the information.

@Christian_Dahlqvist, your blog about duplicate prevention was a great help with my question.

To solve my problem, I did what you guys suggested.

LOGSTASH CONFIG

input {
    pipeline {
        address => neoadaLog
    }
    elasticsearch {
      hosts => ["myIpAddress"]
      user => "myuser"
      password => "mypassword!"
      ssl => true
      ssl_certificate_verification => false

      index => "neoada-alias"
      query => '{ "sort": [ "_doc" ] }'
    }
    
} # <--- END OF INPUT --->

filter {

  if [log][file][path] =~ /\/opt\/lampp\/htdocs\/ada_v2\/storage\/app\/.+\.log/ {

    grok {
      match => { "message" => ["%{TIMESTAMP_ISO8601:[@metadata][timestamp]}%{SPACE}\(%{IP:ip}\)-%{DATA:log}: %{GREEDYDATA:[@metadata][logmsg]}"] }
      overwrite => [ "[@metadata][timestamp]" , "[@metadata][logmsg]" ]
    }

    json { source => "[@metadata][logmsg]" }

    date {
      match => ["[@metadata][timestamp]", "yyyy-MM-dd HH:mm:ss"]
      timezone => "Asia/Manila"
      target => "@timestamp"
    }

    mutate {
      remove_field => ["created_at" , "is_viber" , "assignment_id" , "budget_category", "location_id"]
    }

    fingerprint {
      key => "1234ABCD"
      method => "SHA256"
      source => ["ticket_id"]
      target => "[generated_id]"
      concatenate_sources => true
    }

  }
 
  
} # <--- END OF FILTER --->

output { 
  if [@metadata][pipeline] {
    elasticsearch {
      hosts => ["myIpAddress"]
      user => "admin"
      password => "Lastikman2023!"
      cacert => "/etc/elasticsearch/certs/http_ca.crt"
      ssl => true
      ssl_certificate_verification => false
    
      #INDEX TEMPLATE
      document_id => "%{[generated_id]}"
      pipeline => "%{[@metadata][pipeline]}"
      template => "/etc/logstash/template/neoada-prod.template.json"
      template_name => "neoada-test"
      index => "neoada-alias"
      data_stream => false

      doc_as_upsert => true
      action => "update"
    }
  }
  else {
    elasticsearch {
      hosts => ["myIpAddress"]
      user => "myuser"
      password => "mypassword!"
      cacert => "/etc/elasticsearch/certs/http_ca.crt"
      ssl => true
      ssl_certificate_verification => false
      

      #INDEX TEMPLATE
      document_id => "%{[generated_id]}"
      template => "/etc/logstash/template/neoada-prod.template.json"
      template_name => "neoada-test"
      index => "neoada-alias"
      data_stream => false

      doc_as_upsert => true
      action => "update"
    }
  }

  stdout {
    codec => rubydebug
  }
  file {
    codec => json
    path => "/var/log/logstash/neoada.log-%{+YYYY-MM-dd}.txt"
  }
} # <--- END OF OUTPUT --->

I created a new index template and put my rollover alias in the settings.

PUT _index_template/index_template
{
  "index_patterns": ["my-index-stream-*"],                 
  "template": {
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 0,
      "index.lifecycle.name": "new-lifecycle-policy",      
      "index.lifecycle.rollover_alias": "indexseries"    
    }
  }
}
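The new-lifecycle-policy referenced in the template is not shown in the post; a minimal hot-phase policy with a rollover action along these lines would be the usual companion (the thresholds are only placeholders):

PUT _ilm/policy/new-lifecycle-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_age": "30d",
            "max_primary_shard_size": "50gb"
          }
        }
      }
    }
  }
}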

I also created a new index and set my rollover alias on it as the write index:

PUT %3Cneoada-stream-%7Bnow%2Fd%7D-000001%3E
{
  "aliases": {
    "neoada-alias": {
      "is_write_index": true
    }
  }
}
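To verify which backing index the alias currently points to as the write index (and to force a rollover manually while testing), these requests should be enough:

GET _alias/neoada-alias

POST neoada-alias/_rollover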

That's it, I did it with a rollover and the duplicates disappeared.

Thank you guys.

Best regards!

Keep in mind that this will not always work.

Rollover is similar to data streams in that it also uses backing indices, so if you receive a document with an id that was already indexed, but the backing index it lives in is no longer the write index, it will be duplicated.

For example, say today you receive a document with the id 1234 and tomorrow your index rolls over. If you then receive another document with the same id, the write index will be a different backing index and the document will be duplicated.
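A sketch of that scenario in the console, using the alias from the posts above (the manual _rollover call only forces what ILM would eventually do; the update/upsert Logstash sends via the alias behaves the same way, because it only looks at the current write index):

# goes into the current write index
PUT neoada-alias/_doc/1234
{ "ticket_xstat": "Open" }

# the alias rolls over to a new backing index
POST neoada-alias/_rollover

# the same id now lands in the new write index: two documents with id 1234
PUT neoada-alias/_doc/1234
{ "ticket_xstat": "Close" }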

