I am attempting to load Amazon Elasticsearch Service (version 7.4) from Kinesis Data Firehose. Right before I turned on the hose (no pun intended), I created the index with a mapping describing the JSON document I would be sending in, including a lat/lon pair, stored as strings, under a location dict. In my case I have two such dicts, for source and destination, and I could not specify the values as numbers because I exception-handle a missing geo intel result by setting them to "Unidentified".
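Roughly, that fallback looks like this (a sketch; geo_lookup and the address are stand-ins for my actual geo intel enrichment):

def geo_lookup(ip):
    # Stand-in for my real geo intel call; pretend the lookup missed
    raise LookupError('no geo intel for ' + ip)

sourceIp = '203.0.113.10'  # hypothetical address
try:
    geo = geo_lookup(sourceIp)
    sourceIpLatid, sourceIpLongt = str(geo['lat']), str(geo['lon'])
except Exception:
    # No geo intel available, so the values stay plain strings
    sourceIpLatid = sourceIpLongt = 'Unidentified'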
Here is an excerpt of what I send to ES, using the requests-aws4auth module on Python 3.8:
import requests  # host, awsauth, etc. are defined earlier in the script

index = 'vpc-flows'
url = host + '/' + index + '/'
headers = {'Content-Type': 'application/json'}

# Index mapping: everything is text except the two geo_point fields
pload = {
    'mappings': {
        'properties': {
            'accountId': {'type': 'text'},
            'interfaceId': {'type': 'text'},
            'interfaceArn': {'type': 'text'},
            'sourceInstanceId': {'type': 'text'},
            'sourceInstanceArn': {'type': 'text'},
            'sourceIp': {'type': 'text'},
            'sourceReverseDomain': {'type': 'text'},
            'sourceLocation': {'type': 'geo_point'},
            'sourceIpCountry': {'type': 'text'},
            'sourceIsp': {'type': 'text'},
            'sourceOrg': {'type': 'text'},
            'sourceAs': {'type': 'text'},
            'sourceAsname': {'type': 'text'},
            'destInstanceId': {'type': 'text'},
            'destInstanceArn': {'type': 'text'},
            'destIp': {'type': 'text'},
            'destReverseDomain': {'type': 'text'},
            'destinationLocation': {'type': 'geo_point'},
            'destIpCountry': {'type': 'text'},
            'destIsp': {'type': 'text'},
            'destOrg': {'type': 'text'},
            'destAs': {'type': 'text'},
            'destAsname': {'type': 'text'},
            'sourcePort': {'type': 'text'},
            'destPort': {'type': 'text'},
            'protocol': {'type': 'text'},
            'startTime': {'type': 'text'},
            'endTime': {'type': 'text'},
            'action': {'type': 'text'}
        }
    }
}

r = requests.put(url, auth=awsauth, json=pload, headers=headers)
print(r.json())
And I get this in response, so far so good:
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'vpc-flows'}
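As a sanity check, the mapping can be read back from the same index with a quick GET:

# Read the mapping back to confirm both location fields are geo_point
check = requests.get(host + '/' + index + '/_mapping', auth=awsauth)
print(check.json())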
The source of the data is VPC Flow Logs, which are space-delimited and arrive in CloudWatch Logs gzip-compressed and base64-encoded. I decompress and decode the payload, split the lines into fields, do some enrichment, build a new JSON dict, and send it to Firehose with json.dumps(). Firehose is pointed at ES (7.4).
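The decode step is essentially this (a minimal sketch, assuming the standard awslogs subscription event shape; the function name is mine):

import base64
import gzip
import json

def decode_flow_log_event(event):
    # CloudWatch Logs subscriptions deliver the payload base64-encoded
    # and gzip-compressed under event['awslogs']['data']
    compressed = base64.b64decode(event['awslogs']['data'])
    payload = json.loads(gzip.decompress(compressed))
    for log_event in payload['logEvents']:
        # Each message is one space-delimited VPC Flow Log record
        yield log_event['message'].split()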
Here is an excerpt of the enrichment and send portion:
import json

import boto3

firehose = boto3.client('firehose')  # firehoseStream and the enriched
                                     # field values are defined earlier

flowLogRaw = {
    'accountId': acctId,
    'interfaceId': interfaceId,
    'interfaceArn': eniArn,
    'sourceInstanceId': sourceInstanceId,
    'sourceInstanceArn': sourceInstanceArn,
    'sourceIp': sourceIp,
    'sourceReverseDomain': sourceReverseDomain,
    'sourceLocation': {
        'sourceIpLatid': sourceIpLatid,
        'sourceIpLongt': sourceIpLongt
    },
    'sourceIpCountry': sourceIpCountry,
    'sourceIsp': sourceIsp,
    'sourceOrg': sourceOrg,
    'sourceAs': sourceAs,
    'sourceAsname': sourceAsname,
    'destInstanceId': destInstanceId,
    'destInstanceArn': destInstanceArn,
    'destIp': destIp,
    'destReverseDomain': destReverseDomain,
    'destinationLocation': {
        'destIpLatid': destIpLatid,
        'destIpLongt': destIpLongt
    },
    'destIpCountry': destIpCountry,
    'destIsp': destIsp,
    'destOrg': destOrg,
    'destAs': destAs,
    'destAsname': destAsname,
    'sourcePort': sourcePort,
    'destPort': destPort,
    'protocol': ianaProtocol,
    'startTime': dtStartTime,
    'endTime': dtEndTime,
    'action': flowAction
}
geoIntFlowLog = json.dumps(flowLogRaw, default=str)

try:
    response = firehose.put_record(
        DeliveryStreamName=firehoseStream,
        Record={'Data': geoIntFlowLog}
    )
    print(response)
except Exception as e:
    print(e)
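For context, this is roughly what the location portion of a record looks like once serialized (the latitude below is the one from the error further down; the longitude is a hypothetical value):

# Illustration only: latitude taken from the error below, longitude hypothetical
print(json.dumps({'sourceLocation': {'sourceIpLatid': '46.0929',
                                     'sourceIpLongt': '7.3986'}}))
# {"sourceLocation": {"sourceIpLatid": "46.0929", "sourceIpLongt": "7.3986"}}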
I can publish to the stream without issue and I am not getting throttled. However, I do get HTTP 400 errors back from Elasticsearch:
{"type":"mapper_parsing_exception","reason":"failed to parse field [sourceIpLatid] of type [geo_point]","caused_by":{"type":"parse_exception","reason":"unsupported symbol [.] in geohash [46.0929]","caused_by":{"type":"illegal_argument_exception","reason":"unsupported symbol [.] in geohash [46.0929]"}}}
I am confused about why this is being parsed as a geohash and raising an illegal-argument error. If I delete the index my first Python snippet created, ES recreates the index on ingest as expected, but of course it maps lat and lon as strings, so I cannot map them as a geo_point.
I followed the excerpt here for the index mapping when creating it. Is there anything I can change in the code, or some other prep step I should take, before sending this through Firehose? Or should I drop Firehose and use requests to send the data directly to Elasticsearch instead?