Wrong date format when reading from elasticsearch index

I'm reading data from an Elasticsearch index, and I have a date format issue: the field comes back as 1501545600000, which is supposed to be in yyyy/mm/dd format.

```python
from pyspark import SparkConf
from pyspark.sql import SQLContext

q = """{
  "query": {
    "match_all": {}
  }
}"""

es_read_conf = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.resource": "sub1",
    "es.query": q
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_read_conf)

df2 = [convert_ts(doc) for doc in df]
```

This used to work; it no longer does because I changed the code. This is the conversion block I was using:

```python
    try:
        ts_from_doc = get('Refill_Bar_End_Date_and_Time', None)

        if not ts_from_doc:
            raise ValueError('`Refill_Bar_End_Date_and_Time` not found')

        # incoming as millisec so convert to sec
        as_date = dt.fromtimestamp(
            int(ts_from_doc / 1000.0)
        ).strftime('%Y-%m-%d %H:%M:%S')

        hit['Refill_Bar_End_Date_and_Time'] = as_date

    except Exception as e:
        print(e)
        pass
```

So could you tell me what to modify?

Hello @samargh

Can you share the index mapping of the index you're reading (sub1)?

Before indexing I tried this:

```
PUT sub1
{
  "mappings": {
    "properties": {
      "Refill_Bar_End_Date_and_Time": {
        "type": "date"
      }
    }
  }
}
```

Then `GET sub1/_search/` gives:

"hits" : [
      {
        "_index" : "sub1",
        "_type" : "_doc",
        "_id" : "fUGwHnIBKLcVv2X4Scjq",
        "_score" : 1.0,
        "_source" : {
          "Subscriber" : "550",
          "Refill_Bar_End_Date_and_Time" : 1578524400000,
        }
      }

while in Kibana Discover I get:

```
Subscriber                      550
Refill_Bar_End_Date_and_Time    Jan 9, 2020 @ 00:00:00.000
```

but reading the content of the index gives me the raw value in a format that I need to convert.

Kibana generates the string for you using your browser timezone.
From what you shared, the field is a `date` field and its value is expressed in epoch milliseconds.
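For reference, here is a minimal sketch of converting such an epoch-millis value by hand, using the sample value from your search response above. I pass `timezone.utc` explicitly so the result does not depend on the machine's timezone; that is why it prints 23:00 on Jan 8 rather than the Jan 9 midnight Kibana renders in (presumably) a UTC+1 browser:

```python
from datetime import datetime, timezone

ms = 1578524400000  # sample value from the search response above

# epoch milliseconds -> seconds, then format; timezone.utc keeps it deterministic
as_date = datetime.fromtimestamp(ms / 1000.0, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
print(as_date)  # 2020-01-08 23:00:00
```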

The code seems correct, except if PySpark is converting the field already to date.
Can you please tell us what is the error and the type of the value returned from PySpark?

My code used to be this one:

```python
from pyspark.sql import SparkSession
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
from datetime import datetime as dt
import pandas as pd


def convert_ts(hit):
    hit = hit['_source']
    # change Refill_Bar_End_Date_and_Time
    try:
        ts_from_doc = hit.get('Refill_Bar_End_Date_and_Time', None)

        if not ts_from_doc:
            raise ValueError('`Refill_Bar_End_Date_and_Time` not found')

        # incoming as millisec so convert to sec
        as_date = dt.fromtimestamp(
            int(ts_from_doc / 1000.0)
        ).strftime('%Y-%m-%d %H:%M:%S')

        hit['Refill_Bar_End_Date_and_Time'] = as_date

    except Exception as e:
        print(e)
        pass
    return hit


es = Elasticsearch(['http://localhost:9200'], timeout=600)
documents = es.search(index='subscribers-20200101', body={})['hits']['hits']
documents = [convert_ts(doc) for doc in documents]
print(documents)
```
But they asked me to work with a Spark session/context instead. This is my new code:

```python
from pyspark import SparkConf
from pyspark.sql import SQLContext

q = """{
  "query": {
    "match_all": {}
  }
}"""

es_read_conf = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.resource": "sub01",
    "es.query": q
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_read_conf)
```

I tried to add the same `try` block, but I get this error:

```
  File "", line 14
    except Exception as e:
    ^
IndentationError: unindent does not match any outer indentation level
```

I tried the same method and I get an error:

```python
def convert_ts(hit):
    hit = hit['_source']
    try:
        ts_from_doc = get('Refill_Bar_End_Date_and_Time', None)

        if not ts_from_doc:
            raise ValueError('`Refill_Bar_End_Date_and_Time` not found')

        # incoming as millisec so convert to sec
        as_date = dt.fromtimestamp(
            int(ts_from_doc / 1000.0)
        ).strftime('%Y-%m-%d %H:%M:%S')

        hit['Refill_Bar_End_Date_and_Time'] = as_date

    except Exception as e:
        print(e)
        pass
    return hit


ss = [convert_ts(doc) for doc in es_rdd]
```

```
TypeError                      Traceback (most recent call last)
<ipython-input-11-45f80019aa54> in <module>
----> 1 ss = [convert_ts(doc) for doc in es_rdd]

TypeError: 'RDD' object is not iterable
```

The `IndentationError` is a plain Python error: the indentation in your pasted code is not consistent. The `TypeError` is separate: an RDD is not iterable, so you cannot use it in a list comprehension; you have to use RDD operations such as `map`, or `collect()` the records first.
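Something like this should work (a minimal, untested sketch: if I remember the es-hadoop connector's output correctly, `newAPIHadoopRDD` yields `(doc_id, document)` tuples rather than full search hits, so there is no `_source` wrapper, and the field lookup must be `doc.get(...)`, not a bare `get(...)`):

```python
from datetime import datetime as dt, timezone


def convert_ts(record):
    # es-hadoop yields (doc_id, doc) tuples; there is no '_source' wrapper here
    doc_id, doc = record
    try:
        ts_from_doc = doc.get('Refill_Bar_End_Date_and_Time', None)  # doc.get, not bare get()
        if not ts_from_doc:
            raise ValueError('`Refill_Bar_End_Date_and_Time` not found')

        # incoming as epoch millis, so convert to seconds; UTC keeps it deterministic
        doc['Refill_Bar_End_Date_and_Time'] = dt.fromtimestamp(
            ts_from_doc / 1000.0, tz=timezone.utc
        ).strftime('%Y-%m-%d %H:%M:%S')
    except Exception as e:
        print(e)
    return doc_id, doc


# With Spark you would write: converted = es_rdd.map(convert_ts)
# Locally, the same function works on one sample record shaped like your document:
sample = ('fUGwHnIBKLcVv2X4Scjq',
          {'Subscriber': '550', 'Refill_Bar_End_Date_and_Time': 1578524400000})
print(convert_ts(sample)[1]['Refill_Bar_End_Date_and_Time'])  # 2020-01-08 23:00:00
```

Note that `map` keeps the conversion on the executors; `collect()`-then-comprehension pulls everything to the driver first.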

how can i modify my code ?

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.