Hello. Learning Logstash here. I am using logstash to apply aggregation formatting so that data arrives to elasticsearch in a specific format. It seems to work most of the time, but some data seems to get no aggregation and comes out as a flat array.
Here is an example of the output (Correct formatting then no formatting):
{
"_index": "disruptive_events",
"_id": "5634",
"_score": 1,
"_ignored": [
"event.czml_document.packet.description.keyword"
],
"_source": {
"event_id": 5634,
"@timestamp": "2024-04-04T22:57:01.649715964Z",
"event": {
"quantity": "JP",
"id": 5634,
"min": -48,
"comments": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod temporincididunt ut labore et dolore magna aliqua.",
"type": "Missile 5",
"max": 1819,
"lon": 13.8380799,
"assoc_geoint": [
{
"TIBS_MSG_NUM": 8,
"MIDB_Equip_Code": "MEC478",
"TDDS_SCN": 32,
"id": 229,
"Lat": 13.836691227842918,
"Lon": 39.376305883642964,
"SmjrAxis": 1.2582698243856676,
"trueHeadingDegrees": 113.06669824230329,
"crsDegrees": 41.09585540433796,
"TES_EventID": 14,
"Orientation": 185.68826641914205,
"Producer_Msg_Seq_Num": 81,
"correlIndx": "SG",
"TDDS_Trk_Num": "TK407",
"DeltaTime": 0.7080357612310085,
"UUID": "UUID-767119",
"Altitude": 221.15322130270908,
"DetectTime": "2023-01-01T09:16:01.954Z",
"SmnrAxis": 4.424461626025332,
"Entity_Upd_Num": 76
}
],
"gif": "viirs3.gif",
"assoc_sigint": [
{
"SmnrAxis": 4.78,
"id": 242,
"Lat": 13.847508900000001,
"confidence": 0.82,
"Lon": 39.3576539,
"SmjrAxis": 2.1,
"Frequency": 30.029,
"Orientation": 175.28,
"start": "2023-01-01T09:14:47.954Z",
"SVID": 488,
"signot": "Not2",
"orgSvid": "186",
"stop": "2023-01-01T09:14:53.894Z",
"sigintType": "ELINT",
"numObservations": 3,
"loationType": "Type3",
"functionCode": "FC561",
"country_code": "CC"
}
],
"start": "2023-01-01T09:15:47.954Z",
"duration": 610000,
"sensor": [
{
"id": 218,
"data": [
{
"id": 3826,
"label": "2023-01-01T09:15:51.584Z",
"y": 1467
}
],
"name": "sensor y",
"code": "y"
}
],
"lat": 39.3664349,
"code": 3,
"flags": "y,a,b",
"stop": "2023-01-01T09:15:53.894Z",
"aor": "USPACOM",
"uuid": "5634",
"czml_document": [
{
"packet": [
{
"position": [
{
"interpolationDegree": 1,
"id": 43,
"cartographicDegrees": [
0,
39.3664349,
13.8380799,
200,
0.32999999999999996,
39.3764349,
13.8480799,
997,
0.6599999999999999,
39.3864349,
13.8580799,
874,
0.9899999999999999,
39.396434899999996,
13.8680799,
1119,
1.3199999999999998,
39.406434899999994,
13.8780799,
1104,
1.65,
39.41643489999999,
13.8880799,
1852,
1.9799999999999998,
39.42643489999999,
13.898079899999999,
2094,
2.3099999999999996,
39.43643489999999,
13.908079899999999,
1787,
2.6399999999999997,
39.446434899999986,
13.918079899999999,
2086,
2.9699999999999998,
39.456434899999984,
13.928079899999998,
2154,
3.3,
39.46643489999998,
13.938079899999998,
2003,
3.6299999999999994,
39.47643489999998,
13.948079899999998,
1883,
3.9599999999999995,
39.48643489999998,
13.958079899999998,
1346,
4.289999999999999,
39.496434899999976,
13.968079899999998,
1448,
4.619999999999999,
39.506434899999974,
13.978079899999997,
1274,
4.949999999999999,
39.51643489999997,
13.988079899999997,
1209,
5.279999999999999,
39.52643489999997,
13.998079899999997,
700,
5.609999999999999,
39.53643489999997,
14.008079899999997,
419
],
"interpolationAlgorithm": "LAGRANGE",
"epoch": "2023-01-01T09:15:47.954Z"
}
],
"id": 74,
"name": "Geoeye1",
"label": [
{
"id": 43,
"outlineWidth": 2,
"outlineColor": {
"rgba": [
0,
0,
255,
255
]
},
"text": "",
"verticalOrigin": "CENTER",
"font": "11pt Lucida Console",
"style": "FILL_AND_OUTLINE",
"pixelOffset": {
"cartesian2": [
12,
0
]
},
"horizontalOrigin": "LEFT",
"show": true,
"fillColor": {
"rgba": [
0,
255,
0,
255
]
}
}
],
"billboard": [
{
"horizontalOrigin": "CENTER",
"id": 56,
"image": {
"uri": "plane-icon.png"
},
"scale": 1.5,
"verticalOrigin": "CENTER",
"eyeOffset": {
"cartesian": [
0,
0,
0
]
},
"show": true,
"pixelOffset": {
"cartesian2": [
0,
0
]
}
}
],
"path": [
{
"width": 1,
"resolution": 120,
"id": 43,
"material": {
"solidColor": {
"color": [
0,
255,
0,
255
]
}
},
"show": {
"interval": "2023-01-01T09:15:47.954Z/2023-01-01T09:15:53.894Z",
"boolean": true
}
}
],
"availability": "2023-01-01T09:15:47.954Z/2023-01-01T09:15:53.894Z",
"document_id": 76,
"description": "<!--HTML-->\r\n<p>GeoEye-1 is a high-resolution earth observation satellite owned by GeoEye, which was launched in September 2008.</p>\r\n\r\n<p>On December 1, 2004, General Dynamics C4 Systems announced it had been awarded a contract worth approximately $209 million to build the OrbView-5 satellite. Its sensor is designed by the ITT Exelis.</p>\r\n\r\n<p>The satellite, now known as GeoEye-1, was originally scheduled for April 2008 but lost its 30-day launch slot to a U.S. government mission which had been delayed. It was rescheduled for launch August 22, 2008 from Vandenberg Air Force Base aboard a Delta II launch vehicle. The launch was postponed to September 4, 2008, due to unavailability of the Big Crow telemetry-relay aircraft. It was delayed again to September 6 because Hurricane Hanna interfered with its launch crews.</p>\r\n\r\n<p>The launch took place successfully on September 6, 2008 at 11:50:57 a.m. PDT (18:50:57 UTC). The GeoEye-1 satellite separated successfully from its Delta II launch vehicle at 12:49 p.m. PDT (19:49 UTC), 58 minutes and 56 seconds after launch.</p>"
}
],
"id": 76,
"version": "1.0",
"name": "simple",
"clock": {
"range": "LOOP_STOP",
"step": "SYSTEM_CLOCK_MULTIPLIER",
"currentTime": null,
"interval": "2023-01-01T09:15:47.954Z/2023-01-01T09:15:53.894Z",
"multiplier": 0.15
}
}
],
"country_code": null
},
"@version": "1"
}
},
{
"_index": "disruptive_events",
"_id": "91859",
"_score": 1,
"_ignored": [
"czml_packet_description.keyword"
],
"_source": {
"geoint_producer_msg_seq_num": 80,
"czml_position_cartographicdegrees": [
0,
9.589343,
37.0299964,
200,
0.32999999999999996,
9.599343,
37.0399964,
1156,
0.6599999999999999,
9.609342999999999,
37.0499964,
1096,
0.9899999999999999,
9.619342999999999,
37.059996399999996,
1271,
1.3199999999999998,
9.629342999999999,
37.069996399999994,
1304,
1.65,
9.639342999999998,
37.07999639999999,
2071,
1.9799999999999998,
9.649342999999998,
37.08999639999999,
1837,
2.3099999999999996,
9.659342999999998,
37.09999639999999,
1995,
2.6399999999999997,
9.669342999999998,
37.109996399999986,
1817,
2.9699999999999998,
9.679342999999998,
37.119996399999984,
2015,
3.3,
9.689342999999997,
37.12999639999998,
2268,
3.6299999999999994,
9.699342999999997,
37.13999639999998,
1879,
3.9599999999999995,
9.709342999999997,
37.14999639999998,
1792,
4.289999999999999,
9.719342999999997,
37.159996399999976,
1651,
4.619999999999999,
9.729342999999997,
37.169996399999974,
1390,
4.949999999999999,
9.739342999999996,
37.17999639999997,
950,
5.279999999999999,
9.749342999999996,
37.18999639999997,
1131,
5.609999999999999,
9.759342999999996,
37.19999639999997,
871,
5.9399999999999995,
9.769342999999996,
37.209996399999966,
553
],
"sigint_stop": "2023-01-11T12:25:27.852Z",
"sigint_countrycode": "CC",
"czml_billboard_horizontalorigin": "CENTER",
"flags": "x,a,b,c",
"sigint_orgsvid": "228",
"@version": "1",
"geoint_correlindx": "LL",
"sigint_numobservations": 9,
"czml_label_outlinewidth": 2,
"czml_label_text": "",
"geoint_orientation": 63.11525047596006,
"geoint_id": 254,
"czml_label_verticalorigin": "CENTER",
"geoint_midb_equip_code": "MEC916",
"sensor_id": 237,
"code": 9,
"czml_packet_name": "Geoeye1",
"czml_label_style": "FILL_AND_OUTLINE",
"aor": "USCENTCOM",
"geoint_tibs_msg_num": 62,
"czml_label_packet_id": 80,
"country_code": null,
"czml_document_version": "1.0",
"czml_position_packet_id": 80,
"max": 1831,
"event_id": 91859,
"czml_document_clock_range": "LOOP_STOP",
"start": "2023-01-11T12:25:21.582Z",
"czml_packet_id": 80,
"czml_billboard_pixeloffset_cartesian2": [
0,
0
],
"czml_path_width": 1,
"czml_label_outlinecolor_rgba": [
0,
0,
255,
255
],
"czml_billboard_eyeoffset_cartesian": [
0,
0,
0
],
"sigint_orientation": 104.95,
"sensor_name": "sensor x",
"czml_path_show_boolean": true,
"stop": "2023-01-11T12:25:27.852Z",
"geoint_altitude": 911.4745835187982,
"geoint_trueheadingdegrees": 210.99991295912747,
"sigint_svid": 798,
"sigint_frequency": 70.886,
"min": -27,
"geoint_tes_eventid": 82,
"comments": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod temporincididunt ut labore et dolore magna aliqua.",
"sigint_signot": "Not2",
"czml_document_clock_interval": "2023-01-11T12:25:21.582Z/2023-01-11T12:25:27.852Z",
"sensor_data_label": "2023-01-11T12:25:21.912Z",
"czml_label_horizontalorigin": "LEFT",
"geoint_detecttime": "2023-01-11T12:25:04.582Z",
"sigint_confidence": 0.72,
"geoint_tdds_scn": 94,
"geoint_entity_upd_num": 73,
"czml_billboard_id": 62,
"czml_document_name": "simple",
"sigint_lon": 9.588967,
"czml_path_show_interval": "2023-01-11T12:25:21.582Z/2023-01-11T12:25:27.852Z",
"czml_document_clock_multiplier": 0.15,
"sensor_code": "x",
"czml_path_material_solidcolor_color_rgba": [
0,
255,
0,
255
],
"geoint_deltatime": 0.3196382414595468,
"lat": 9.589343,
"czml_document_clock_currenttime": "2023-01-11T12:25:21.582Z",
"geoint_smnraxis": 2.0111347192039224,
"czml_packet_availability": "2023-01-11T12:25:21.582Z/2023-01-11T12:25:27.852Z",
"czml_position_interpolationalgorithm": "LAGRANGE",
"czml_label_pixeloffset_cartesian2": [
12,
0
],
"lon": 37.0299964,
"czml_document_clock_step": "SYSTEM_CLOCK_MULTIPLIER",
"gif": "viirs4.gif",
"sigint_type": "COMINT",
"duration": 940000,
"czml_position_id": 49,
"@timestamp": "2024-04-04T22:57:02.977761259Z",
"czml_document_id": 82,
"sigint_smjraxis": 9.42,
"czml_label_show": true,
"czml_label_fillcolor_rgba": [
0,
255,
0,
255
],
"type": "Missile 5",
"geoint_lon": 9.58423964595611,
"czml_billboard_packet_id": 80,
"czml_path_resolution": 120,
"czml_label_font": "11pt Lucida Console",
"czml_billboard_show": true,
"czml_label_id": 49,
"sensor_data_id": 4214,
"sigint_lat": 37.0283214,
"czml_billboard_image_uri": "plane-icon.png",
"czml_position_interpolationdegree": 1,
"sigint_id": 263,
"quantity": "US",
"czml_position_epoch": "2023-01-11T12:25:21.582Z",
"czml_path_id": 49,
"sigint_functioncode": "FC571",
"geoint_tdds_trk_num": "TK857",
"geoint_lat": 37.03736379067881,
"geoint_crsdegrees": 337.49746226827347,
"czml_path_packet_id": 80,
"sensor_data_y": 545,
"czml_packet_document_id": 82,
"czml_billboard_verticalorigin": "CENTER",
"sigint_loationtype": "Type2",
"uuid": "91859",
"sigint_start": "2023-01-11T12:25:21.582Z",
"geoint_smjraxis": 4.2812689440137355,
"geoint_uuid": "UUID-934088",
"sigint_smnraxis": 1.31,
"czml_packet_description": "<!--HTML-->\r\n<p>GeoEye-1 is a high-resolution earth observation satellite owned by GeoEye, which was launched in September 2008.</p>\r\n\r\n<p>On December 1, 2004, General Dynamics C4 Systems announced it had been awarded a contract worth approximately $209 million to build the OrbView-5 satellite. Its sensor is designed by the ITT Exelis.</p>\r\n\r\n<p>The satellite, now known as GeoEye-1, was originally scheduled for April 2008 but lost its 30-day launch slot to a U.S. government mission which had been delayed. It was rescheduled for launch August 22, 2008 from Vandenberg Air Force Base aboard a Delta II launch vehicle. The launch was postponed to September 4, 2008, due to unavailability of the Big Crow telemetry-relay aircraft. It was delayed again to September 6 because Hurricane Hanna interfered with its launch crews.</p>\r\n\r\n<p>The launch took place successfully on September 6, 2008 at 11:50:57 a.m. PDT (18:50:57 UTC). The GeoEye-1 satellite separated successfully from its Delta II launch vehicle at 12:49 p.m. PDT (19:49 UTC), 58 minutes and 56 seconds after launch.</p>",
"czml_billboard_scale": 1.5
}
},
Here is my logstash.conf:
input {
jdbc {
jdbc_driver_library => "/usr/share/logstash/postgresql-42.7.1.jar"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_connection_string => "jdbc:postgresql://postgres:5432/postgres"
jdbc_user => "postgres"
jdbc_password => "postgres"
schedule => "* * * * *" # every minute, adjust according to your needs
statement => "SELECT
disruptive_event.id AS event_id, disruptive_event.code, disruptive_event.start, disruptive_event.stop,
disruptive_event.quantity, disruptive_event.country_code, disruptive_event.comments, disruptive_event.flags,
disruptive_event.type, disruptive_event.gif, disruptive_event.max, disruptive_event.min, disruptive_event.duration, disruptive_event.lat,
disruptive_event.lon, disruptive_event.aor, disruptive_event.uuid,
assoc_geoint.id AS geoint_id, assoc_geoint.Lat AS geoint_lat, assoc_geoint.Lon AS geoint_lon, assoc_geoint.Altitude AS geoint_altitude,
assoc_geoint.SmjrAxis AS geoint_smjraxis, assoc_geoint.SmnrAxis AS geoint_smnraxis, assoc_geoint.Orientation AS geoint_orientation,
assoc_geoint.trueHeadingDegrees AS geoint_trueheadingdegrees, assoc_geoint.crsDegrees AS geoint_crsdegrees, assoc_geoint.correlIndx AS geoint_correlindx,
assoc_geoint.TDDS_SCN AS geoint_tdds_scn, assoc_geoint.TDDS_Trk_Num AS geoint_tdds_trk_num, assoc_geoint.TIBS_MSG_NUM AS geoint_tibs_msg_num,
assoc_geoint.TES_EventID AS geoint_tes_eventid, assoc_geoint.MIDB_Equip_Code AS geoint_midb_equip_code, assoc_geoint.Entity_Upd_Num AS geoint_entity_upd_num,
assoc_geoint.Producer_Msg_Seq_Num AS geoint_producer_msg_seq_num, assoc_geoint.DetectTime AS geoint_detecttime, assoc_geoint.DeltaTime AS geoint_deltatime, assoc_geoint.UUID AS geoint_uuid,
assoc_sigint.id AS sigint_id, assoc_sigint.SVID AS sigint_svid, assoc_sigint.sigintType AS sigint_type , assoc_sigint.orgSvid AS sigint_orgsvid,
assoc_sigint.start AS sigint_start, assoc_sigint.stop AS sigint_stop, assoc_sigint.confidence AS sigint_confidence, assoc_sigint.loationType AS sigint_loationtype,
assoc_sigint.signot AS sigint_signot, assoc_sigint.functionCode AS sigint_functioncode, assoc_sigint.numObservations AS sigint_numobservations,
assoc_sigint.country_code AS sigint_countrycode, assoc_sigint.Lat AS sigint_lat, assoc_sigint.Lon AS sigint_lon, assoc_sigint.SmjrAxis AS sigint_smjraxis,
assoc_sigint.SmnrAxis AS sigint_smnraxis, assoc_sigint.Orientation AS sigint_orientation, assoc_sigint.Frequency AS sigint_frequency,
sensor.id AS sensor_id, sensor.name AS sensor_name, sensor.code AS sensor_code,
sensor_data.id AS sensor_data_id, sensor_data.label AS sensor_data_label, sensor_data.y AS sensor_data_y,
czml_document.id AS czml_document_id, czml_document.name AS czml_document_name, czml_document.version AS czml_document_version,
czml_document.clock_interval AS czml_document_clock_interval, czml_document.clock_currentTime AS czml_document_clock_currentTime,
czml_document.clock_multiplier AS czml_document_clock_multiplier, czml_document.clock_range AS czml_document_clock_range,
czml_document.clock_step AS czml_document_clock_step,
czml_packet.id AS czml_packet_id, czml_packet.document_id AS czml_packet_document_id, czml_packet.name AS czml_packet_name,
czml_packet.availability AS czml_packet_availability, czml_packet.description AS czml_packet_description,
czml_billboard.id AS czml_billboard_id, czml_billboard.packet_id AS czml_billboard_packet_id, czml_billboard.eyeOffset_cartesian AS czml_billboard_eyeoffset_cartesian,
czml_billboard.horizontalOrigin AS czml_billboard_horizontalorigin, czml_billboard.image_uri AS czml_billboard_image_uri, czml_billboard.pixelOffset_cartesian2 AS czml_billboard_pixeloffset_cartesian2,
czml_billboard.scale AS czml_billboard_scale, czml_billboard.show AS czml_billboard_show, czml_billboard.verticalOrigin AS czml_billboard_verticalorigin,
czml_label.id AS czml_label_id, czml_label.packet_id AS czml_label_packet_id, czml_label.fillColor_rgba AS czml_label_fillcolor_rgba,
czml_label.font AS czml_label_font, czml_label.horizontalOrigin AS czml_label_horizontalorigin, czml_label.outlineColor_rgba AS czml_label_outlinecolor_rgba,
czml_label.outlineWidth AS czml_label_outlinewidth, czml_label.pixelOffset_cartesian2 AS czml_label_pixeloffset_cartesian2, czml_label.show AS czml_label_show,
czml_label.style AS czml_label_style, czml_label.text AS czml_label_text, czml_label.verticalOrigin AS czml_label_verticalorigin,
czml_path.id AS czml_path_id, czml_path.packet_id AS czml_path_packet_id, czml_path.show_boolean AS czml_path_show_boolean, czml_path.show_interval AS czml_path_show_interval,
czml_path.width AS czml_path_width,
czml_path.material_solidColor_color_rgba AS czml_path_material_solidcolor_color_rgba, czml_path.resolution AS czml_path_resolution,
czml_position.id AS czml_position_id, czml_position.packet_id AS czml_position_packet_id, czml_position.interpolationAlgorithm AS czml_position_interpolationalgorithm,
czml_position.interpolationDegree AS czml_position_interpolationdegree, czml_position.epoch AS czml_position_epoch, czml_position.cartographicDegrees AS czml_position_cartographicdegrees
FROM disruptive_event
LEFT JOIN assoc_geoint ON disruptive_event.id = assoc_geoint.event_id
LEFT JOIN assoc_sigint ON disruptive_event.id = assoc_sigint.event_id
LEFT JOIN sensor ON disruptive_event.id = sensor.event_id
LEFT JOIN sensor_data ON sensor.id = sensor_data.sensor_id
LEFT JOIN czml_document ON disruptive_event.id = czml_document.event_id
LEFT JOIN czml_packet ON czml_document.id = czml_packet.document_id
LEFT JOIN czml_billboard ON czml_packet.id = czml_billboard.packet_id
LEFT JOIN czml_label ON czml_packet.id = czml_label.packet_id
LEFT JOIN czml_path ON czml_packet.id = czml_path.packet_id
LEFT JOIN czml_position ON czml_packet.id = czml_position.packet_id"
}
}
filter {
aggregate {
task_id => "%{event_id}"
code => "
map['event_id'] = event.get('event_id')
map['event'] ||= {
'id' => event.get('event_id'),
'code' => event.get('code'),
'start' => event.get('start'),
'stop' => event.get('stop'),
'quantity' => event.get('quantity'),
'country_code' => event.get('country_code'),
'comments' => event.get('comments'),
'flags' => event.get('flags'),
'type' => event.get('type'),
'gif' => event.get('gif'),
'max' => event.get('max'),
'min' => event.get('min'),
'duration' => event.get('duration'),
'lat' => event.get('lat'),
'lon' => event.get('lon'),
'aor' => event.get('aor'),
'uuid' => event.get('uuid'),
'assoc_geoint' => [],
'assoc_sigint' => [],
'sensor' => [],
'czml_document' => []
}
if event.get('geoint_id')
# Check if this geoint entry already exists in the array.
unless map['event']['assoc_geoint'].any? { |geoint| geoint['id'] == event.get('geoint_id') }
map['event']['assoc_geoint'] << {
'id' => event.get('geoint_id'),
'Lat' => event.get('geoint_lat'),
'Lon' => event.get('geoint_lon'),
'Altitude' => event.get('geoint_altitude'),
'SmjrAxis' => event.get('geoint_smjraxis'),
'SmnrAxis' => event.get('geoint_smnraxis'),
'Orientation' => event.get('geoint_orientation'),
'trueHeadingDegrees' => event.get('geoint_trueheadingdegrees'),
'crsDegrees' => event.get('geoint_crsdegrees'),
'correlIndx' => event.get('geoint_correlindx'),
'TDDS_SCN' => event.get('geoint_tdds_scn'),
'TDDS_Trk_Num' => event.get('geoint_tdds_trk_num'),
'TIBS_MSG_NUM' => event.get('geoint_tibs_msg_num'),
'TES_EventID' => event.get('geoint_tes_eventid'),
'MIDB_Equip_Code' => event.get('geoint_midb_equip_code'),
'Entity_Upd_Num' => event.get('geoint_entity_upd_num'),
'Producer_Msg_Seq_Num' => event.get('geoint_producer_msg_seq_num'),
'DetectTime' => event.get('geoint_detecttime'),
'DeltaTime' => event.get('geoint_deltatime'),
'UUID' => event.get('geoint_uuid')
}
end
end
if event.get('sigint_id')
# Check if this sigint entry already exists in the array
unless map['event']['assoc_sigint'].any? { |sigint| sigint['id'] == event.get('sigint_id') }
map['event']['assoc_sigint'] << {
'id' => event.get('sigint_id'),
'SVID' => event.get('sigint_svid'),
'sigintType' => event.get('sigint_type'),
'orgSvid' => event.get('sigint_orgsvid'),
'start' => event.get('sigint_start'),
'stop' => event.get('sigint_stop'),
'confidence' => event.get('sigint_confidence'),
'loationType' => event.get('sigint_loationtype'),
'signot' => event.get('sigint_signot'),
'functionCode' => event.get('sigint_functioncode'),
'numObservations' => event.get('sigint_numobservations'),
'country_code' => event.get('sigint_countrycode'),
'Lat' => event.get('sigint_lat'),
'Lon' => event.get('sigint_lon'),
'SmjrAxis' => event.get('sigint_smjraxis'),
'SmnrAxis' => event.get('sigint_smnraxis'),
'Orientation' => event.get('sigint_orientation'),
'Frequency' => event.get('sigint_frequency')
}
end
end
if event.get('sensor_id')
sensor_index = map['event']['sensor'].find_index { |sensor| sensor['id'] == event.get('sensor_id') }
if sensor_index.nil?
# Sensor doesn't exist, create it and add the sensor data
map['event']['sensor'] << {
'id' => event.get('sensor_id'),
'name' => event.get('sensor_name'),
'code' => event.get('sensor_code'),
'data' => event.get('sensor_data_id') ? [{
'id' => event.get('sensor_data_id'),
'label' => event.get('sensor_data_label'),
'y' => event.get('sensor_data_y')
}] : []
}
else
# Sensor exists, add sensor data if it's not already there
unless map['event']['sensor'][sensor_index]['data'].any? { |data| data['id'] == event.get('sensor_data_id') }
map['event']['sensor'][sensor_index]['data'] << {
'id' => event.get('sensor_data_id'),
'label' => event.get('sensor_data_label'),
'y' => event.get('sensor_data_y')
}
end
end
end
# Handling CZML Documents
document = map['event']['czml_document'].find { |doc| doc['id'] == event.get('czml_document_id') }
if document.nil? && event.get('czml_document_id')
document = {
'id' => event.get('czml_document_id'),
'name' => event.get('czml_document_name'),
'version' => event.get('czml_document_version'),
'clock' => {
'interval' => event.get('czml_document_clock_interval'),
'currentTime' => event.get('czml_document_clock_currentTime'),
'multiplier' => event.get('czml_document_clock_multiplier'),
'range' => event.get('czml_document_clock_range'),
'step' => event.get('czml_document_clock_step')
},
'packet' => []
}
map['event']['czml_document'] << document
end
# Handling CZML Packets within Documents
if event.get('czml_document_id') && event.get('czml_packet_id')
packet = document['packet'].find { |pkt| pkt['id'] == event.get('czml_packet_id') }
if packet.nil?
packet = {
'id' => event.get('czml_packet_id'),
'document_id' => event.get('czml_document_id'),
'name' => event.get('czml_packet_name'),
'availability' => event.get('czml_packet_availability'),
'description' => event.get('czml_packet_description'),
'billboard' => [],
'label' => [],
'path' => [],
'position' => []
}
document['packet'] << packet
end
# Handling Billboard
if event.get('czml_billboard_id') && packet['billboard'].none? {|b| b['id'] == event.get('czml_billboard_id')}
billboard = {
'id' => event.get('czml_billboard_id'),
'eyeOffset' => {
'cartesian' => event.get('czml_billboard_eyeoffset_cartesian')
},
'horizontalOrigin' => event.get('czml_billboard_horizontalorigin'),
'image' => {
'uri' => event.get('czml_billboard_image_uri')
},
'pixelOffset' => {
'cartesian2' => event.get('czml_billboard_pixeloffset_cartesian2')
},
'scale' => event.get('czml_billboard_scale'),
'show' => event.get('czml_billboard_show'),
'verticalOrigin' => event.get('czml_billboard_verticalorigin')
}
packet['billboard'] << billboard
end
# Handling Label
if event.get('czml_label_id') && packet['label'].none? {|l| l['id'] == event.get('czml_label_id')}
label = {
'id' => event.get('czml_label_id'),
'fillColor' => {
'rgba' => event.get('czml_label_fillcolor_rgba')
},
'font' => event.get('czml_label_font'),
'horizontalOrigin' => event.get('czml_label_horizontalorigin'),
'outlineColor' => {
'rgba' => event.get('czml_label_outlinecolor_rgba')
},
'outlineWidth' => event.get('czml_label_outlinewidth'),
'pixelOffset' => {
'cartesian2' => event.get('czml_label_pixeloffset_cartesian2')
},
'show' => event.get('czml_label_show'),
'style' => event.get('czml_label_style'),
'text' => event.get('czml_label_text'),
'verticalOrigin' => event.get('czml_label_verticalorigin')
}
packet['label'] << label
end
# Handling Path
if event.get('czml_path_id') && packet['path'].none? {|p| p['id'] == event.get('czml_path_id')}
path = {
'id' => event.get('czml_path_id'),
'show' => {
'interval' => event.get('czml_path_show_interval'),
'boolean' => event.get('czml_path_show_boolean')
},
'width' => event.get('czml_path_width'),
'material' => {
'solidColor' => {
'color' => event.get('czml_path_material_solidcolor_color_rgba')
}
},
'resolution' => event.get('czml_path_resolution')
}
packet['path'] << path
end
# Handling Position
if event.get('czml_position_id') && packet['position'].none? {|p| p['id'] == event.get('czml_position_id')}
position = {
'id' => event.get('czml_position_id'),
'interpolationAlgorithm' => event.get('czml_position_interpolationalgorithm'),
'interpolationDegree' => event.get('czml_position_interpolationdegree'),
'epoch' => event.get('czml_position_epoch'),
'cartographicDegrees' => event.get('czml_position_cartographicdegrees')
}
packet['position'] << position
end
end
"
push_previous_map_as_event => true
timeout => 5
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "disruptive_events"
document_id => "%{event_id}" # Use the 'event_id' as the document ID
}
#stdout { codec => json_lines }
#stdout { codec => rubydebug }
}
Thanks for the help!