Run query and recreate index every run

I have a Logstash config file and would like to run it on a schedule, recreating the index on every run so that it mimics live data.

Basically, when a running job's status changes to completed, it should drop out of the query result, and I don't want to see it in my index anymore.

input {
  jdbc {
    jdbc_validate_connection => true
    jdbc_connection_string => "jdbc:oracle:thin:@hostname:1521/db_name"
    jdbc_user => "user"
    jdbc_password => "password"
    jdbc_driver_library => "/usr/lib/oracle/12.2/client64/lib/ojdbc8.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    statement => "select JOB, STATUS from JOBS where STATUS='running'"
    schedule => "*/1 * * * *"
  }
}

filter {
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "jobs_running"
    document_id => "%{job}"
  }
  stdout { codec => rubydebug }
}

But it is not doing that; it just keeps adding new records to the index.

How do I achieve this?

  • If your table contains all the old entries (and is not super huge), you could query all the jobs and then put a condition in your output based on the 'STATUS' field. If it isn't 'RUNNING', you use action => delete. Otherwise you update/create the entry (see the sketch after this list).
  • If the table does not contain all the data, you could create a pipeline that loads all the elasticsearch documents from your index, checks if the job is still running (jdbc streaming: search for the id and status = "running") and deletes it if it isn't ( = no jdbc result). (And you'd still need an additional pipeline to create/update the active jobs).
  • I don't think that you can just delete the whole index with logstash. You could maybe index into a new index (jobs_running_XXX based on the timestamp in some way) and use curator to change an alias (named 'jobs_running' and always pointing to the latest index) and delete old indices. But that won't be 100% 'live', I guess.
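To make the first option a bit more concrete, here is a rough sketch (just an outline – I'm assuming lowercase column names and status values that look like 'running'): import all the jobs instead of only the running ones, and let the output decide whether to index or delete each document.

input {
  jdbc {
    # ... same connection settings as in your config ...
    statement => "select JOB, STATUS from JOBS"
    schedule => "*/1 * * * *"
  }
}

output {
  if [status] == "running" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "jobs_running"
      document_id => "%{job}"
    }
  } else {
    # the job is no longer running, so remove its document
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "jobs_running"
      document_id => "%{job}"
      action => "delete"
    }
  }
}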

Maybe someone has another idea :slight_smile:

Jenni,
Thanks for the reply. Basically, the job monitoring agent changes the status when a job completes/fails/is aborted etc.
I would just like to get the rows with status=running and use that data for visualization.

The table is huge, so I can't scan the whole thing every time.

I tested scanning the whole table: it takes a couple of hours the first time, and after that it works fine because I filter with job > :sql_last_value.

But I can't figure out how to scan only for status=running and then drop the record from Elasticsearch when the status changes.

So far I understand that Logstash can't delete a record but can update one. How do I get that done? I haven't figured that out yet either.

Can I run a curl command every minute, before Logstash executes, that deletes the whole index so that Logstash recreates it automatically?
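Something like this, I guess (assuming a default local setup and my index name):

# delete the whole index before the next Logstash run (hypothetical wrapper/cron step)
curl -X DELETE "http://localhost:9200/jobs_running"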

Another option is that I could dump this into CSV format and read that.
But does that create a new index every time I read the CSV file?

To be honest, it sounds like you should inform yourself a bit more about the basics of Elasticsearch and Logstash: Importing the exact same data from a different source won't change anything about your output. It's still the same data and the same output configuration after all. And Logstash's ES output isn't completely unable to delete. But it deletes documents, not indices. That's why I mentioned the action option.

Back to your main problem: I think I don't understand your setup completely. It sounds like 'job' is an auto increment ID and you're only importing new jobs. But if you are doing a partial import of your data and delete and recreate your whole index every time, it will never contain all running jobs, will it? But if there are so many running jobs that you cannot scan all the entries*, it would be great to have a timestamp for the modification date (the last status change). Then you could just import all the changed jobs and basically use my first suggestion to solve this.

*(if there aren't that many running jobs, but the query takes very long, you might be missing an index on that column)
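Assuming there were such a modification timestamp column (I'll call it LAST_MODIFIED here, purely as an example), the import could look roughly like this, combined with the conditional index/delete output from my first suggestion:

input {
  jdbc {
    # ... connection settings ...
    # LAST_MODIFIED is a hypothetical column holding the time of the last status change
    statement => "select JOB, STATUS from JOBS where LAST_MODIFIED > :sql_last_value"
    schedule => "*/1 * * * *"
  }
}

With the default settings, :sql_last_value is the time of the previous run, so each run would only pick up the jobs whose status changed since then.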

Jenni,
Thanks, and sorry for the confusion. Let me try again.

JOB is a unique ID that gets incremented. Throughout the datacenter, users submit jobs, and each job gets a unique ID, which is JOB.
The status is 'running' while the job is running; when it completes, it becomes 'success'.

I am only interested in data which has status=running.

Let's say three jobs are running:
#1, #2, #3

After five minutes, #1 and #2 are completed and #4, #5, #6 and #7 have started,

i.e. now I am only interested in #3 to #7, which have status=running.

When I run the query, the database only gives me those five rows (#3 to #7), as jobs #1 and #2 are completed.

But with my setup every job stays there: Logstash pulls the data correctly, but now Elasticsearch has 7 records in total.

But if you only import new jobs by filtering with job > :sql_last_value and you delete your index every time, you lose job #3. It's not in the index and not in the query. And you said that importing all running jobs at once without that condition would take too long...

If the number of running jobs is manageable, try my second suggestion. I think that should work and searching for specific entries based on their ID should be fast enough :slight_smile:

Can you explain a little more about your second suggestion?

  • If the table does not contain all the data, you could create a pipeline that loads all the elasticsearch documents from your index, checks if the job is still running (jdbc streaming: search for the id and status = "running") and deletes it if it isn't ( = no jdbc result). (And you'd still need an additional pipeline to create/update the active jobs).

The basic idea would be to create the documents in the same way as you already do and then check them on a regular basis with an additional pipeline – something like this (jdbc_streaming stores the rows it finds as an array under the target field, so an empty result means that the job is no longer 'running'):

input {
  elasticsearch {
    # all the settings that you need to load your existing jobs from the index
    ...
  }
}
filter {
  jdbc_streaming {
    # search for the database entry of the current document with status 'running'
    ...
    statement => "select JOB from JOBS where JOB = :job and STATUS='running'"
    target => 'found_job'
    parameters => {
     "job" => "JOB"
    }
  }
}
output {
  if !("" in [found_job][0]) {
    # There is no result, so the job does either not exist in the table at all or does at least not have the status 'running'
    elasticsearch {
      # All the settings for your ES connection
      ...
      document_id => "%{JOB}"
      action => "delete"
    }
  }
}

Oh, that makes sense. Let me work on it today. I'll ask for advice if I get stuck and report back if it works.

Thank you very much

input {
  elasticsearch {
    hosts => "localhost"
    index => "running_jobs"
    query => '{ "query": { "match": { "status": running } }, "sort": [ "_doc" ] }'
  }
}

filter {
  jdbc {
    statement => "select JOB, STATUS from JOBS where STATUS='running'"
    target => 'found_job'
    parameters => {
      "job" => "JOB"
    }
  }
}

output {
  if !("" in [found_job][0]) {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "running_jobs"
      document_id => "%{JOB}"
    }
    action => "delete"
  }
  stdout { codec => rubydebug }
}

It fails with a syntax error. What am I missing? It looks like something in the if statement is not right.

This is from the log:
exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, { at line 35, column 15 (byte 1165) after output {\n if !("" in [found_job][0]) {\n elasticsearch {\n hosts => ["localhost:9200"]\n\t index => "running_jobs" \n document_id => "%{job}"\n\t}\n action ",

Your "action" is not inside the ES output because there is a closing '}' before it.
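To spell it out, the action option has to go inside the elasticsearch block, roughly like this:

output {
  if !("" in [found_job][0]) {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "running_jobs"
      document_id => "%{JOB}"
      action => "delete"
    }
  }
  stdout { codec => rubydebug }
}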

Great, it works from the command line. I just had to make a minor change to the query:

query => '{ "query": { "match": { "status": "running" } }, "sort": [ "_doc" ] }'

And in the filter section I had typed jdbc; it should read jdbc_streaming, like you said.

I will just have to confirm whether it actually deletes documents or not.

Thank you very much Jenni

Hi Jenni,
Can you explain how this configuration works? My delete is not working.

input {
  elasticsearch {
    hosts => "localhost"
    index => "running_jobs"
    query => '{ "query": { "match": { "status": "running" } }, "sort": [ "_doc" ] }'
    schedule => "*/2 * * * *"
  }
}

filter {
  jdbc_streaming {
    statement => "select JOB, STATUS from JOBS where STATUS='running' JOB > :sql_last_value"
    target => 'found_job'
    parameters => {
      "job" => "JOB"
    }
  }
}

output {
  if !(" " in [found_job][0]) {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "running_jobs"
      document_id => "%{JOB}"
      action => "delete"
    }
    stdout { id => "%{JOB}" }
  }
}

What does target => 'found_job' do? Is it like an array that saves the output of the select statement?
What does this if statement do?
If no record is found, then delete?

The logic as explained in this thread is correct, but I am missing something. I don't understand exactly how it works, and hence it is not working.

This is my config file:

input {
  elasticsearch {
    hosts => "localhost"
    index => "running_jobs"
    query => '{ "query": { "match_all": {} } }'
    schedule => "*/1 * * * *"
  }
}

filter {
  jdbc_streaming {
    ### connection settings removed from here
    statement => "select JOB from JOBS where STATUS='running'"
    target => 'found_job'
    parameters => {
      "job" => "JOB"
    }
  }
}

output {
  if !(" " in [found_job][0]) {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "running_jobs"
      document_id => "%{job}"
      action => "delete"
    }
  }
}

Let's say I have 10 jobs in the running_jobs index (which this pipeline reads via the elasticsearch input).
jdbc_streaming runs and gets 8 jobs as 'running' once it connects to the database, because two jobs have completed and now have status 'completed'.

I want to delete those two jobs from the Elasticsearch index.

It is not working.
What am I doing wrong?

How does this if !(" " in [found_job][0]) work?

What does target => 'found_job' save?

How do I debug this? How can I list what is being saved in found_job[0]?
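The only thing I can think of is to temporarily dump the whole event with a rubydebug output and look at the found_job field, something like:

output {
  # print every event, including the found_job field, for debugging
  stdout { codec => rubydebug }
}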

Anyone??

Is there anyone who can explain?

I am still having a problem getting this to work.

Here is a simple explanation.

ELK input index = running_jobs

Job=1 Status=running
Job=2 Status=running
Job=3 Status=running
Job=4 Status=running

I have four documents in the index, and their status is running.

Let's say that within 30 minutes the first two jobs complete and two more jobs start,

i.e. I want to delete job=1 and job=2

and insert two documents, job=5 and job=6.

job is the document_id, which is unique and gets incremented in the database.

Here is my config file, but it does not work: all documents just get deleted.

I want to compare the job field from Elasticsearch to the job field from jdbc_streaming and delete the document if it is not found, as I am only querying running jobs. That means everything else can go.

I have spent so much time on this but can't figure it out.

input {
  elasticsearch {
    hosts => "localhost"
    index => "running_jobs"
    query => '{ "query": { "match_all": {} } }'
    schedule => "*/1 * * * *"
  }
}

filter {
  jdbc_streaming {
    jdbc_validate_connection => true
    jdbc_driver_library => "/usr/lib/oracle/12.2/client64/lib/ojdbc8.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    statement => "select JOB, STATUS from JOBS where STATUS='running'"
    target => 'found_job'
    parameters => {
      "job" => "JOB"
    }
  }
}

output {
  if !(" " in [found_job][0]) {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "running_jobs"
      document_id => "%{job}"
      action => "delete"
    }
  } else {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "running_jobs"
      document_id => "%{job}"
    }
  } ##if
}

I am going to bump this one last time.

I have found out that [found_job][0][job] is the job # that I am looking for,
but I'm still not sure how this if works.

Anyone?