Trying to understand how to model an index for a specific set of questions

I'm having a hard time figuring out the best way to structure my index(es) to solve this problem, and working through it here will really help me understand some indexing best practices. I have two data streams. The first is patient incidents, let's say falls that have happened in the hospital:

CustomerId
Patient mrn and other demographic data
Incident Data

The second stream is patient admits to the hospital:
CustomerId
Patient mrn and other demographic data
Admit date
Admit Facility
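Here is a minimal sketch of a mapping for the incident stream. It assumes Elasticsearch 5.x, which still requires a mapping type, and uses hypothetical type and field names (`incident`, `customer_id`, `incident_date`). Only `mrn` and the index name come from this thread. The important choice is mapping `mrn` as `keyword`. That way `term`/`terms` queries match the MRN exactly, and a `terms` aggregation can run on it. With default dynamic mapping, a string becomes an analyzed `text` field with an `mrn.keyword` sub-field, and lookups and aggregations would have to target `mrn.keyword` instead.

```json
PUT converge-patient-content
{
  "mappings": {
    "incident": {
      "properties": {
        "customer_id":   { "type": "keyword" },
        "mrn":           { "type": "keyword" },
        "incident_date": { "type": "date" }
      }
    }
  }
}
```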

I want a watch that surfaces a match every time a patient who has previously been involved in an incident is admitted. For simplicity's sake, let's assume that an MRN match is enough to qualify.

How do I best index the data to solve this problem?

The admit stream is much more active (on the order of 100k to millions of records per day), is insert-only, and its data is more transient: once I have looked at an admit, I probably won't use it again, so admits could be purged after a few days. Incident data changes much less frequently, with a few hundred insert or update operations a day, and almost never changes the patient involved. Incidents are persistent and can never be purged (unless the source record is deleted).
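Here is a quick back-of-the-envelope check of what these volumes mean per watch run. It takes 1 million admits per day as an illustrative figure and uses the 30-second interval from the watches later in this thread:

$$\frac{10^{6}\ \text{admits/day}}{86\,400\ \text{s/day} \,/\, 30\ \text{s}} = \frac{10^{6}}{2\,880} \approx 347\ \text{admits per window}$$

So each run only has to look up a few hundred MRNs against the incident data. The per-window count grows proportionally with daily volume, so any hit or bucket limit used to collect the MRNs has to keep up with it.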

In relational models, I can just have 2 tables, and join them on MRN. But I'm having trouble coming up with a schema strategy for ES.

Should I index patients and create 2 types of nested records, one for incident records, and one for Admits? Or is this more of a parent/child relationship?

Or can I just index Incidents and Admits as completely separate types and have my watch look for at least 1 result in each set?

One way to achieve this could be to keep the data in two completely separate time-based indices (as they have different retention periods). Assume you also have an index time on the admission records, so that you can identify all admissions created within a time frame (e.g. the last minute). You could then run a watch that retrieves all recently admitted MRNs and uses a chain input to find which of these also exist in the incidents index. A somewhat similar example can be found in the examples repository on GitHub. As long as each run returns a reasonable number of records, this should perform well and scale.
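The time-based layout suggested above could look roughly like this in 5.x. The sketch assumes one admit index per day named `patient-admit-YYYY.MM.DD`, which is a hypothetical naming scheme. The watches later in this thread query a single `patient-admit` index, so their `indices` value would become `patient-admit-*`. The type name `admit` and the fields other than `mrn` and `timestamp` are also hypothetical. The template gives every daily index the same mapping, including the index-time `timestamp` field the watch filters on. Purging admits older than a few days then becomes a cheap whole-index delete rather than a delete-by-query. In 5.x the pattern key is `template`; 6.0 renamed it to `index_patterns`.

```json
PUT _template/patient-admit
{
  "template": "patient-admit-*",
  "mappings": {
    "admit": {
      "properties": {
        "customer_id":    { "type": "keyword" },
        "mrn":            { "type": "keyword" },
        "admit_date":     { "type": "date" },
        "admit_facility": { "type": "keyword" },
        "timestamp":      { "type": "date" }
      }
    }
  }
}

DELETE patient-admit-2017.06.01
```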


Thanks a ton Christian. I had not heard about Chained Inputs before. Your response puts the problem into a context that I can wrap my head around.

What exactly is this expression saying?

"must": [{
  "terms": {
    "process_host": ["{{#ctx.payload.started_processes.aggregations.process_hosts.buckets}}{{key}}",
                     "{{/ctx.payload.started_processes.aggregations.process_hosts.buckets}}"]
  }
}]

So process_host is matched against the key of any bucket from input 1 (started_processes), but what is {{/ctx.payload.started_processes.aggregations.process_hosts.buckets}}?
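The trick is that Watcher renders the request body as a Mustache template before it is parsed as JSON. `{{#...buckets}}` opens a section that repeats once per element of the `buckets` array, and `{{/...buckets}}` closes that section. The text between the two tags, `{{key}}", "`, is therefore emitted once per bucket. Suppose, hypothetically, that input 1 returned two buckets with keys `host-a` and `host-b`. The clause above would then render as:

```json
"must": [{
  "terms": {
    "process_host": ["host-a", "host-b", ""]
  }
}]
```

The trailing empty string is a harmless side effect of the pattern, because it matches nothing unless some documents really have an empty value. With zero buckets, the array renders as `[""]` and the query simply finds no hits.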

OK, my real issue is not knowing enough about execution contexts. The best reference I have found so far is here:
https://www.elastic.co/guide/en/x-pack/5.4/condition-compare.html#_accessing_values_in_the_execution_context
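For context, after a chain input runs, `ctx.payload` holds each input's full search response under that input's name. The shape below is abbreviated and its values are hypothetical. Two consequences follow from it. First, only text wrapped in `{{ }}` is resolved, and a plain string such as `"ctx.payload..."` is sent to Elasticsearch literally. Second, `buckets` is an array of objects that each carry their own `key` and `doc_count`, so there is no `buckets.keys` path. To use every key, iterate the array with a `{{#...}}...{{/...}}` section. To pick out a single element, the template path can include an array index, e.g. `{{ctx.payload.started_processes.aggregations.process_hosts.buckets.0.key}}`.

```json
{
  "started_processes": {
    "hits": { "total": 4, "hits": [] },
    "aggregations": {
      "process_hosts": {
        "buckets": [
          { "key": "host-a", "doc_count": 3 },
          { "key": "host-b", "doc_count": 1 }
        ]
      }
    }
  }
}
```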

But it's not complete enough for me to really understand how to write my own expressions. What if I just want to match process_host to any bucket key returned by input 1? Something like

"process_host": "ctx.payload.started_processes.aggregations.process_hosts.buckets.process_host"
or maybe
"process_host": "ctx.payload.started_processes.aggregations.process_hosts.buckets.keys"

This is the watch I am working on. Both Indexes/types contain a field mrn. The new-admit query works. I have made a watch using just that as a search input, and get alerts about my new admits. But I can't seem to figure out how to use the newAdmit payload to match to exisitingContent on MRN. This watch throws no errors, but also never matches anything. I know for 100% certainty that the mrn I am indexing to patient-admit matches 3 results in the converge-patient-content index, and can perform that search independantly. The problem must be the term filter of the existingContent filter, or the Compare condition. But I'm not sure what I would change.

{
	"metadata": {
		"window_period": "30s"
	},
	"trigger": {
		"schedule": {
			"interval": "30s"
		}
	},
	"input": {				
		"chain": {
			"inputs": [{
				"newAdmits": {
					"search": {
						"request": {
							"indices": ["patient-admit"],
							"body": {
								"query": {
									"bool": {
										"must": [{
											"range": {
												"timestamp": {
													"gte": "now-{{ctx.metadata.window_period}}"
												}
											}
										}]
									}
								}
							}
						}
					}
				}
			},
			{
				"existingContent": {
					"search": {
						"request": {
							"indices": ["converge-patient-content"],
							"body": {
								"query": {
									"bool": {
										"must": [{
											"term": {
												"mrn": "ctx.payload.newAdmits.mrn"
											}
										}]
									}
								}
							}
						}
					}
				}
			}]
		}
	},
	"condition" : {
		"compare": {
			"ctx.payload.existingContent.hits.total": {
				"gt": 0
			}
		}
	},
	"actions": {
		"index_payload" : { 
			"index" : {
				"index" : "matches", 
				"doc_type" : "payloads"
			}
		}
	}
}

If you look at the example I linked to, the second chained query, which performs a lookup based on recently admitted patients, uses a terms query that contains all hits from the first query. In your example you seem to be searching for a single term, which looks incorrect to me.
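Applied to the watch above, that means rendering every MRN from the newAdmits response into a terms query. The current value `"ctx.payload.newAdmits.mrn"` has no `{{ }}`, so it is searched for verbatim. Even as a template it would not resolve, because newAdmits holds a search response whose MRNs live under `hits.hits[]._source.mrn`. Below is a sketch of the second input iterating those hits directly, assuming `mrn` is stored in `_source`. Note that the newAdmits search returns only 10 hits by default. Its `size` would therefore need to cover a whole window, or the MRNs could be collected with a `terms` aggregation, as in the final watch in this thread.

```json
"existingContent": {
  "search": {
    "request": {
      "indices": ["converge-patient-content"],
      "body": {
        "query": {
          "bool": {
            "must": [{
              "terms": {
                "mrn": [
                  "{{#ctx.payload.newAdmits.hits.hits}}{{_source.mrn}}",
                  "{{/ctx.payload.newAdmits.hits.hits}}"
                ]
              }
            }]
          }
        }
      }
    }
  }
}
```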


Thanks. I finally found the Mustache documentation and got to the bottom of that pesky terms expression. For reference, this watch gets me really close to what I need; I just need to do a bit of additional work to transform the payload.

PUT _xpack/watcher/watch/new-admit-with-existing-content
{
	"metadata": {
		"window_period": "30s"
	},
	"trigger": {
		"schedule": {
			"interval": "30s"
		}
	},
	"input": {
		"chain": {
			"inputs": [{
				"newAdmits": {
					"search": {
						"request": {
							"indices": ["patient-admit"],
							"body": {
								"query": {
									"bool": {
										"must": [{
											"range": {
												"timestamp": {
													"gte": "now-{{ctx.metadata.window_period}}"
												}
											}
										}]
									}
								},
								"aggs": {
									"mrns": {
										"terms": {
											"field": "mrn",
											"size": 1000
										}
									}
								},
								"size": 0
							}
						}
					}
				}
			},
			{
				"existingContent": {
					"search": {
						"request": {
							"indices": ["converge-patient-content"],
							"body": {
								"query": {
									"bool": {
										"must": [{
											"terms": {
												"mrn": [
													"{{#ctx.payload.newAdmits.aggregations.mrns.buckets}}{{key}}",
													"{{/ctx.payload.newAdmits.aggregations.mrns.buckets}}"
												]
											}
										}]
									}
								}
							}
						}
					}
				}
			}]
		}
	},
	"condition" : {
		"compare": {
			"ctx.payload.existingContent.hits.total": {
				"gt": 0
			}
		}
	},
	"actions": {
		"log": {
			"logging": {
				"text": "Admitted MRN has ConVerge Content:{{#ctx.payload._value}}{{.}}:{{/ctx.payload._value}}"
			}
		},
		"index_payload" : { 
			"index" : {
				"index" : "matches", 
				"doc_type" : "payloads"
			}
		}
	}
}
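The log action above references `ctx.payload._value`, which a chain input does not produce on its own. One way to do the remaining payload transformation is a watch-level script transform. When a script transform returns something other than a map, Watcher stores the result under `_value`, so the existing logging text would then print each matched MRN. This is a sketch under a few assumptions. It uses Painless in 5.x, where the script body key is `inline` (6.x renamed it to `source`). It also assumes existingContent returns every matching incident, although its `size` defaults to 10 hits.

```json
"transform": {
  "script": {
    "lang": "painless",
    "inline": "return ctx.payload.existingContent.hits.hits.stream().map(h -> h._source.mrn).distinct().collect(Collectors.toList());"
  }
}
```

The transform goes at the top level of the watch, next to `condition` and `actions`. It runs after the condition, so the index_payload action would then index the list of MRNs under `_value`.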
