If your table contains all the old entries (and isn't extremely large), you could query all the jobs and add a condition to your output based on the 'STATUS' field: if it isn't 'RUNNING', you use action => "delete"; otherwise you create/update the document.
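A rough sketch of that output part could look like this (I'm assuming fields called JOB and STATUS; adjust them to whatever actually ends up in your events):

output {
  if [STATUS] != "RUNNING" {
    # job is finished/failed/aborted -> remove its document
    elasticsearch {
      # your connection settings
      document_id => "%{JOB}"
      action => "delete"
    }
  } else {
    # job is still running -> create/update its document
    elasticsearch {
      # your connection settings
      document_id => "%{JOB}"
    }
  }
}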
If the table does not contain all the data, you could create a pipeline that loads all the Elasticsearch documents from your index, checks whether each job is still running (jdbc_streaming: search for the ID with status = "running") and deletes it if it isn't (= no jdbc result). (And you'd still need an additional pipeline to create/update the active jobs.)
I don't think you can just delete a whole index with Logstash. You could maybe index into a new index (jobs_running_XXX, based on the timestamp in some way) and use Curator to switch an alias (named 'jobs_running' and always pointing to the latest index) and delete the old indices. But that won't be 100% 'live', I guess.
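(The alias switch itself would just be one call against the Elasticsearch API; the index names here are only made-up examples:

POST /_aliases
{
  "actions": [
    { "add":    { "index": "jobs_running_2019.05.20", "alias": "jobs_running" } },
    { "remove": { "index": "jobs_running_2019.05.19", "alias": "jobs_running" } }
  ]
}

Curator would then take care of deleting the old indices.)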
Jenni,
Thanks for the reply. Basically, a job monitoring agent changes the status when a job completes/fails/is aborted etc.
I would just like to get the jobs with status=running and use that data for visualization.
The table is huge, so I can't scan the whole thing every time.
I tested scanning the whole table: it takes a couple of hours the first time, and after that it works fine because I do job > :sql_last_value.
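Roughly, my jdbc input looks like this (simplified, connection settings left out; the exact option names in my config may differ slightly):

input {
  jdbc {
    # connection settings left out
    statement => "select JOB, STATUS from JOBS where JOB > :sql_last_value"
    # track the numeric job column instead of the run timestamp
    use_column_value => true
    tracking_column => "job"
    tracking_column_type => "numeric"
    schedule => "* * * * *"
  }
}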
But I can't figure out how to scan only for status=running and, when the status changes, drop that record from Elasticsearch.
So far I have understood that Logstash can't delete a record but can update one. How do I get that done? I haven't figured that out yet either.
Can I run a curl command every minute, before Logstash executes, that deletes the whole index, so that Logstash then recreates it automatically?
To be honest, it sounds like you should inform yourself a bit more about the basics of Elasticsearch and Logstash: Importing the exact same data from a different source won't change anything about your output. It's still the same data and the same output configuration, after all. And Logstash's ES output isn't completely unable to delete; it just deletes documents, not indices. That's why I mentioned the action option.
Back to your main problem: I don't think I understand your setup completely. It sounds like 'job' is an auto-increment ID and you're only importing new jobs. But if you do a partial import of your data and delete and recreate your whole index every time, it will never contain all running jobs, will it? If there are so many running jobs that you cannot scan all the entries*, it would be great to have a timestamp for the modification date (the last status change). Then you could import just the changed jobs and basically use my first suggestion to solve this.
*(If there aren't that many running jobs but the query still takes very long, you might be missing an index on that column.)
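For illustration, if there were such a column (let's hypothetically call it LAST_MODIFIED), the import could track it instead of the job ID and be combined with the delete/update output from my first suggestion:

input {
  jdbc {
    # connection settings omitted
    # LAST_MODIFIED is a hypothetical 'last status change' timestamp column
    statement => "select JOB, STATUS from JOBS where LAST_MODIFIED > :sql_last_value"
    use_column_value => true
    tracking_column => "last_modified"
    tracking_column_type => "timestamp"
    schedule => "* * * * *"
  }
}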
Jenni,
Thanks, and sorry for the confusion. Let me try again.
Job is a unique ID that gets incremented. Throughout the datacenter, users submit jobs, and each job gets a unique ID, which is 'job'.
status=running when a job starts running. When it completes, the status becomes 'success'.
I am only interested in data with status=running.
Let's say three jobs are running: #1, #2, #3.
After five minutes, #1 and #2 are completed and #4, #5, #6 and #7 have started,
i.e. now I am only interested in #3 to #7, which have status=running.
When I run the query, the database will give me only those five rows (#3 to #7), as jobs #1 and #2 are completed.
But with my setup all jobs stay there: Logstash pulls the data properly, but now ELK has 7 records in total.
But if you only import new jobs by filtering with
job > :sql_last_value and you delete your index every time, you lose job 3. It's not in the index and not in the query. And you said that importing all running jobs at once without that condition would take too long...
If the number of running jobs is manageable, try my second suggestion. I think that should work, and searching for specific entries based on their ID should be fast enough.
If the table does not contain all the data, you could create a pipeline that loads all the Elasticsearch documents from your index, checks whether each job is still running (jdbc_streaming: search for the ID with status = "running") and deletes it if it isn't (= no jdbc result). (And you'd still need an additional pipeline to create/update the active jobs.)
The basic idea would be to create the documents in the same way as you already do and then check them on a regular basis with an additional pipeline – something like this:
input {
  elasticsearch {
    # all the settings that you need to load your existing jobs from the index
    ...
  }
}
filter {
  jdbc_streaming {
    # search for the database entry of the current document with status 'running'
    ...
    statement => "select JOB from JOBS where JOB = :job and STATUS='running'"
    target => 'found_job'
    parameters => {
      "job" => "JOB"
    }
  }
}
output {
  if !("" in [found_job][0]) {
    # There is no result, so the job either does not exist in the table at all or at least does not have the status 'running'
    elasticsearch {
      # All the settings for your ES connection
      ...
      document_id => "%{JOB}"
      action => "delete"
    }
  }
}
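(You would run this cleanup pipeline on a regular basis, e.g. via the schedule option of the elasticsearch input, in parallel to the pipeline that creates/updates the running jobs.)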
filter {
  jdbc {
    statement => "select JOB, STATUS from JOBS where STATUS='running'"
    target => 'found_job'
    parameters => {
      "job" => "JOB"
    }
  }
}
output {
  if !("" in [found_job][0]) {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "running_jobs"
      document_id => "%{JOB}"
    }
    action => "delete"
  }
  stdout { codec => rubydebug }
}
It fails with a syntax error. What am I missing? It looks like something in the if statement is not right.
This is from the log:
exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, { at line 35, column 15 (byte 1165) after output {\n if !("" in [found_job][0]) {\n elasticsearch {\n hosts => ["localhost:9200"]\n\t index => "running_jobs" \n document_id => "%{job}"\n\t}\n action ",
filter {
  jdbc_streaming {
    statement => "select JOB, STATUS from JOBS where STATUS='running' JOB > :sql_last_value"
    target => 'found_job'
    parameters => {
      "job" => "JOB"
    }
  }
}
output {
  if !(" " in [found_job][0]) {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "running_jobs"
      document_id => "%{JOB}"
      action => "delete"
    }
    stdout { id => "%{JOB}" }
  }
}
What does target => 'found_job' do? Is it like an array that saves the output of the select statement?
What does the if statement do?
If no record is found, then delete?
filter {
  jdbc_streaming {
    ### connection settings removed from here
    statement => "select JOB from JOBS where STATUS='running'"
    target => 'found_job'
    parameters => {
      "job" => "JOB"
    }
  }
}
output {
  if !(" " in [found_job][0]) {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "running_jobs"
      document_id => "%{job}"
      action => "delete"
    }
  }
}
Let's say I have 10 jobs in the running_jobs index (which gets its input from elasticsearch).
jdbc_streaming runs and gets 8 jobs as 'running' once it connects to the database, because two jobs have completed and their status is now 'completed'.
I want to delete those two jobs from the Elasticsearch index.
It is not working.
What am I doing wrong?
How does this if !("" in [found_job][0]) work?
What does target => 'found_job' save?
How do I debug this? How can I list what is being saved in found_job[0]?
I have four documents in the index, and their status is 'running'.
Let's say that in 30 minutes the status of the first two jobs becomes 'complete' and two more jobs start,
i.e. I want to delete job=1 and job=2
and insert two documents, job=5 and job=6.
job is the document_id, which is unique and gets incremented in the database.
Here is my config file, but it does not work: all documents just get deleted.
I want to compare the job field from ELK to the job field from jdbc_streaming and, if it is not found, delete it, as I am only querying running jobs. That means everything else can go.