Discrepancy with Data Indexing, Processed Records, and Timestamp for ML Job

I'm creating ML jobs both through the Kibana UI and through an ML wrapper API that we developed internally.

Using the wrapper, I first index all the test data and then start the job and the datafeed. However, this does not process all the records. If I use the same data index and create a job through the UI with the same configuration parameters, the whole index is processed.

This image shows the difference in the number of records processed for the same data set. In particular, the last two ML jobs are configured with the same parameters (including the query delay and frequency), but there is still a difference in the number of records processed as well as in the latest timestamp.

The job named "test" was created using the UI and processed all 322 indexed records, but the other jobs, which were created through the wrapper API, only processed partial data.

PS: There is no invalid data either.

Please let me know where the issue is and what a possible solution could be. I'm happy to provide more information as needed.

Thank You
Rohith

Rohith,

In order to help, I think we'll need the following information:

  • The earliest timestamp of the 322 indexed records
  • The latest timestamp of the 322 indexed records
  • The sequence of API calls that you're making
  • The relevant time parameters you're passing when starting the datafeed

thanks

Earliest timestamp = "11/01/2016"
Latest timestamp = "09/18/2017" (today)

Sequence of API calls: creating the index with its mapping -> indexing the data into Elasticsearch using the bulk API -> creating the job -> opening the job -> creating the datafeed -> starting the datafeed (running in real time) -> retrieving the results periodically.

All the calls are automated using a scheduler, typically running once a day, but this will go down to the minute level soon.
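For reference, this is roughly what that sequence looks like as plain curl calls. The index name and type follow the pattern in the config below, but the mapping, detector, and sample document here are simplified placeholders (not our real ones), and security options are omitted:

#!/bin/bash
# Rough sketch of the wrapper's call sequence; bodies are simplified placeholders.
HOST='localhost'
PORT=9200
JOB_ID="59c0391bc5aab63997650d85"
INDEX="i${JOB_ID}"
ML="http://${HOST}:${PORT}/_xpack/ml"

# 1. Create the index and its mapping (placeholder mapping)
curl -s -X PUT -H 'Content-Type: application/json' "http://${HOST}:${PORT}/${INDEX}" -d '{
  "mappings": { "'"${JOB_ID}"'": { "properties": { "@timestamp": { "type": "date" }, "value": { "type": "double" } } } }
}'

# 2. Bulk-index the data (placeholder document)
curl -s -X POST -H 'Content-Type: application/x-ndjson' "http://${HOST}:${PORT}/_bulk" --data-binary '{"index":{"_index":"'"${INDEX}"'","_type":"'"${JOB_ID}"'"}}
{"@timestamp":"2016-11-01T00:00:00Z","value":42}
'

# 3. Create the job (placeholder detector)
curl -s -X PUT -H 'Content-Type: application/json' "${ML}/anomaly_detectors/${JOB_ID}" -d '{
  "analysis_config": { "bucket_span": "300s", "detectors": [{ "function": "mean", "field_name": "value" }] },
  "data_description": { "time_field": "@timestamp" }
}'

# 4. Open the job
curl -s -X POST "${ML}/anomaly_detectors/${JOB_ID}/_open"

# 5. Create the datafeed
curl -s -X PUT -H 'Content-Type: application/json' "${ML}/datafeeds/${JOB_ID}" -d '{
  "job_id": "'"${JOB_ID}"'", "indices": ["'"${INDEX}"'"], "types": ["'"${JOB_ID}"'"],
  "query_delay": "120s", "frequency": "150s", "query": { "match_all": {} }
}'

# 6. Start the datafeed (no start time, running in real time)
curl -s -X POST "${ML}/datafeeds/${JOB_ID}/_start"

# 7. Retrieve the results periodically
curl -s "${ML}/anomaly_detectors/${JOB_ID}/results/buckets"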

Relevant time parameters for datafeed:
analysis_config": {
"bucket_span": "300s"
}
datafeed_config": {
"datafeed_id": "59c0391bc5aab63997650d85",
"job_id": "59c0391bc5aab63997650d85",
"query_delay": "120s",
"frequency": "150s",
"indices": [
"i59c0391bc5aab63997650d85"
],
"types": [
"59c0391bc5aab63997650d85"
],
"query": {
"match_all": {
"boost": 1
}
}

I also tried several values for the query delay and frequency, up to 300s and 3600s respectively, but the problem remains the same. I'm not sure why the records up to the current timestamp are not processed when the job is created using the API.

I need to ask this question...you say:

starting the datafeed (running in real time)

But when you start the datafeed, you can specify a "start" time for the data set:

POST _xpack/ml/datafeeds/datafeed-it-ops-kpi/_start
{
  "start": "2017-04-07T18:22:16Z"
}

I assume that you issue your start command and pass a start time of "11/01/2016" so that all of the historical data will be analyzed?
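For example, something like this, using the datafeed ID from your config above with 11/01/2016 converted to ISO 8601:

POST _xpack/ml/datafeeds/59c0391bc5aab63997650d85/_start
{
  "start": "2016-11-01T00:00:00Z"
}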

I tried that, but it didn't work either. Normally I wasn't passing the start time, so it processes the data from the earliest timestamp. However, for some reason, it wasn't processing up to the current date.

When I start the job from 11/01/2016, it processes the data only up to a certain date: for example, sometimes up to 05/18/2017 and sometimes up to 09/04/2017, but never up to the current date, in contrast with the job created from the UI.

Changing the query delay and frequency didn't help much.

The query_delay parameter is only relevant for real-time operation, not for historical data.

So, I've tried to replicate your situation and cannot. I created two identical jobs, one with the UI and one with the API and I get the same number of records processed, the full 86,274 events in the index:

For your reference, here's the API script that was used to create the API version of the job:

#!/bin/bash

HOST='localhost'
PORT=9200
VER=5.5.0
JOB_ID="farequote_api"
ROOT="http://${HOST}:${PORT}/_xpack/ml"
JOBS="${ROOT}/anomaly_detectors"
DATAFEEDS="${ROOT}/datafeeds"
printf "\n== Script started for... $JOBS/$JOB_ID"

printf "\n\n== Stopping datafeed... "
curl -u elastic:changeme -s -X POST ${DATAFEEDS}/datafeed-${JOB_ID}/_stop

printf "\n\n== Deleting datafeed... "
curl -u elastic:changeme -s -X DELETE ${DATAFEEDS}/datafeed-${JOB_ID}

printf "\n\n== Closing job... "
curl -u elastic:changeme -s -X POST ${JOBS}/${JOB_ID}/_close

printf "\n\n== Deleting job... "
curl -u elastic:changeme -s -X DELETE ${JOBS}/${JOB_ID}

printf "\n\n== Creating job... \n"
curl -u elastic:changeme -s -X PUT -H 'Content-Type: application/json' ${JOBS}/${JOB_ID}?pretty -d '{
    "description" : "Unusual responsetimes by airlines",
    "analysis_config" : {
        "bucket_span": "5m",
        "detectors" :[{"function":"max", "field_name":"responsetime","by_field_name":"airline"}],
        "influencers" : [ "airline" ]
    },
    "data_description" : {
       "time_field":"@timestamp"
    }
}'
printf "\n\n== Creating datafeed... \n"
curl -u elastic:changeme -s -X PUT -H 'Content-Type: application/json' ${DATAFEEDS}/datafeed-${JOB_ID}?pretty -d '{
      "job_id" : "'"$JOB_ID"'",
      "indexes" : [
        "farequote"
      ],
      "types" : [
        "responsetime"
      ],
      "scroll_size" : 1000
}'


printf "\n\n== Opening job for ${JOB_ID}... "
curl -u elastic:changeme -X POST ${JOBS}/${JOB_ID}/_open

printf "\n\n== Starting datafeed-${JOB_ID}... "
curl -u elastic:changeme -X POST "${DATAFEEDS}/datafeed-${JOB_ID}/_start"

printf "\n\n== Finished ==\n\n"

I appreciate your effort, Rich.

I find it unusual as well, because with the same configuration and the same logic, the number of records processed was different each time.

One workaround I found was to restart the datafeed after the initial set of records was processed; then it processes the rest of the records. If 200 records were processed the first time and I restart the datafeed after that, it processes the remaining records and then the job runs in real time. I suspect it is because of a lag in Elasticsearch indexing, i.e. not all data points are searchable before the ML job starts processing, but that shouldn't be the case since the records are very few in number. You might have a better idea here!

I'm also not sure if I have to restart the datafeed every time after indexing new records so that they get processed; I have yet to test this.
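To be clear, by "restart" I mean stopping and then starting the datafeed again, for example via the API, using the datafeed ID from the config above:

POST _xpack/ml/datafeeds/59c0391bc5aab63997650d85/_stop

POST _xpack/ml/datafeeds/59c0391bc5aab63997650d85/_start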

What could be the possible issue? I will be happy to provide more information.

Thanks for your valuable time.

^^ This is the only thing that is different between our setups, then: you ingest the data as part of the workflow, and I don't. I would suggest adding a wait of a few seconds after the _bulk ingest command before starting the ML job's datafeed. If you're running it sequentially in a script, the time between those two events could literally be just milliseconds as you currently have it.
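Something like this, for example, reusing the variables from my script above (data.ndjson is just a placeholder file name, and the sleep length is a guess; adjust as needed):

# bulk-index the data first
curl -u elastic:changeme -s -X POST -H 'Content-Type: application/x-ndjson' \
  "http://${HOST}:${PORT}/_bulk" --data-binary @data.ndjson

# give Elasticsearch a few seconds to refresh so the new documents are searchable
sleep 5

# then start the datafeed
curl -u elastic:changeme -s -X POST "${DATAFEEDS}/datafeed-${JOB_ID}/_start"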

Ah! Adding a wait of a few seconds did the trick. It is processing all the records now. I hope I will be able to run the job in real time, without any need to restart the datafeed, by setting an optimal query delay and frequency.

Thanks for your continued support. I'm grateful!

Rohith
