Logstash Elasticsearch as input

Hi.

I am using an existing index as input and enriching it with data from SQL Server. This enrichment process needs to run weekly for now. Currently I am loading the past 10 days, but that causes duplicates. I am looking for a way for Logstash to know the last timestamp, so the enrichment process can start from there. Is there a way to have it as a parameter and save it locally?

input {
  elasticsearch {
    hosts => ["https://eq.a.am:9200"]
    user => "logstash_internal"
    password => "pasword"
    index => "log-nnnnn-*"

    query => '{
      "query": {
        "bool": {
          "must": [
            { "exists": { "field": "Properties.LoginName" } },
            { "range": { "Timestamp": { "gte": "now-10d" } } }
          ]
        }
      }
    }'
  }
}
filter {
  jdbc_static {
    loaders => [
      {
        id => "remote-nnnnn-users"
        query => "SELECT username, offices, usertype, firstname, lastname FROM users"
        local_table => "localnnnnnusers"
      }
    ]
    local_db_objects => [
      {
        name => "localnnnnnusers"
        index_columns => ["username"]
        columns => [
          ["username", "varchar(200)"],
          ["Offices", "varchar(200)"],
          ["usertype", "varchar(200)"],
          ["firstname", "varchar(200)"],
          ["lastname", "varchar(200)"]
        ]
      }
    ]
    local_lookups => [
      {
        id => "local-users"
        query => "SELECT offices, usertype, firstname, lastname FROM localnnnnnusers WHERE username = :username"
        parameters => { username => "[Properties][LoginName]" }
        target => "userinformation"
      }
    ]
    staging_directory => "/tmp/logstash/jdbc_static/import_data"
    loader_schedule => "*/30 * * * *"
    jdbc_user => "Auser"
    jdbc_password => "password"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mssql-jdbc-8.2.2.jre8.jar"
    jdbc_connection_string => "jdbc:sqlserver://l.a.am:1433;databaseName=P;integratedSecurity=false;"
  }
}


output {
  elasticsearch {
    hosts => ["https://e.a.am:9200"]
    cacert => "/etc/logstash/certs/e.crt"
    user => "logstash_w"
    password => "pasword"

    index => "log-nnnnnwithusers-%{+YYYY.MM.dd}"
  }
}

There is an open issue for an enhancement to the elasticsearch input to support maintaining state the way the jdbc input does. It does not currently do so.
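For comparison, this is roughly what state tracking looks like in a jdbc input: the query references :sql_last_value and the last seen value is persisted to a local file between runs. This is only a sketch; the table name, updated_at column, and paths here are placeholders, not from your setup. The elasticsearch input has no equivalent of this today.

input {
  jdbc {
    jdbc_connection_string => "jdbc:sqlserver://example.host:1433;databaseName=P;"
    jdbc_user => "Auser"
    jdbc_password => "password"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mssql-jdbc-8.2.2.jre8.jar"
    # Only fetch rows newer than the value saved after the previous run
    statement => "SELECT * FROM users WHERE updated_at > :sql_last_value"
    use_column_value => true
    tracking_column => "updated_at"
    tracking_column_type => "timestamp"
    # The last tracked value is written here between pipeline runs
    last_run_metadata_path => "/usr/share/logstash/.users_jdbc_last_run"
    schedule => "0 3 * * 1"
  }
}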

If you can identify fields that uniquely identify a document in the source, then you can use a fingerprint filter to generate an id:

fingerprint {
    method => "SHA256"
    source => [ "someField", "anotherField" ]
    concatenate_sources => true
    target => "[@metadata][id]"
}

and then in the output use

document_id => "%{[@metadata][id]}"

You will still process some events twice, but at least the new document will overwrite the old one instead of creating a duplicate.

Badger,

Thanks for the quick reply. Since the input is an index, I can use the document_id of the original index. Do I need to define that in the fingerprint filter?

I changed the output as shown below, but the output index has the literal string %{[@metadata][_id]} as _id. What did I do wrong?

output {
  elasticsearch {
    hosts => ["https://e.a.am:9200"]
    cacert => "/etc/logstash/certs/e.crt"
    user => "logstash_w"
    password => "pasword"

    document_id => "%{[@metadata][_id]}"
    index => "log-nnnnnwithusers-%{+YYYY.MM.dd}"
  }
}

Thanks in advance

If your documents already have a unique id then you do not need to use a fingerprint filter (the only reason to do so would be if the unique id is extremely long, and a hash would be shorter).

You can pass that unique id to the document_id option using a sprintf reference.
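A minimal sketch of that, assuming the elasticsearch input is configured with docinfo => true so the source document's metadata is copied into the event. Note that the exact @metadata path the id lands in depends on the plugin version (older versions use [@metadata][_id], newer ones default to [@metadata][input][elasticsearch][_id]), so check an event with a stdout { codec => rubydebug { metadata => true } } output first and adjust the sprintf reference accordingly.

input {
  elasticsearch {
    hosts => ["https://eq.a.am:9200"]
    user => "logstash_internal"
    password => "pasword"
    index => "log-nnnnn-*"
    # Copy the source document's _index / _id into the event metadata
    docinfo => true
    # query => '...'  (same bool/range query as in the original input)
  }
}

output {
  elasticsearch {
    hosts => ["https://e.a.am:9200"]
    cacert => "/etc/logstash/certs/e.crt"
    user => "logstash_w"
    password => "pasword"
    # Reuse the source document's id so reprocessed events overwrite instead of duplicating;
    # change this path if your plugin version stores docinfo elsewhere in @metadata
    document_id => "%{[@metadata][_id]}"
    index => "log-nnnnnwithusers-%{+YYYY.MM.dd}"
  }
}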
