I am connecting Spark to Elasticsearch with the Elasticsearch-Hadoop connector, and I am having difficulty writing a DataFrame with a timestamp-type column to Elasticsearch.
The problem occurs when I write using dynamic/multi-resource formatting to create a daily index.
From the relevant documentation I get the impression that this is possible; however, the Python example below fails to run unless I change the column type to date.
import pyspark
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
conf = pyspark.SparkConf()
conf.set('spark.jars', 'elasticsearch-spark-20_2.11-6.1.2.jar')
conf.set('es.nodes', '127.0.0.1:9200')
conf.set('es.read.metadata', 'true')
conf.set('es.nodes.wan.only', 'true')
# Build the SparkSession from this configuration
spark = SparkSession.builder.config(conf=conf).getOrCreate()
now = datetime.now()
before = now - timedelta(days=1)
after = now + timedelta(days=1)
cols = ['idz', 'name', 'time']
# Python datetime values are inferred as Spark TimestampType
vals = [(0, 'maria', before), (1, 'lolis', after)]
time_df = spark.createDataFrame(vals, cols)
When I try to write, I use the following:
(time_df.write
    .mode('append')
    .format('org.elasticsearch.spark.sql')
    .option('es.write.operation', 'index')
    .save('xxx-{time|yyyy.MM.dd}/1'))
Unfortunately, this fails with the following error:
.... Caused by: java.lang.IllegalArgumentException: Invalid format:
"2018-03-04 12:36:12.949897" is malformed at " 12:36:12.949897" at
org.joda.time.format.DateTimeFormatter.parseDateTime(DateTimeFormatter.java:945)
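The value quoted in the error has the same shape as str() of a Python datetime: a space between the date and the time. The stack trace shows Joda's DateTimeFormatter.parseDateTime stopping at exactly that space, which suggests (an assumption, not confirmed from the connector source) that the {time|...} placeholder value is parsed as an ISO 8601 date, where the date and time are separated by T. A plain-Python comparison of the two string forms, using a fixed timestamp copied from the error:

```python
from datetime import datetime

# Fixed timestamp matching the value quoted in the error message
ts = datetime(2018, 3, 4, 12, 36, 12, 949897)

# str() separates date and time with a space (the shape seen in the error)
as_str = str(ts)
# isoformat() uses the ISO 8601 'T' separator instead
as_iso = ts.isoformat()

print(as_str)  # 2018-03-04 12:36:12.949897
print(as_iso)  # 2018-03-04T12:36:12.949897
```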
On the other hand, this works fine if I use dates when creating the DataFrame:
cols = ['idz', 'name', 'time']
vals = [(0,'maria', before.date()), (1, 'lolis', after.date())]
time_df = spark.createDataFrame(vals, cols)
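Presumably the date values work because a bare date such as 2018-03-03 is a valid ISO 8601 value with no time part. One thing that may be worth trying, sketched under that assumption and not verified against elasticsearch-hadoop 6.1.2, is to keep the full timestamp but send it as an ISO 8601 string with the T separator. The data preparation below is plain Python; the createDataFrame call is left as a comment because it needs the SparkSession from the first snippet.

```python
from datetime import datetime, timedelta

now = datetime.now()
before = now - timedelta(days=1)
after = now + timedelta(days=1)

# Assumption: an ISO 8601 string ('T' separator) is something the connector's
# date parser accepts for the {time|...} placeholder. Sub-second precision
# is dropped here to keep the string simple.
ISO_SECONDS = '%Y-%m-%dT%H:%M:%S'

cols = ['idz', 'name', 'time']
vals = [(0, 'maria', before.strftime(ISO_SECONDS)),
        (1, 'lolis', after.strftime(ISO_SECONDS))]

# With a SparkSession available:
# time_df = spark.createDataFrame(vals, cols)
```

The trade-off is that the column becomes a string on the Spark side and loses its sub-second part; whether Elasticsearch then maps the field as a date depends on the index mapping.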
Is it possible to format a DataFrame timestamp so that it can be written to daily indexes with this method, without also keeping a separate date column? What about monthly indexes?
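The {field|pattern} placeholder takes a date-format pattern, so once the parsing problem is solved, a monthly index should only need a shorter pattern. The plain-Python sketch below just illustrates which index names the daily and monthly patterns would be expected to produce for two sample rows; index_for is a hypothetical helper, and strftime codes stand in for the Joda-style pattern the connector uses.

```python
from datetime import datetime

# Hypothetical helper: expands a prefix plus a date pattern into an index name.
# strftime codes stand in for the connector's Joda-style patterns:
#   "yyyy.MM.dd" ~ "%Y.%m.%d" (daily), "yyyy.MM" ~ "%Y.%m" (monthly)
def index_for(prefix, ts, pattern):
    return prefix + ts.strftime(pattern)

rows = [(0, 'maria', datetime(2018, 3, 3, 12, 36, 12)),
        (1, 'lolis', datetime(2018, 3, 5, 12, 36, 12))]

# Distinct target indexes for each granularity
daily = sorted({index_for('xxx-', ts, '%Y.%m.%d') for _, _, ts in rows})
monthly = sorted({index_for('xxx-', ts, '%Y.%m') for _, _, ts in rows})

print(daily)    # ['xxx-2018.03.03', 'xxx-2018.03.05']
print(monthly)  # ['xxx-2018.03']
```

In the write call, the monthly variant would be expected to look like .save('xxx-{time|yyyy.MM}/1').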
PySpark version:
Spark version 2.2.1
Using Scala version 2.11.8, OpenJDK 64-Bit Server VM, 1.8.0_151
Elasticsearch version:
"number": "6.2.2", "build_hash": "10b1edd", "build_date": "2018-02-16T19:01:30.685723Z", "build_snapshot": false,
"lucene_version": "7.2.1", "minimum_wire_compatibility_version": "5.6.0", "minimum_index_compatibility_version": "5.0.0"
Mirror question on SO: