While in Kibana Discover I get:

Subscriber 550
Refill_Bar_End_Date_and_Time Jan 9, 2020 @ 00:00:00.000

but reading the content of the index gives me a different format that I need to convert.
Kibana generates that string for you using your browser's timezone.
From what you shared, the field is a date field and it is stored as epoch milliseconds.
The code looks correct, unless PySpark is already converting the field to a date.
Can you please tell us what the error is and the type of the value returned from PySpark?
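To see what you actually get back, you can print the value and its type before converting. A minimal sketch, assuming the field arrives as epoch milliseconds (the sample value below is the epoch-millis timestamp for Jan 9, 2020 00:00:00 UTC; rendering in UTC avoids browser/server timezone surprises):

```python
from datetime import datetime as dt, timezone

def millis_to_str(ts_millis):
    # Epoch milliseconds -> seconds, rendered in UTC so the output
    # does not depend on the machine's local timezone.
    return dt.fromtimestamp(ts_millis / 1000.0, tz=timezone.utc) \
             .strftime('%Y-%m-%d %H:%M:%S')

value = 1578528000000           # epoch millis for 2020-01-09 00:00:00 UTC
print(type(value))              # check what PySpark actually hands you
print(millis_to_str(value))     # -> 2020-01-09 00:00:00
```

If `type(value)` is already a `datetime` or a string rather than an `int`, the conversion step needs to change accordingly.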
```
from pyspark.sql import SparkSession
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
from datetime import datetime as dt
import pandas as pd

def convert_ts(hit):
    hit = hit['_source']
    # change Refill_Bar_End_Date_and_Time
    try:
        ts_from_doc = hit.get('Refill_Bar_End_Date_and_Time', None)
        if not ts_from_doc:
            raise ValueError('`Refill_Bar_End_Date_and_Time` not found')
        # incoming as millisec so convert to sec
        as_date = dt.fromtimestamp(
            int(ts_from_doc / 1000.0)
        ).strftime('%Y-%m-%d %H:%M:%S')
        hit['Refill_Bar_End_Date_and_Time'] = as_date
    except Exception as e:
        print(e)
    return hit

es = Elasticsearch(['http://localhost:9200'], timeout=600)
documents = es.search(index='subscribers-20200101', body={})['hits']['hits']
documents = [convert_ts(doc) for doc in documents]
print(documents)
```
But they asked me to work with a Spark session/context. This is my new code:

```
from pyspark import SparkConf
from pyspark.sql import SQLContext

q = """{
  "query": {
    "match_all": {}
  }
}"""

es_read_conf = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.resource": "sub01",
    "es.query": q
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_read_conf)
```

I tried to add the same try/except, but I get this error:

```
File "", line 14
    except Exception as e:
                         ^
IndentationError: unindent does not match any outer indentation level
```

I tried the same method again and I still get an error:

```
def convert_ts(hit):
    hit = hit['_source']
    try:
        ts_from_doc = hit.get('Refill_Bar_End_Date_and_Time', None)
        if not ts_from_doc:
            raise ValueError('`Refill_Bar_End_Date_and_Time` not found')
        # incoming as millisec so convert to sec
        as_date = dt.fromtimestamp(
            int(ts_from_doc / 1000.0)
        ).strftime('%Y-%m-%d %H:%M:%S')
        hit['Refill_Bar_End_Date_and_Time'] = as_date
    except Exception as e:
        print(e)
    return hit

ss = [convert_ts(doc) for doc in es_rdd]
```
```
TypeError                                 Traceback (most recent call last)
<ipython-input-11-45f80019aa54> in <module>
----> 1 ss = [convert_ts(doc) for doc in es_rdd]

TypeError: 'RDD' object is not iterable
```
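That last error is expected: an RDD cannot be iterated in the driver with a list comprehension, so transformations have to go through `map()` and results come back via `collect()`. A sketch of the adjusted function, under the assumption that each element of the RDD from `newAPIHadoopRDD` is a `(doc_id, source)` pair with no `'_source'` wrapper (unlike the REST response):

```python
from datetime import datetime as dt, timezone

def convert_ts(pair):
    # Assumption: newAPIHadoopRDD yields (key, value) tuples, where the key
    # is the document id and the value is the source document itself.
    doc_id, hit = pair
    try:
        ts_from_doc = hit.get('Refill_Bar_End_Date_and_Time', None)
        if not ts_from_doc:
            raise ValueError('`Refill_Bar_End_Date_and_Time` not found')
        # incoming as epoch millis, so convert to seconds (UTC here)
        hit['Refill_Bar_End_Date_and_Time'] = dt.fromtimestamp(
            int(ts_from_doc) / 1000.0, tz=timezone.utc
        ).strftime('%Y-%m-%d %H:%M:%S')
    except Exception as e:
        print(e)
    return hit

# On the RDD, apply the function on the executors and collect the results:
# ss = es_rdd.map(convert_ts).collect()

# Local check with a fake (doc_id, source) pair:
sample = ('550', {'Subscriber': 550,
                  'Refill_Bar_End_Date_and_Time': 1578528000000})
print(convert_ts(sample))
```

Running `es_rdd.take(1)` first would confirm the exact shape of the records before committing to this structure.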