Spark Structured Streaming JSON Serialization

Hey Everyone,

I hope this question hasn't been asked before; I looked around quite a bit before posting. If I missed it, sorry!

I'm trying to use the org.elasticsearch.spark.sql structured streaming sink for ES, and I'm running into some challenges when a column contains a string literal of valid JSON. I have a single column, called combined, that holds the full JSON document as a string.

When the documents are written to ES they look like this:

```
"_source" : {
          "combined" : """{"sparkTimestamp":"2020-01-08T15:42:24.890Z","@timestamp":"2020-01-08T15:39:40.583Z","@metadata":{"beat":"winlogbeat","type":"_doc","version":"7.4.2","topic":"ingest-winlogbeat"},"host":{"hostname":"A HOSTNAME"................ truncated for brevity
```

As you can see, the sink assumes combined is a normal string, and is nice enough to escape it for me too. :smile:
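Purely to illustrate what I mean (this is not the connector's actual code path), here's a tiny Scala sketch of what serializing a JSON string as an ordinary string field amounts to:

```scala
// A string that already contains valid JSON (trimmed-down winlogbeat example).
val json = """{"sparkTimestamp":"2020-01-08T15:42:24.890Z","host":{"hostname":"A HOSTNAME"}}"""

// A serializer that sees only a plain string has to escape any backslashes
// and quotes before embedding the value as a string field.
val escaped = json.replace("\\", "\\\\").replace("\"", "\\\"")
val doc = s"""{"combined":"$escaped"}"""

println(doc)
```

So every inner quote ends up as `\"`, which is exactly what I'm seeing in the indexed document.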

What I'm really after, though, is for the JSON string inside the combined column to become the document's _source. It looks like this is possible with the older RDD approach, but I didn't see any way to make it work with this sink. Is it possible? If there's another way to do it, what are its message delivery guarantees?
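The RDD approach I'm referring to is elasticsearch-hadoop's saveJsonToEs, which indexes each string in the RDD verbatim as _source. A rough sketch (input path and index name are placeholders, and `sc` is assumed to be an existing SparkContext):

```scala
import org.elasticsearch.spark._  // adds saveJsonToEs to RDDs

// Placeholder path; each line is assumed to be one complete JSON document.
val jsonDocs = sc.textFile("/path/to/raw/json")

// Each string is sent to ES as-is and becomes the document _source,
// instead of being escaped into a single string field.
jsonDocs.saveJsonToEs("winlogbeat-raw")  // placeholder index name
```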

If you're wondering why I'm doing this, I'm happy to briefly explain. I'm using winlogbeat as a source, and while it is ECS-compliant, a vast number of fields live in a custom extension. In addition, the fields change depending on the event. If I try to account for this in a Spark-managed (SQL) table, it becomes very difficult because of the sparsity and sheer number of fields.

To alleviate this, I parsed out the core ECS fields and retained the original JSON from winlogbeat. Now that the raw table has been established, I want to index the original JSON into ES; this is effectively the combined column.
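For concreteness, the parsing step looks roughly like this (column and field names are simplified, and `raw` is a hypothetical DataFrame with the winlogbeat payload in a `value` column):

```scala
import org.apache.spark.sql.functions._

// Pull out a couple of core ECS fields, but keep the raw payload intact.
val parsed = raw.select(
  get_json_object(col("value"), "$.host.hostname").as("hostname"),
  get_json_object(col("value"), "$.event.code").as("event_code"),
  col("value").as("combined")  // the original JSON, retained verbatim
)
```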

Any insight would be appreciated. Thanks!