Spark Structured Streaming JSON Serialization

Hey Everyone,

I hope this question hasn't been asked before; I looked around quite a bit before posting. If I missed it, sorry!

I'm trying to use the org.elasticsearch.spark.sql structured streaming sink for ES, and am running into some challenges when my column contains a string literal of valid JSON. I have one column called combined.
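For context, the write side currently looks roughly like this (a trimmed-down sketch; the checkpoint path, node address, and index name are placeholders rather than my real settings):

```scala
import org.apache.spark.sql.DataFrame

// `df` stands in for my streaming DataFrame, which has a single string
// column named "combined". All option values below are placeholders.
val df: DataFrame = ???

val query = df.writeStream
  .format("org.elasticsearch.spark.sql")                      // ES structured streaming sink
  .option("checkpointLocation", "/tmp/checkpoints/es-sink")   // placeholder path
  .option("es.nodes", "localhost")                            // placeholder cluster address
  .start("winlogbeat-raw")                                    // placeholder index name

query.awaitTermination()
```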

When the documents are written to ES they look like this:

"_source" : {
          "combined" : """{"sparkTimestamp":"2020-01-08T15:42:24.890Z","@timestamp":"2020-01-08T15:39:40.583Z","@metadata":{"beat":"winlogbeat","type":"_doc","version":"7.4.2","topic":"ingest-winlogbeat"},"host":{"hostname":"A HOSTNAME"................ truncated for brevity

As you can see, the sink treats combined as a plain string and is nice enough to escape it for me too. :smile:

What I'm really after, though, is for the JSON string inside the combined column to become the document _source itself. It looks like this is possible with the older RDD approach, but I didn't see any way to make it work within the context of this sink. Is it possible? If there is another way to do it, what are the message delivery guarantees?

In case you're wondering why I'm doing this, here's a brief explanation. I'm using winlogbeat as a source, and while it is ECS compliant, a vast number of fields live inside a custom extension. On top of that, the set of fields changes depending on the event. Trying to account for this in a Spark-managed (SQL) table would be very difficult because of the sparsity and sheer number of fields.

To alleviate this, I parsed out the core ECS fields and retained the original winlogbeat JSON. Now that the raw table is established, I want to index that original JSON into ES; it is effectively the combined column.
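The shape of that transformation is roughly the following (an illustrative sketch only; the input column name, the ECS field paths, and the aliases are stand-ins, not my actual schema):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, get_json_object}

// `raw` stands in for the DataFrame holding the original winlogbeat JSON
// in a string column called "value" (both names assumed for illustration).
val raw: DataFrame = ???

val parsed = raw.select(
  get_json_object(col("value"), "$.host.hostname").alias("hostName"),
  get_json_object(col("value"), "$.event.code").alias("eventCode"),
  col("value").alias("combined")  // the original JSON, retained verbatim
)
```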

Any insight would be appreciated. Thanks!

I don't think this is a supported write configuration in ES-Hadoop at the moment. We convert all Dataset objects into DataFrame types before consuming them. I think it's reasonable that we should support this method of ingest; I believe the Hive integration is able to support it, though in a somewhat clumsier fashion.

One thing you might want to try: when you are done transforming your data and are ready to index it, convert the Dataset[String] into an RDD[String] and save it to Elasticsearch that way instead. I know we do support this sort of write operation with RDDs, so hopefully that will be enough to get you past this hurdle.
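In structured streaming that conversion would have to go through foreachBatch (available since Spark 2.4), since a streaming Dataset doesn't expose .rdd directly. Roughly along these lines, as an untested sketch where jsonStream, the checkpoint path, and the index name are all placeholders:

```scala
import org.apache.spark.sql.Dataset
import org.elasticsearch.spark._  // adds saveJsonToEs to RDD[String]

// `jsonStream` stands in for the streaming Dataset[String] that holds the raw
// JSON (your "combined" column). ES connection settings (es.nodes, etc.) are
// assumed to be set on the SparkConf.
val jsonStream: Dataset[String] = ???

val query = jsonStream.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/checkpoints/es-json")  // placeholder path
  .foreachBatch { (batch: Dataset[String], batchId: Long) =>
    // Inside foreachBatch each micro-batch is a static Dataset, so .rdd works;
    // saveJsonToEs indexes each string as an already-formed JSON document.
    batch.rdd.saveJsonToEs("winlogbeat-raw")  // placeholder index name
  }
  .start()

query.awaitTermination()
```

As for delivery guarantees: as far as I know, writing out of foreachBatch like this is at-least-once, so you should expect the occasional retried document on failure.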
