Mapper parsing exception for geo_point index in Amazon ES 7.4

I am attempting to load Amazon Elasticsearch Service (version 7.4) from Kinesis Data Firehose. Right before I turned on the hose (no pun intended), I created an index whose mapping defines the JSON object I would be sending in, including a lat/lon pair under a location dict. In my case I have two such dicts, for source and destination, and I could not type the values as numbers because I exception-handle the lack of GeoInt data by setting the values to the string "Unidentified".
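The fallback looks roughly like this (a simplified sketch; geoData stands in for my enrichment lookup result):

# simplified sketch: if the geo lookup comes back without coordinates,
# the lat/lon values become the string "Unidentified" instead of numbers
try:
    sourceIpLatid = geoData['lat']
    sourceIpLongt = geoData['lon']
except (KeyError, TypeError):
    sourceIpLatid = 'Unidentified'
    sourceIpLongt = 'Unidentified'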

Here is an excerpt of what I send to ES using the requests-aws4auth module in Python 3.8:

import requests

# host is the ES domain endpoint URL; awsauth is a requests_aws4auth.AWS4Auth
# instance (its construction is shown further down)
index = 'vpc-flows'
url = host + '/' + index + '/'

headers = {"Content-Type": "application/json"}

pload = {
    "mappings": {
        "properties": {
            "accountId": {"type": "text"},
            "interfaceId": {"type": "text"},
            "interfaceArn": {"type": "text"},
            "sourceInstanceId": {"type": "text"},
            "sourceInstanceArn": {"type": "text"},
            "sourceIp": {"type": "text"},
            "sourceReverseDomain": {"type": "text"},
            "sourceLocation": {"type": "geo_point"},
            "sourceIpCountry": {"type": "text"},
            "sourceIsp": {"type": "text"},
            "sourceOrg": {"type": "text"},
            "sourceAs": {"type": "text"},
            "sourceAsname": {"type": "text"},
            "destInstanceId": {"type": "text"},
            "destInstanceArn": {"type": "text"},
            "destIp": {"type": "text"},
            "destReverseDomain": {"type": "text"},
            "destinationLocation": {"type": "geo_point"},
            "destIpCountry": {"type": "text"},
            "destIsp": {"type": "text"},
            "destOrg": {"type": "text"},
            "destAs": {"type": "text"},
            "destAsname": {"type": "text"},
            "sourcePort": {"type": "text"},
            "destPort": {"type": "text"},
            "protocol": {"type": "text"},
            "startTime": {"type": "text"},
            "endTime": {"type": "text"},
            "action": {"type": "text"}
        }
    }
}

r = requests.put(url, auth=awsauth, json=pload, headers=headers)
print(r.json())

And I get this in response, so far so good:

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'vpc-flows'}
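To confirm the geo_point mapping actually took, you can read it back with the same host and auth:

r = requests.get(host + '/' + index + '/_mapping', auth=awsauth, headers=headers)
print(r.json())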

The source of the data is VPC flow logs, which are space-delimited and streamed, gzip-compressed and base64-encoded, through CloudWatch Logs. I decompress and decode them, split the lines, do some enrichment, build a new JSON dict, and send it to Firehose via json.dumps(). Firehose is pointed at ES (7.4).
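The decode step is roughly this (a minimal sketch assuming a Lambda handler fed by a standard CloudWatch Logs subscription; event is the handler's event argument):

import base64
import gzip
import json

# CloudWatch Logs subscription payloads arrive gzipped and base64-encoded
payload = base64.b64decode(event['awslogs']['data'])
logData = json.loads(gzip.decompress(payload))
for logEvent in logData['logEvents']:
    fields = logEvent['message'].split()  # flow log records are space-delimited
    # ...enrichment happens here...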

Here is an excerpt of the enrichment code:

flowLogRaw = {
    'accountId': acctId,
    'interfaceId': interfaceId,
    'interfaceArn': eniArn,
    'sourceInstanceId': sourceInstanceId,
    'sourceInstanceArn': sourceInstanceArn,
    'sourceIp': sourceIp,
    'sourceReverseDomain': sourceReverseDomain,
    'sourceLocation': {
        'sourceIpLatid': sourceIpLatid, 'sourceIpLongt': sourceIpLongt
    },
    'sourceIpCountry': sourceIpCountry,
    'sourceIsp': sourceIsp,
    'sourceOrg': sourceOrg,
    'sourceAs': sourceAs,
    'sourceAsname': sourceAsname,
    'destInstanceId': destInstanceId,
    'destInstanceArn': destInstanceArn,
    'destIp': destIp,
    'destReverseDomain': destReverseDomain,
    'destinationLocation': {
        'destIpLatid': destIpLatid, 'destIpLongt': destIpLongt
    },
    'destIpCountry': destIpCountry,
    'destIsp': destIsp,
    'destOrg': destOrg,
    'destAs': destAs,
    'destAsname': destAsname,
    'sourcePort': sourcePort,
    'destPort': destPort,
    'protocol': ianaProtocol,
    'startTime': dtStartTime,
    'endTime': dtEndTime,
    'action': flowAction
}
# default=str stringifies anything json can't serialize natively (e.g. datetimes)
geoIntFlowLog = json.dumps(flowLogRaw, default=str)
try:
    # firehose is a boto3 Firehose client, i.e. boto3.client('firehose')
    response = firehose.put_record(
        DeliveryStreamName=firehoseStream,
        Record={'Data': geoIntFlowLog}
    )
    print(response)
except Exception as e:
    print(e)

I can publish to the stream without issue and I do not get any throttling. However, Elasticsearch returns HTTP 400 errors:

{"type":"mapper_parsing_exception","reason":"failed to parse field [sourceIpLatid] of type [geo_point]","caused_by":{"type":"parse_exception","reason":"unsupported symbol [.] in geohash [46.0929]","caused_by":{"type":"illegal_argument_exception","reason":"unsupported symbol [.] in geohash [46.0929]"}}}

I am confused about why this gives me a geohash illegal argument error. If I delete the index created by my first Python snippet, ES will create the index dynamically as expected, but of course it maps lat and lon as strings, so I cannot map them as a geo_point.

I followed the excerpt here for creating the index mapping. Is there anything I can change in the code, or some other prep step I should take before sending this from Firehose? Or should I drop Firehose and use requests to send the data to Elasticsearch directly?

I found a few things wrong. First, the values under sourceLocation and destinationLocation should have been labeled lat and lon, so I swapped those like so:

...
'sourceLocation': {
    'lat': sourceIpLatid, 'lon': sourceIpLongt
},
'sourceIpCountry': sourceIpCountry,
'sourceIsp': sourceIsp,
'sourceOrg': sourceOrg,
'sourceAs': sourceAs,
'sourceAsname': sourceAsname,
'destInstanceId': destInstanceId,
'destInstanceArn': destInstanceArn,
'destIp': destIp,
'destReverseDomain': destReverseDomain,
'destinationLocation': {
    'lat': destIpLatid, 'lon': destIpLongt
},
...
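Reading the docs again, an object value for a geo_point field has to use exactly the keys lat and lon, while a bare string gets parsed as a geohash or a "lat,lon" pair; that is why my lone "46.0929" was rejected as an invalid geohash. The accepted shapes look like this (doc and the coordinate values are illustrative):

# object with explicit lat/lon keys (what I switched to)
doc['sourceLocation'] = {'lat': 46.0929, 'lon': 7.3243}
# single string, parsed as "lat,lon"
doc['sourceLocation'] = '46.0929,7.3243'
# array in [lon, lat] order (GeoJSON convention)
doc['sourceLocation'] = [7.3243, 46.0929]
# geohash string
doc['sourceLocation'] = 'u0y1'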

I did away with Firehose and just published into ES directly using requests and AWS4Auth. I am not sure whether my url variable is correct, but I was getting a not_x_content_exception error if I did not add _doc to my URL before sending the request.

import json
import requests
from requests_aws4auth import AWS4Auth

awsAuthToken = AWS4Auth(accessKey, secretAccessKey, awsRegion, 'es', session_token=seshToken)
# create requests items
url = host + '/' + index + '/' + '_doc/'
headers = {"Content-Type": "application/json"}
geoIntFlowLog = json.dumps(flowLogRaw, default=str)
r = requests.post(url, auth=awsAuthToken, data=geoIntFlowLog, headers=headers)
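For what it's worth, POST against /{index}/_doc/ lets ES auto-generate the document ID; a PUT would need an explicit ID appended, e.g. (docId being an illustrative unique ID):

r = requests.put(host + '/' + index + '/_doc/' + docId, auth=awsAuthToken, data=geoIntFlowLog, headers=headers)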

Everything appears to be working now in Kibana, if only I could get my visualizations working right...

EDIT: I will be retesting Firehose; re-reading the error message, I now think my issues were caused by not having lat and lon keys in there to begin with.
