Mapping parser exception for geopoint index in Amazon ES 7.4

I am attempting to load data into Amazon Elasticsearch Service (version 7.4) with Kinesis Data Firehose. Right before I turned on the hose (no pun intended) I created an index mapping for the JSON objects I would be sending, including a lat/lon pair under a location dict. In my case I have two dicts, one for source and one for destination, and I could not declare the values as numbers because my exception handling sets them to "Unidentified" when geolocation data is missing.

Here is an excerpt of what I send to ES using the aws4auth module in Python 3.8.

import requests

# host and awsauth (AWS4Auth) are defined earlier in the script
index = 'vpc-flows'
url = host + '/' + index + '/'

headers = {"Content-Type": "application/json"}

pload = {
    "mappings": {
        "properties": {
            "accountId": {"type": "text"},
            "interfaceId": {"type": "text"},
            "interfaceArn": {"type": "text"},
            "sourceInstanceId": {"type": "text"},
            "sourceInstanceArn": {"type": "text"},
            "sourceIp": {"type": "text"},
            "sourceReverseDomain": {"type": "text"},
            "sourceLocation": {"type": "geo_point"},
            "sourceIpCountry": {"type": "text"},
            "sourceIsp": {"type": "text"},
            "sourceOrg": {"type": "text"},
            "sourceAs": {"type": "text"},
            "sourceAsname": {"type": "text"},
            "destInstanceId": {"type": "text"},
            "destInstanceArn": {"type": "text"},
            "destIp": {"type": "text"},
            "destReverseDomain": {"type": "text"},
            "destinationLocation": {"type": "geo_point"},
            "destIpCountry": {"type": "text"},
            "destIsp": {"type": "text"},
            "destOrg": {"type": "text"},
            "destAs": {"type": "text"},
            "destAsname": {"type": "text"},
            "sourcePort": {"type": "text"},
            "destPort": {"type": "text"},
            "protocol": {"type": "text"},
            "startTime": {"type": "text"},
            "endTime": {"type": "text"},
            "action": {"type": "text"}
        }
    }
}

r = requests.put(url, auth=awsauth, json=pload, headers=headers)
print(r.json())

And I get this in response, so far so good.

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'vpc-flows'}
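
To double-check that the geo_point fields took, the mapping can also be read back with a GET on _mapping (a minimal sketch; the response shape assumed here is the standard ES 7.x mapping API output):

check = requests.get(host + '/' + index + '/_mapping', auth=awsauth, headers=headers)
print(check.json()[index]['mappings']['properties']['sourceLocation'])
# expected output: {'type': 'geo_point'}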

The source of the data is VPC Flow Logs, which are space-delimited and arrive in CloudWatch Logs compressed and base64-encoded. I decompress and decode them, split the lines, do some enrichment, build a new JSON dict, and send it to Firehose using json.dumps(). Firehose is pointed at ES (7.4).
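
For context, the decompress/decode step looks roughly like this (a minimal sketch; the event is the standard CloudWatch Logs subscription payload a Lambda receives, and the function name is just for illustration):

import base64
import gzip
import json

def decode_flow_log_events(event):
    # CloudWatch Logs subscription data is base64-encoded, gzip-compressed JSON
    compressed = base64.b64decode(event['awslogs']['data'])
    payload = json.loads(gzip.decompress(compressed))
    for log_event in payload['logEvents']:
        # each message is one space-delimited VPC Flow Log record
        yield log_event['message'].split(' ')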

Here is an excerpt of that code:

flowLogRaw = {
    'accountId': acctId,
    'interfaceId': interfaceId,
    'interfaceArn': eniArn,
    'sourceInstanceId': sourceInstanceId,
    'sourceInstanceArn': sourceInstanceArn,
    'sourceIp': sourceIp,
    'sourceReverseDomain': sourceReverseDomain,
    'sourceLocation': {
        'sourceIpLatid': sourceIpLatid, 'sourceIpLongt': sourceIpLongt
    },
    'sourceIpCountry': sourceIpCountry,
    'sourceIsp': sourceIsp,
    'sourceOrg': sourceOrg,
    'sourceAs': sourceAs,
    'sourceAsname': sourceAsname,
    'destInstanceId': destInstanceId,
    'destInstanceArn': destInstanceArn,
    'destIp': destIp,
    'destReverseDomain': destReverseDomain,
    'destinationLocation': {
        'destIpLatid': destIpLatid, 'destIpLongt': destIpLongt
    },
    'destIpCountry': destIpCountry,
    'destIsp': destIsp,
    'destOrg': destOrg,
    'destAs': destAs,
    'destAsname': destAsname,
    'sourcePort': sourcePort,
    'destPort': destPort,
    'protocol': ianaProtocol,
    'startTime': dtStartTime,
    'endTime': dtEndTime,
    'action': flowAction
}
geoIntFlowLog = json.dumps(flowLogRaw, default=str)
try:
    # firehose is the boto3 Firehose client created earlier in the script
    response = firehose.put_record(DeliveryStreamName=firehoseStream, Record={'Data': geoIntFlowLog})
    print(response)
except Exception as e:
    print(e)

I can publish to the stream with no issues and I do not get any throttling. However, I do get errors back from Elasticsearch with HTTP 400:

{"type":"mapper_parsing_exception","reason":"failed to parse field [sourceIpLatid] of type [geo_point]","caused_by":{"type":"parse_exception","reason":"unsupported symbol [.] in geohash [46.0929]","caused_by":{"type":"illegal_argument_exception","reason":"unsupported symbol [.] in geohash [46.0929]"}}}

I am confused about why this is giving me a geohash illegal-argument error. If I delete the index I created with my first Python snippet, ES will auto-create the index on ingest as expected, but of course dynamic mapping turns lat and lon into strings so I cannot map them as a geo_point.

I followed the excerpt here for creating the index mapping. Is there anything I can change in the code, or some other prep step I should take before sending this from Firehose? Or should I drop Firehose and use Requests to send the data to Elasticsearch directly?

I found a few things wrong. First, the values under sourceLocation and destinationLocation should have been keyed as lat and lon, so I changed that like so:

...
    'sourceLocation': {
        'lat': sourceIpLatid, 'lon': sourceIpLongt
    },
    'sourceIpCountry': sourceIpCountry,
    'sourceIsp': sourceIsp,
    'sourceOrg': sourceOrg,
    'sourceAs': sourceAs,
    'sourceAsname': sourceAsname,
    'destInstanceId': destInstanceId,
    'destInstanceArn': destInstanceArn,
    'destIp': destIp,
    'destReverseDomain': destReverseDomain,
    'destinationLocation': {
        'lat': destIpLatid, 'lon': destIpLongt
    },
...
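
For reference, and assuming standard Elasticsearch geo_point parsing, the object form with lat/lon keys used above is one of a few accepted shapes; a string without a comma gets treated as a geohash, which matches the "unsupported symbol [.] in geohash" error I was seeing. A rough sketch of the equivalent forms (coordinates made up):

# equivalent ways to supply a geo_point value (illustrative coordinates)
'sourceLocation': {'lat': 46.0929, 'lon': -70.25}   # object with lat/lon keys
'sourceLocation': '46.0929,-70.25'                  # "lat,lon" string
'sourceLocation': [-70.25, 46.0929]                 # [lon, lat] array (GeoJSON order)
'sourceLocation': 'drm3btev3e86'                    # geohash string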

I did away with Firehose and just published into ES directly using Requests and AWS4Auth. I am not sure if my url variable is correct, but I was getting a not_x_content_exception error if I did not add _doc to the URL before sending the request:

import json
import requests
from requests_aws4auth import AWS4Auth

awsAuthToken = AWS4Auth(accessKey, secretAccessKey, awsRegion, 'es', session_token=seshToken)
# create requests items
url = host + '/' + index + '/' + '_doc/'
headers = {"Content-Type": "application/json"}
geoIntFlowLog = json.dumps(flowLogRaw, default=str)
r = requests.post(url, auth=awsAuthToken, data=geoIntFlowLog, headers=headers)
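
A quick way to confirm each document actually lands is to check the response (a minimal sketch, assuming the standard fields in the Elasticsearch index-document response):

r.raise_for_status()                        # surfaces HTTP 400s such as mapper_parsing_exception
doc = r.json()
print(doc.get('result'), doc.get('_id'))    # e.g. "created" plus the auto-generated _id

For what it's worth, POSTing to /index/_doc/ lets Elasticsearch generate the document _id; a PUT would need an explicit _id appended to the path.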

Everything appears to be working now in Kibana, if only I could get my visualizations working right...

EDIT: I will be retesting Firehose, as re-reading the error message I now think my issues were caused by not having lat and lon keys in there to begin with.