Ingesting JSON Data Samples w/ Logstash

Hello,

I'm trying to test a machine learning classifier in Elasticsearch, but I'm having a hard time ingesting my data. I've tried using jq to stream the data through the Bulk API, and now I'd like to try Logstash, which may be far easier than writing code for this task. I also need help with formatting, because the data is in .jsonl format. For testing purposes I'm using the Elastic EMBER malware test data, and my Elasticsearch cluster is on version 7.13. Below is a sample of the data:

{"sha256": "0abb4fda7d5b13801d63bee53e5e256be43e141faa077a6d149874242c3f02c2", "md5": "63956d6417f8f43357d9a8e79e52257e", "appeared": "2006-12", "label": 0, "avclass": "", "histogram": [45521, 13095, 12167, 12496, 12429, 11709, 11864, 12057, 12881, 11798, 11802, 11783, 12029, 12081, 11756, 12532, 11980, 11628, 11504, 11715, 11809, 12414, 11779, 11708, 11956, 11622, 11859, 11775, 11717, 11507, 11873, 11781, 12015, 11690, 11676, 11782, 11820, 11859, 12025, 11786, 11731, 11445, 11556, 11676, 12057, 11636, 11669, 11903, 12004, 11741, 11833, 12329, 11778, 11859, 11806, 11586, 11775, 11885, 11863, 12047, 11869, 12077, 11724, 12037, 13129, 11931, 12101, 12202, 11956, 12625, 11877, 11804, 11999, 11869, 11578, 11591, 11933, 12020, 11695, 11915, 12565, 11755, 11597, 12224, 11786, 11709, 12321, 12325, 11671, 11624, 11573, 11879, 11578, 11802, 12060, 11792, 11527, 12248, 11703, 11793, 12143, 12701, 12071, 11871, 12582, 12346, 12303, 11892, 12190, 12011, 11826, 12261, 12139, 11913, 11994, 12155, 13023, 13136, 11897, 12164, 12228, 11972, 11916, 11951, 12061, 12243, 12009, 12266, 12655, 12023, 11819, 12283, 11882, 12303, 11751, 11888, 11976, 12472, 11622, 13260, 11969, 12127, 11735, 12024, 11592, 11699, 11604, 11657, 11974, 11714, 11918, 11815, 11851, 11806, 11710, 11590, 11835, 11971, 11757, 11874, 11813, 11834, 11610, 11723, 11988, 11714, 11774, 12021, 11816, 11834, 11607, 11829, 11665, 11641, 11722, 11869, 11864, 11784, 11528, 11733, 11923, 11749, 11972, 11721, 11977, 11712, 11772, 11721, 11891, 11796, 11991, 12200, 12432, 11643, 11877, 12040, 11874, 11804, 11932, 12179, 11940, 11764, 11743, 11653, 11854, 11800, 12092, 12021, 11969, 11931, 11890, 11982, 11956, 11710, 11792, 12095, 11749, 11815, 11722, 11825, 11846, 11804, 11567, 11926, 11839, 11814, 11921, 11981, 11910, 11640, 11681, 12030, 12822, 12105, 12001, 12008, 12180, 11862, 11992, 11888, 12211, 12155, 11734, 11819, 12154, 11696, 12185, 11951, 12511, 12001, 11914, 11872, 12342, 12170, 12596, 22356], "byteentropy": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1898, 6, 6, 3, 1, 1, 11, 19, 32, 3, 2, 4, 7, 9, 15, 31, 1864, 11, 9, 7, 8, 15, 25, 12, 12, 8, 10, 6, 8, 12, 23, 18, 1774, 13, 12, 17, 26, 15, 51, 26, 33, 8, 10, 10, 13, 3, 8, 29, 1631, 13, 28, 19, 29, 37, 105, 53, 52, 16, 11, 6, 14, 7, 12, 15, 4564, 92, 129, 52, 121, 197, 180, 120, 196, 13, 7, 20, 39, 16, 36, 362, 8838, 197, 304, 225, 238, 327, 473, 284, 338, 21, 53, 80, 111, 36, 77, 686, 5752, 86, 226, 182, 290, 180, 965, 518, 208, 110, 180, 165, 197, 207, 217, 2805, 7344, 169, 324, 230, 860, 490, 2336, 1285, 501, 277, 259, 360, 902, 321, 566, 4256, 8104, 268, 489, 434, 1219, 633, 3454, 2183, 688, 619, 493, 607, 534, 383, 375, 6141, 7935, 297, 487, 394, 733, 455, 1048, 1313, 1035, 560, 485, 439, 564, 359, 462, 1866, 3005, 89, 238, 175, 372, 262, 590, 517, 519, 394, 226, 236, 295, 308, 307, 659, 17257, 2362, 1165, 2188, 4313, 3314, 2149, 4166, 5604, 1518, 1984, 1812, 3512, 2432, 3869, 7891, 14457, 3068, 2198, 2894, 5416, 4317, 2584, 4108, 6279, 1030, 1379, 1225, 2923, 1626, 3374, 6610, 370748, 370509, 370926, 373544, 370740, 370211, 372830, 375415, 371989, 372095, 371755, 373615, 372116, 373375, 373929, 375883], "strings": {"numstrings": 14573, "avlength": 5.972071639333013, "printabledist": [1046, 817, 877, 803, 738, 909, 831, 842, 871, 763, 796, 773, 821, 839, 959, 831, 877, 789, 824, 840, 863, 812, 887, 856, 787, 819, 849, 849, 833, 898, 852, 858, 751, 986, 859, 887, 935, 943, 904, 959, 827, 899, 772, 858, 875, 896, 879, 917, 916, 795, 
823, 974, 891, 853, 910, 918, 822, 807, 825, 832, 801, 812, 826, 836, 811, 1157, 879, 957, 1111, 1611, 930, 935, 927, 1217, 867, 915, 1185, 1039, 1169, 1231, 956, 844, 1196, 1133, 1411, 1023, 850, 960, 965, 915, 853, 802, 836, 845, 804, 900], "printables": 87031, "entropy": 6.569897560341239, "paths": 3, "urls": 0, "registry": 0, "MZ": 51}, "general": {"size": 3101705, "vsize": 380928, "has_debug": 0, "exports": 0, "imports": 156, "has_relocations": 0, "has_resources": 1, "has_signature": 0, "has_tls": 0, "symbols": 0}, "header": {"coff": {"timestamp": 1124149349, "machine": "I386", "characteristics": ["CHARA_32BIT_MACHINE", "RELOCS_STRIPPED", "EXECUTABLE_IMAGE", "LINE_NUMS_STRIPPED", "LOCAL_SYMS_STRIPPED"]}, "optional": {"subsystem": "WINDOWS_GUI", "dll_characteristics": [], "magic": "PE32", "major_image_version": 0, "minor_image_version": 0, "major_linker_version": 7, "minor_linker_version": 10, "major_operating_system_version": 4, "minor_operating_system_version": 0, "major_subsystem_version": 4, "minor_subsystem_version": 0, "sizeof_code": 26624, "sizeof_headers": 1024, "sizeof_heap_commit": 4096}}, "section": {"entry": ".text", "sections": [{"name": ".text", "size": 26624, "entropy": 6.532239617101003, "vsize": 26134, "props": ["CNT_CODE", "MEM_EXECUTE", "MEM_READ"]}, {"name": ".rdata", "size": 6656, "entropy": 5.433081641309689, "vsize": 6216, "props": ["CNT_INITIALIZED_DATA", "MEM_READ"]}, {"name": ".data", "size": 512, "entropy": 1.7424160994148217, "vsize": 172468, "props": ["CNT_INITIALIZED_DATA", "MEM_READ", "MEM_WRITE"]}, {"name": ".rsro", "size": 0, "entropy": -0.0, "vsize": 135168, "props": ["CNT_UNINITIALIZED_DATA", "MEM_READ", "MEM_WRITE"]}, {"name": ".rsrc", "size": 27648, "entropy": 5.020929764194735, "vsize": 28672, "props": ["CNT_INITIALIZED_DATA", "MEM_READ"]}]}, "imports": {"KERNEL32.dll": ["SetFileTime", "CompareFileTime", "SearchPathA", "GetShortPathNameA", "GetFullPathNameA", "MoveFileA", "lstrcatA", "SetCurrentDirectoryA", "GetFileAttributesA", "GetLastError", "CreateDirectoryA", "SetFileAttributesA", "Sleep", "GetTickCount", "GetFileSize", "GetModuleFileNameA", "ExitProcess", "GetCurrentProcess", "CopyFileA", "lstrcpynA", "GetCommandLineA", "GetWindowsDirectoryA", "CloseHandle", "GetUserDefaultLangID", "GetDiskFreeSpaceA", "GlobalUnlock", "GlobalLock", "GlobalAlloc", "CreateThread", "CreateProcessA", "CreateFileA", "GetTempFileNameA", "lstrcpyA", "lstrlenA", "SetEndOfFile", "UnmapViewOfFile", "MapViewOfFile", "CreateFileMappingA", "GetSystemDirectoryA", "RemoveDirectoryA", "lstrcmpA", "GetVolumeInformationA", "InterlockedExchange", "RtlUnwind", "lstrcmpiA", "GetEnvironmentVariableA", "ExpandEnvironmentStringsA", "GlobalFree", "WaitForSingleObject", "GetExitCodeProcess", "SetErrorMode", "GetModuleHandleA", "LoadLibraryA", "GetProcAddress", "FreeLibrary", "MultiByteToWideChar", "WritePrivateProfileStringA", "GetPrivateProfileStringA", "VirtualQuery", "WriteFile", "ReadFile", "SetFilePointer", "FindClose", "FindNextFileA", "FindFirstFileA", "DeleteFileA", "GetTempPathA", "MulDiv"], "USER32.dll": ["CloseClipboard", "SetClipboardData", "EmptyClipboard", "OpenClipboard", "TrackPopupMenu", "GetWindowRect", "AppendMenuA", "CreatePopupMenu", "GetSystemMetrics", "EndDialog", "SetWindowPos", "SetClassLongA", "IsWindowEnabled", "DialogBoxParamA", "LoadBitmapA", "GetClassInfoA", "SetDlgItemTextA", "GetDlgItemTextA", "MessageBoxA", "CharPrevA", "LoadCursorA", "GetWindowLongA", "GetSysColor", "CharNextA", "ExitWindowsEx", "CreateDialogParamA", "DestroyWindow", "SetTimer", 
"SetCursor", "IsWindowVisible", "CallWindowProcA", "GetMessagePos", "ScreenToClient", "CheckDlgButton", "RegisterClassA", "SetWindowTextA", "wsprintfA", "SetForegroundWindow", "ShowWindow", "SendMessageTimeoutA", "FindWindowExA", "IsWindow", "GetDlgItem", "SetWindowLongA", "GetClientRect", "LoadImageA", "GetDC", "EnableWindow", "PeekMessageA", "DispatchMessageA", "SendMessageA", "InvalidateRect", "PostQuitMessage"], "GDI32.dll": ["SetTextColor", "SetBkMode", "SetBkColor", "CreateBrushIndirect", "DeleteObject", "CreateFontIndirectA", "GetDeviceCaps"], "SHELL32.dll": ["SHFileOperationA", "SHGetSpecialFolderLocation", "SHGetMalloc", "SHBrowseForFolderA", "SHGetPathFromIDListA", "ShellExecuteA"], "ADVAPI32.dll": ["RegEnumValueA", "RegSetValueExA", "RegQueryValueExA", "RegOpenKeyExA", "RegEnumKeyA", "RegDeleteValueA", "RegDeleteKeyA", "RegCloseKey", "RegCreateKeyExA"], "COMCTL32.dll": ["ImageList_AddMasked", "ImageList_Create", "ImageList_Destroy", "ordinal17"], "ole32.dll": ["OleInitialize", "CoCreateInstance", "OleUninitialize"], "VERSION.dll": ["VerQueryValueA", "GetFileVersionInfoA", "GetFileVersionInfoSizeA"], "snmpapi.dll": ["SnmpUtilOidCpy", "SnmpUtilOidNCmp", "SnmpUtilVarBindFree"]}, "exports": [], "datadirectories": [{"name": "EXPORT_TABLE", "size": 0, "virtual_address": 0}, {"name": "IMPORT_TABLE", "size": 200, "virtual_address": 35312}, {"name": "RESOURCE_TABLE", "size": 28672, "virtual_address": 352256}, {"name": "EXCEPTION_TABLE", "size": 0, "virtual_address": 0}, {"name": "CERTIFICATE_TABLE", "size": 0, "virtual_address": 0}, {"name": "BASE_RELOCATION_TABLE", "size": 0, "virtual_address": 0}, {"name": "DEBUG", "size": 0, "virtual_address": 0}, {"name": "ARCHITECTURE", "size": 0, "virtual_address": 0}, {"name": "GLOBAL_PTR", "size": 0, "virtual_address": 0}, {"name": "TLS_TABLE", "size": 0, "virtual_address": 0}, {"name": "LOAD_CONFIG_TABLE", "size": 72, "virtual_address": 35240}, {"name": "BOUND_IMPORT", "size": 0, "virtual_address": 0}, {"name": "IAT", "size": 660, "virtual_address": 32768}, {"name": "DELAY_IMPORT_DESCRIPTOR", "size": 0, "virtual_address": 0}, {"name": "CLR_RUNTIME_HEADER", "size": 0, "virtual_address": 0}]}

If you have a file that contains one JSON object per line, then you can ingest it using a file input with a json codec, or a file input with a json filter.
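
For example, a minimal sketch of the codec approach (the path is just a placeholder, and mode => "read" assumes the files are complete rather than still being written to):

input {
  file {
    path => "/path/to/file.jsonl"
    mode => "read"
    codec => "json"
  }
}
output {
  stdout {}
}

The filter alternative is to leave the input codec alone and add filter { json { source => "message" } } instead.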

Thank you for the response. I have about 300k lines of the data mentioned above in each file. Should I use the json_lines codec as well?

No, do not use a json_lines codec (see the documentation). Use a json codec.

Thank you, I read it and realized it was for JSON delimited by "\n". I was able to get my pipeline to connect to my Elasticsearch cluster, but I don't see any data being written to my specified index "ember-*". Below is my pipeline.conf:

input {
  file {
    id => "ember_training"
    path => "/path/to/training_features*.jsonl"
    mode => "read"
    codec => "json"
  }
}
output {
  elasticsearch {
    hosts => "https://172.xxx.xxx.xxx:9200"
    user => "elastic_username"
    password => "<elastic_pw>"
    index => "ember-*"
    ssl => "true"
    cacert => '/tmp/elasticsearch-ca.pem'
  }
}

Does the file exist? Do you really want the default file_completed_action?

I created a new index and edited the pipeline.conf file, but I am still having no luck getting data into Elasticsearch. When I run with --log.level debug, I don't see anything that really sticks out other than my output configuration being printed, and then Logstash shuts down. I also added file_completed_action => "log" to the input plugin.

Try adding

sincedb_path => "/dev/null"

and restarting logstash.
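
In context, the input would end up looking something like this (everything else unchanged):

input {
  file {
    id => "ember_training"
    path => "/path/to/training_features*.jsonl"
    mode => "read"
    codec => "json"
    sincedb_path => "/dev/null"
  }
}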

I was actually just looking at that. After diving into the --log.level error output, I found that it was failing because I needed to specify file_completed_log_path.

I'm no longer seeing any errors, but I still can't seem to get data through. I'm starting to think the files I have are too large, or maybe it's the .jsonl extension.
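
For reference, the file input I'm running with now looks roughly like this (paths redacted):

input {
  file {
    id => "data"
    path => "/path/to/data"
    mode => "read"
    codec => "json"
    sincedb_path => "/dev/null"
    file_completed_action => "log"
    file_completed_log_path => "/tmp/"
  }
}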

The file input does not care one whit what the file name is. It just globs it and goes.

Unless single lines in the file are absurdly large (the line length being a significant fraction of the memory available to Logstash), I do not think file size would matter either.

If you are running this on the command line I would suggest replacing the elasticsearch output with a stdout output. Make sure logstash is getting the events before worrying about indexing them.
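
That is, while debugging, something along these lines (rubydebug makes the events easy to read on the console):

output {
  stdout { codec => rubydebug }
}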

Ok, I will try this as well.

I was still unable to see data even after changing the output to stdout. I also restructured the data to .json. See below for one of the log messages:

[2021-06-08T23:11:43,444][DEBUG][org.logstash.config.ir.CompiledPipeline][main] Compiled output
 P[output-stdout{"codec"=>"json"}|[file]/tmp/ember_training.conf:13:3:```
stdout{ codec => "json" }
```]

Increase the log.level to trace and look at the messages from the filewatch library that the file input uses.
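
If you are starting Logstash from the command line, that would be something along the lines of (config path taken from your log message above):

bin/logstash -f /tmp/ember_training.conf --log.level trace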

So, after looking at the input settings, it seems as though I have some settings that I don't want, such as the delimiter setting "\n". Also, if I want to read multiple files, should I use exit_after_read?

[2021-06-08T23:34:27,991][DEBUG][logstash.codecs.json     ] config LogStash::Codecs::JSON/@id = "json_eeb602e2-70f1-4ac2-ac20-808152ef1954"
[2021-06-08T23:34:27,992][DEBUG][logstash.codecs.json     ] config LogStash::Codecs::JSON/@enable_metric = true
[2021-06-08T23:34:27,992][DEBUG][logstash.codecs.json     ] config LogStash::Codecs::JSON/@charset = "UTF-8"
[2021-06-08T23:34:28,027][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@mode = "read"
[2021-06-08T23:34:28,028][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@path = ["/path/to/data"]
[2021-06-08T23:34:28,033][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@codec = <LogStash::Codecs::JSON id=>"json_eeb602e2-70f1-4ac2-ac20-808152ef1954", enable_metric=>true, charset=>"UTF-8">
[2021-06-08T23:34:28,033][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@id = "data"
[2021-06-08T23:34:28,033][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@file_completed_action = "log"
[2021-06-08T23:34:28,033][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@sincedb_path = "/dev/null"
[2021-06-08T23:34:28,033][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@file_completed_log_path = "/tmp/"
[2021-06-08T23:34:28,034][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@enable_metric = true
[2021-06-08T23:34:28,034][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@add_field = {}
[2021-06-08T23:34:28,035][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@stat_interval = 1.0
[2021-06-08T23:34:28,035][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@discover_interval = 15
[2021-06-08T23:34:28,035][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@sincedb_write_interval = 15.0
[2021-06-08T23:34:28,035][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@start_position = "end"
[2021-06-08T23:34:28,035][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@delimiter = "\n"
[2021-06-08T23:34:28,036][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@close_older = 3600.0
[2021-06-08T23:34:28,036][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@sincedb_clean_after = 1209600.0
[2021-06-08T23:34:28,036][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@file_chunk_size = 32768
[2021-06-08T23:34:28,036][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@file_chunk_count = 140737488355327
[2021-06-08T23:34:28,036][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@file_sort_by = "last_modified"
[2021-06-08T23:34:28,036][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@file_sort_direction = "asc"
[2021-06-08T23:34:28,037][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@exit_after_read = false
[2021-06-08T23:34:28,037][DEBUG][logstash.inputs.file     ] config LogStash::Inputs::File/@check_archive_validity = false

After doing a little research on delimiters, I've found that my .jsonl files were delimited by double quotes, so I would need to change my delimiter to \"

Do you know why the read starts at the "end"? Also, it seems as though the json codec plugin is delimiting as if I'm using the json_lines codec plugin.

[2021-06-09T00:21:59,655][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>0.75}
[2021-06-09T00:21:59,873][TRACE][logstash.inputs.file     ][main] Registering file input {:path=>["/tmp/ember2018/training_features_0.json"]}
[2021-06-09T00:21:59,905][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2021-06-09T00:21:59,916][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[2021-06-09T00:21:59,918][DEBUG][logstash.javapipeline    ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x2af66037 run>"}
[2021-06-09T00:21:59,934][TRACE][logstash.agent           ] Converge results {:success=>true, :failed_actions=>[], :successful_actions=>["id: main, action_type: LogStash::PipelineAction::Create"]}
[2021-06-09T00:21:59,947][INFO ][filewatch.observingread  ][main][ember_training] START, creating Discoverer, Watch with file and sincedb collections
[2021-06-09T00:21:59,967][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2021-06-09T00:21:59,973][DEBUG][filewatch.sincedbcollection][main][ember_training] open: reading from /dev/null
[2021-06-09T00:21:59,977][TRACE][filewatch.sincedbcollection][main][ember_training] open: count of keys read: 0
[2021-06-09T00:22:00,003][TRACE][filewatch.discoverer     ][main][ember_training] discover_files {:count=>0}
[2021-06-09T00:22:01,019][DEBUG][filewatch.sincedbcollection][main][ember_training] writing sincedb (delta since last write = 1623198121)
[2021-06-09T00:22:01,023][TRACE][filewatch.sincedbcollection][main][ember_training] sincedb_write: /dev/null (time = 2021-06-09 00:22:01 UTC)
[2021-06-09T00:22:02,186][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
[2021-06-09T00:22:02,190][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}
[2021-06-09T00:22:04,915][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[2021-06-09T00:22:07,200][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
[2021-06-09T00:22:07,200][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}
[2021-06-09T00:22:09,915][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[2021-06-09T00:22:12,207][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
[2021-06-09T00:22:12,207][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}
[2021-06-09T00:22:14,038][TRACE][filewatch.discoverer     ][main][ember_training] discover_files {:count=>0}
[2021-06-09T00:22:14,915][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[2021-06-09T00:22:16,039][DEBUG][filewatch.sincedbcollection][main][ember_training] writing sincedb (delta since last write = 15)
[2021-06-09T00:22:16,040][TRACE][filewatch.sincedbcollection][main][ember_training] sincedb_write: /dev/null (time = 2021-06-09 00:22:16 UTC)
[2021-06-09T00:22:17,214][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
[2021-06-09T00:22:17,214][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}
[2021-06-09T00:22:19,915][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[2021-06-09T00:22:22,221][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
[2021-06-09T00:22:22,222][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}
[2021-06-09T00:22:24,915][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[2021-06-09T00:22:27,228][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}

That is the default value of start_position if you do not set it. However, when the mode is "read", it is ignored.
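
In other words, with a read-mode input like the sketch below, the whole file is read from the beginning regardless of start_position (path is a placeholder):

input {
  file {
    path => "/path/to/file.jsonl"
    mode => "read"      # start_position is ignored in read mode
    codec => "json"
  }
}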

Ok, thank you