A question on anomaly detection

So we have succeeded in putting in place a process which can ingest one-line CSV logs. Lots of them. Now the next stage is to add some sort of anomaly detection. However, here is the problem I am facing.

I am not a domain expert, and the person who is going to be the end user says that "all" values in the CSV are important to him. And the CSV has around 75+ columns.

I cannot see myself making 75+ population analyses based on serial number and one field each.

Is there an option to monitor all the fields in an event for a given serial number?

A sample of the data I have:

{
  "_index": "logstash-starkindustries-2021.01",
  "_type": "_doc",
  "_id": "fgibhjdgsfib356",
  "_version": 1,
  "_score": null,
  "_source": {
    "serial": 45689457890,
    "sensor_A": 76.5,
    "sensor_B": 65.9,
    "sensor_C": 608,
    "sensor_D": 200,
    "sensor_E": 20,
    "sensor_F": 65,
    ....
    ....
    ....
    ....
    "Result": "Pass",
    "Time_Taken": 56
}

Each serial number has exactly one CSV log of one line, and hence only one event in the index against it.

Now I have created a population analysis based on Time_Taken, but there are so many other fields of interest, and any one of them showing something out of the ordinary can be very valuable.

The processing is done in Logstash, so if there is any suggestion to change the way I am ingesting data from the CSV, I will be happy to implement it.

There are a couple of ways you could handle this situation with the functionality the stack provides.

If you think all the sensor values are independent and want to compare all sensor_A values to one another, all sensor_B values to one another, and so on, then transform the data in the index into the format:

{
  "serial": 45689457890,
  "sensor_type": "sensor_A",
  "value": 76.5
}

Then you can create an anomaly detection job using a partition field. The config would be along the lines of:

{
  ...
  "analysis_config": {
    "detectors": [
      {
        "function": "mean",
        "field_name": "value",
        "partition_field_name": "sensor_type"
      }
    ]
  },
  ...
}

This can be configured from the advanced job creation wizard. Remember to select sensor_type as an influencer. This creates one model for each sensor and tells you about unusual values for a sensor versus other values for the same sensor. Another thing to remember is that we compute the detector function (in this example, the mean) over all the sensor values which land in the same time interval (determined by the job's bucket span), so we are comparing the mean of all sensor_A values in, say, a 5-minute interval to the means of sensor_A in other 5-minute intervals.
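
For illustration, here is a minimal Python sketch of that reshape, done outside Logstash; the field names (serial, sensor_type, value), the set of non-sensor columns and the file name are assumptions based on the example above rather than a definitive implementation:

import csv

# Columns which identify the row or which should be kept out of the per-sensor
# documents (illustrative; adjust to the real CSV header).
NON_SENSOR_FIELDS = {'serial', 'Result', 'Time_Taken'}

def expand_row(row):
    # Turn one wide CSV row (a dict keyed by header) into one narrow doc per sensor column.
    for name, value in row.items():
        if name in NON_SENSOR_FIELDS:
            continue
        yield {
            'serial': row['serial'],
            'sensor_type': name,
            'value': float(value),
        }

with open('logs.csv', newline='') as f:  # hypothetical file name
    for row in csv.DictReader(f):
        for doc in expand_row(row):
            print(doc)  # in practice, feed these to a bulk indexing helper

If you would rather keep the transform inside the pipeline, the same logic should be expressible in Logstash using its ruby and split filters.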

Another way we have of analysing data like this in the stack is outlier detection. This treats all the sensor values for a single serial number as a vector and finds vectors of sensor values which are far from the other vectors of sensor values in the collection. This might be more suitable if you think the sensor values for the same serial number are strongly related, or if you don't want to aggregate sensor values for different serial numbers. However, there are a couple of things to be aware of. First, this is currently a single-shot analysis, so you need to trigger it to run on one collection of documents at a time; if the full data set is large, you might well need to randomly partition it into batches and process them one at a time. Second, the results depend on the distance between all the sensor values (we standardise the data so that different sensor values are put into the same range), so outliers depend on the correlation present between the values of individual sensors. This means that for outliers, individual sensor values might not be outside the normal range for just that sensor, but be unusual because of how they relate to the other sensor values for the same serial number.
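
For reference, here is a minimal sketch of how such an outlier detection job could be created and started with the Python Elasticsearch client's data frame analytics API; the job id, index names and excluded fields are assumptions:

from elasticsearch import Elasticsearch

es = Elasticsearch()  # assumes a local cluster

# Hypothetical job id and index names; one wide document per serial number is analysed.
es.ml.put_data_frame_analytics(
    id='sensor-outliers',
    body={
        'source': {'index': 'sensor-logs'},
        'dest': {'index': 'sensor-logs-outliers'},  # results are written here with an outlier score
        'analysis': {'outlier_detection': {}},
        'analyzed_fields': {'excludes': ['serial', 'Result']},  # analyse only the numeric sensor fields
    },
)
es.ml.start_data_frame_analytics(id='sensor-outliers')

Each run scores the documents in the source index once, which matches the single-shot nature of the analysis described above.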


I think this is the way I will go. I saw last night that there are 150+ fields in the CSV, so there will be an explosion in the number of documents in the index if I change the ingest logic in Logstash. Instead of one event per serial number there will be 150+ events.

I am just worried there will be a performance impact on Elasticsearch. There are potentially one million such serial numbers per year.

Elasticsearch can handle this, and indeed much higher event rates; it is just a question of how much hardware you need. However, I would note that this data schema is not necessarily much less performant, because the documents you are indexing are smaller and you have many fewer fields.

I thought I would quickly test this on my laptop. Here I was indexing the MiniBooNE data set, which has around 130,000 rows with 51 columns. For indexing in the expanded format I used the Elasticsearch Python bulk helper, indexing 100,000 documents per batch. For reference, here is the Python I used to do this:

import csv

from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

es = Elasticsearch()  # assumes a cluster running on localhost:9200

def read_rows(files):
    # Expand each CSV row into one document per column: {'name': column, 'value': cell}.
    for file in files:
        print('reading file ...')
        reader = csv.reader(file)
        headers = next(reader, None)
        count = 0
        for row in reader:
            for name, value in zip(headers, row):
                count = count + 1
                if count % 100000 == 0:
                    print('indexed', count, 'documents')
                # args holds the parsed command line arguments (parsing not shown).
                doc = {'_index': args.index_name, 'name': name, 'value': value}
                yield doc

# files is the list of open CSV file handles (setup not shown).
stream = read_rows(files)
for ok, response in streaming_bulk(es, actions=stream, chunk_size=100000):
    if not ok:
        print(response)

The total time to index the file in the expanded format, around 6.6 million documents in total, on my laptop is:

real 4m49.646s
user 2m16.950s
sys  0m5.116s

By comparison, indexing it flat takes:

real 0m38.126s
user 0m8.204s
sys  0m0.577s

But there is likely overhead from creating the docs in Python as well, so this isn't a completely fair comparison. Comparing the index characteristics, it may well be that if you disabled the _source field the expanded version would even be smaller, but as it is, it is only around 60% larger.
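
If you wanted to try that, here is a minimal sketch of creating the expanded index with _source disabled using the Python client; the index name is an assumption, and note that without _source you can no longer retrieve or reindex the original documents:

from elasticsearch import Elasticsearch

es = Elasticsearch()  # assumes a local cluster

# Hypothetical index name; disabling _source trades retrievability of the
# original JSON for a smaller index on disk.
es.indices.create(
    index='sensor-logs-expanded',
    body={'mappings': {'_source': {'enabled': False}}},
)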

Thanks for taking a look at this. I will be going with this approach. I will report back on how it went. Right now I am struggling with the Ruby code to bring about the suggested transform in Logstash.
