Syncing Tables from Microsoft SQL Server to Elastic using Logstash

I'm trying to sync my SQL Server database to my Elasticsearch database. I've got the following input set up now:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/drivers/mssql-jdbc.jre8.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "${JDBC_CONNECTION_STRING}"
    jdbc_password => "${JDBC_PASSWORD}"
    jdbc_user => "${JDBC_USER}"
    schedule => "*/${SYNC_JOB_INTERVAL} * * * * *"
    statement => "SELECT TOP ${SYNC_JOB_BATCH_SIZE} Id, Title, Details as Description, COALESCE(Modified, Created) as Modified, Created, DATEDIFF(s, '1970-01-01 00:00:00', COALESCE(Modified, Created)) AS unix_ts_in_secs 
                  FROM Item
                  WHERE DATEDIFF(s, '1970-01-01 00:00:00', COALESCE(Modified, Created)) > :sql_last_value 
                  ORDER BY COALESCE(Modified, Created) ASC"
    use_column_value => true
    tracking_column => "unix_ts_in_secs"
    tracking_column_type => "numeric"
    record_last_run => true
    clean_run => false
    last_run_metadata_path => "/usr/share/logstash/data/ticket_last_run.yml"
  }
}

I store the last run metadata and use a value calculated with the DATEDIFF function. Using this I'm able to keep track of progress. The downside, however, is that whenever something later in the pipeline fails, such as a filter, this value is still stored.
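
For reference, the last_run_metadata_path file just ends up holding the tracked value serialized as YAML, something like this (the number is only illustrative):

--- 1700000000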

Is there a cleaner way of handling syncing?

Hello Melvin,

What you are doing so far is correct. I think what you need is to enable dead-letter-queues:

The dead letter queue (DLQ) is designed as a place to temporarily write events that cannot be processed. The DLQ gives you flexibility to investigate problematic events without blocking the pipeline or losing the events.

So in your example, whenever a filter fails the docs would be stored in the DLQ. Then, you can fix the pipeline and use the dead_letter_queue input to read the failed events and process them.
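
A minimal sketch of what the recovery pipeline's input could look like (this assumes the DLQ has been enabled in logstash.yml and that it lives in the default dead_letter_queue directory under path.data, so adapt the path to your setup):

input {
  dead_letter_queue {
    # requires dead_letter_queue.enable: true in logstash.yml
    path => "/usr/share/logstash/data/dead_letter_queue"
    commit_offsets => true
    pipeline_id => "main"
  }
}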

Best regards
Wolfram

Thanks for the response! That sounds promising! I did read this on the documentation page:

The dead letter queue is currently supported only for the Elasticsearch output and conditional statements evaluation. The dead letter queue is used for documents with response codes of 400 or 404, both of which indicate an event that cannot be retried. It’s also used when a conditional evaluation encounters an error.

I'm not entirely sure if this means it only fires on a failure in the output plugin, or whether a failed filter also puts the event into this queue. For clarity: I'm using an HTTP filter plugin to send the data to an API, which then returns the same data but enriched. I want to ensure that if something goes wrong with this setup, the data is not lost.

Are you aware if this queue would solve this issue?

Would this secondary pipeline have to be entirely separate from the current pipeline, or could I just add the queue as a secondary input? What would you recommend?

Hmmm, I am not sure, but it sounds as if this wouldn't fit your use case then.

Depending on your use case, you could try a few things:

  • ingest your other data to Elasticsearch and use an enrich processor with the failure store enabled
  • use a ruby filter to get the data, because in Ruby you can use begin-rescue structures (see the sketch below)
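
Roughly something like this for the ruby filter option (just a rough sketch; the endpoint, field names and tag are placeholders you would need to adapt to your API):

filter {
  ruby {
    code => '
      require "net/http"
      require "json"
      begin
        # placeholder endpoint; replace with your enrichment API
        uri = URI(ENV["API_HOST"])
        payload = {
          "Id" => event.get("id"),
          "Title" => event.get("title"),
          "Description" => event.get("description")
        }.to_json
        response = Net::HTTP.post(uri, payload, "Content-Type" => "application/json")
        raise "unexpected response #{response.code}" unless response.code == "200"
        event.set("embedding_response", JSON.parse(response.body))
      rescue => e
        # keep the event and mark it instead of losing it
        event.tag("enrichment_failed")
        event.set("[@metadata][enrichment_error]", e.message)
      end
    '
  }
}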

At least the failure store in Elasticsearch definitely catches ingest pipeline failures (I have already tried it out myself):

A failure store is a secondary set of indices inside a data stream, dedicated to storing failed documents. A failed document is any document that, without the failure store enabled, would cause an ingest pipeline exception or that has a structure that conflicts with a data stream's mappings.

Thanks for the response!

For ingesting with the failure store: from my limited understanding of the documentation, this seems to apply only to data streams / time-series data. My data consists of various kinds of documents, such as support tickets and change requests. So I'm guessing the ruby filter would be the preferred option.

I'll give that a try in the near future and see if it's a suitable solution. I'll let you know!

The DLQ only makes sense when you have events that are not indexed because Logstash received a 400 or 404 from Elasticsearch; it has nothing to do with filter errors.

I don't think it will help you in this case.

What could go wrong, and when it goes wrong, can you identify it in the returned event? You will probably need to redirect the events that went wrong to another destination, be it a different index or even a file, so you can analyze and reprocess them.
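
For example, assuming the failed events carry a tag (the tag name and file path here are only placeholders):

output {
  if "enrichment_failed" in [tags] {
    file {
      # keep failed events on disk so they can be inspected and replayed later
      path => "/usr/share/logstash/data/failed_events.ndjson"
      codec => json_lines
    }
  } else {
    elasticsearch {
      # normal destination for events that were enriched successfully
    }
  }
}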

I'm not sure what the issue is here; the last run value is populated before any filter runs, as it comes from the input. Is your pipeline breaking, so that you need to reprocess the data?

Hello,

Ok so I'll rule out the DLQ!

I have an HTTP filter which points to an API. During development my machine can sometimes have trouble keeping up with the requests, and the request to the API gets lost (I'm not entirely sure what the reason is). When this happens, the input's tracking column is still updated, so it will not refetch the same data again.

So I'm looking for a way to handle this issue.

So would, for example, adding a tag to the event such as "http_filter_failed" be a sufficient way of handling this, where I have a conditional in the output that sends failed events to a separate index?

I'll add the filter config below for more clarification:

filter {
  http {
    connect_timeout => 30
    request_timeout => 60
    url => "${API_HOST}"
    verb => "POST"
    body_format => "json"
    body => {
      Id => "%{id}"
      Title => "%{title}"
      Description => "%{description}"
      Created => "%{created}"
      Modified => "%{modified}"
    }

    target_body => "embedding_response"
  }
  
  # This was the temp fix (this would have to be removed, of course)
  if ![embedding_response] {
    drop { }
  }

  if [embedding_response] {
    mutate {
      replace => {
        "title" => "%{[embedding_response][Title]}"
        "description" => "%{[embedding_response][Description]}"
      }

      copy => {
        "[embedding_response][titleEmbedding]" => "title_embedding"
        "[embedding_response][descriptionEmbedding]" => "description_embedding"
      }

      remove_field => [ "embedding_response" ]
    }
  }
}
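
So instead of the drop, I imagine replacing that temp fix with something like this (the tag name is just an example):

filter {
  # instead of dropping, keep the event and mark it as failed
  if ![embedding_response] {
    mutate {
      add_tag => [ "http_filter_failed" ]
    }
  }
}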

If you can identify that it fails, then adding a tag would be enough.

Then you can change your output to have a conditional based on that tag.

Something like:

output {
    if "tag_failed" in [tags] {
        elasticsearch {
            # destination for events with the failed tag
        }
    } else {
        elasticsearch {
            # destination for events that worked
        }
    }
}

Yes, like Leandro said, you can add a tag as above, or use a [@metadata][fieldname] field, which will not be sent to ES. @metadata is just the LS runtime variable/field.
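
For example (the field name is arbitrary):

filter {
  if ![embedding_response] {
    mutate {
      # fields under @metadata are available in the pipeline but are never sent to Elasticsearch
      add_field => { "[@metadata][enrichment_status]" => "failed" }
    }
  }
}

output {
  if [@metadata][enrichment_status] == "failed" {
    elasticsearch {
      # destination for events that need to be inspected / reprocessed
    }
  } else {
    elasticsearch {
      # normal destination
    }
  }
}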

According to the documentation, there is no error handling in the http filter, but you can test it or check GitHub.