Elastic JSON parse into Logstash

Hello,

I am retrieving indices from an Elasticsearch cluster containing syslog messages through a curl command; they are then read by Logstash.
I want to extract 4 fields from these messages ("datetime_receive", "ip_host_pkt", "source_msg", "_type") and rename them to "date", "host", "message", "type".

Original message from the extract:
{
  "_id": "AWUbrXRdjZ-987654akh",
  "_index": "index-2018-08-09",
  "_score": 0.0,
  "_source": {
    "datetime_log": "2018-08-09T00:34:36.051+02:00",
    "datetime_receive": "2018-08-09T00:34:36.051+02:00",
    "group": "DEFAUT",
    "ip_host": "22.33.44.55",
    "ip_host_pkt": "22.33.44.55",
    "source_msg": "22.33.44.55: -Trashback= XXXXXXXXX XXXXXXXXXX XXXXXXXXX 8002DAD8 YYYYYYYY ZZZZZZZZZ UUUUUUUU IIIIIIIIII",
    "unix_level": "local7",
    "unix_priority": "crit"
  },
  "_type": "logs_2"
},

I tried to remove the fields I don't want to keep, but without success. Below is my Logstash conf.
I am stuck, if anyone has a clue.

Logstash Config
input {
  file {
    path => "/data/*.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => "json"
  }
}

filter {
  json {
    source => "message"
  }
  mutate {
    remove_field => ["[message][0][_source][unix_priority]"]
    remove_field => ["[message][0][_source][_id]"]
    remove_field => ["[message][0][_source][_index]"]
  }
}

Thanks.

Hi @moughrom

The simplest way to retrieve documents from Elasticsearch and read these into Logstash is by using the Logstash Elasticsearch input plugin, see: Elasticsearch input plugin | Logstash Reference [8.11] | Elastic
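
A minimal sketch of that input, assuming a locally reachable cluster (the host, index and query below are placeholders), would look something like this:

input {
  # Sketch: host, index and query are placeholders to adapt.
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "index-2018-08-09"
    query => '{ "query": { "match_all": {} } }'
    docinfo => true   # copies _index, _type and _id into [@metadata]
  }
}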

The Elasticsearch source is secured and not under my authority; the only way to access it is through the proxy API, first with a POST command to retrieve an authentication token and then with a curl GET command.

I also have a full Logstash conf ready, but it is built to receive syslog in CSV format.

But now I am getting a JSON file as input, so I want to read the multiline JSON, extract the 4 fields (date, msg, host, type) and transform them to CSV, so I can process them with my existing Logstash CSV conf.

Hi @moughrom

If all you need to do is rename and remove fields, you can do this inside Elasticsearch without extracting the documents, using ingest operations. Here is an example based on your example data:
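
The following is a minimal sketch of such an ingest pipeline (the pipeline id and the http://localhost:9200 endpoint are placeholders; the field names come from the sample document above):

# Sketch: rename fields and drop unix_priority at ingest time.
curl -X PUT "http://localhost:9200/_ingest/pipeline/rename_syslog" -H 'Content-Type: application/json' -d'
{
  "description": "Sketch: rename syslog fields and drop unix_priority",
  "processors": [
    { "rename": { "field": "datetime_receive", "target_field": "date" } },
    { "rename": { "field": "ip_host_pkt", "target_field": "host" } },
    { "rename": { "field": "source_msg", "target_field": "message" } },
    { "remove": { "field": "unix_priority" } }
  ]
}'

Documents indexed with ?pipeline=rename_syslog would then be stored with the renamed fields, without any extraction step.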

My problem is that the format of the data source has changed from CSV to JSON, and I want to lean on the work already done in the Logstash CSV conf to enrich the data (geo, ...).

So we can't bypass Logstash.

I have syslogs in JSON (elastic extract) format and need to retrieve the 4 fields, convert them to CSV and process the CSV with all the Logstash filters already set.

1 - Extract the fields from the multiline JSON.

2 - Transform the extracted fields to CSV.

Hi @moughrom

What is "(elastic extract) format"? i.e. what request or technology are you using to extract the JSON?

This is the JSON format extracted from Elasticsearch; it is the same document structure shown at the top of the thread.

But I am only interested in 4 of those fields: "datetime_receive", "ip_host_pkt", "source_msg" and "_type".

Thanks @moughrom, my question is how the JSON is extracted from Elasticsearch. I understand curl is involved, but I do not understand what the request is, and this makes a difference as there are many options. Please could you give an example of the request (with the auth token redacted)? Thanks

Two curl commands:

the first one to get an authentication token,

and the second one to extract the data.

Hi @moughrom, please could you give an example of the curl command used to extract the data? This makes a difference, thank you.

This is the one used to retrieve the data:

curl -k --silent -H "Authorization: Bearer $token" -X GET https://mydomain/api/v1/proxy/myindex-2018-06-06/logs_unix/_search -d'{"from" : 0, "size" : 3000}'
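
Putting the two steps together, the flow is roughly the following. The auth endpoint, credential payload and token field name below are hypothetical placeholders (the real auth request is not shown here), and jq is assumed to be available to pull the token out of the response:

# Hypothetical first call: the auth endpoint, payload and "token" field are placeholders.
token=$(curl -k --silent -X POST https://mydomain/api/v1/auth \
        -d '{"user": "...", "password": "..."}' | jq -r '.token')

# Second call: the extraction request above, using the bearer token.
curl -k --silent -H "Authorization: Bearer $token" -X GET \
     "https://mydomain/api/v1/proxy/myindex-2018-06-06/logs_unix/_search" \
     -d '{"from" : 0, "size" : 3000}'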

OK, that makes sense. Are the curl commands issued by a bash script, and/or is there any other scripted processing before the raw Elasticsearch query output is read by Logstash?

In the end it will be executed by a bash script, and it has already been run as a script.

It is the only script run to retrieve the JSON log files.

Hey Dominic,

here are the answers to your questions.

Are the curl commands issued by a bash script? >>> YES

Is there any other scripted processing before the raw Elasticsearch query output is read by Logstash? >>> NO

Hi @moughrom

That's great. Logstash has codecs to deal with multiline messages and with JSON; however, chaining codecs is not supported (the reasons are documented on GitHub). As you are using a bash script, this gives us an opportunity to process the extracted JSON, which contains multiple documents as well as metadata for the query, into a file of newline-delimited JSON with one line per document.

curl -X GET http://0.0.0.0:9200/index-2018-08-09/_search | sed -e "s/^.*_source\"://" | grep -v "}]}}" | awk '/^}/ {print (NR==1?"":RS)$0;next} {printf "%s",$0}' | grep -v "^}$" | sed -e "s/$/\}/" > /Users/Shared/logs/test.json

You can add your additional curl parameters and Elasticsearch instance.

Then you can use Logstash to ingest this using the json codec. Use json, not json_lines.

input {
  # Read all documents from Elasticsearch matching the given query
  file {
    path => "/Users/Shared/logs/*.json"
    codec => json
  }
}

Logstash filters can then rename, remove etc. You can add any extra filters you already have here.

filter {
  mutate {
    rename => {"datetime_receive" => "date"}
    rename => {"datetime_log" => "datetime_log"}
    rename => {"group" => "group"}
    rename => {"ip_host_pkt" => "host"}
    rename => {"source_msg" => "message"}
    remove_field => ["unix_priority","@version","tags","@timestamp"]
  }
}

Note that some fields, e.g. datetime_log, appear to be renamed to themselves; this is to prevent them from getting stored in a _source subfield.

Note that the _type field can no longer be modified from 6.0 onwards, so I have left that as is.

The full code is in this GIST https://gist.github.com/djp-search/52175df2262072a41252d7761b8e72bc
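
If the renamed events still need to be written out as CSV for the existing CSV-based pipeline, one possible addition (a sketch only, not part of the GIST; the output path is a placeholder) is the csv output plugin:

output {
  # Sketch: write the renamed fields out as CSV for the existing CSV conf.
  # The output path is a placeholder.
  csv {
    path   => "/data/converted-%{+YYYY-MM-dd}.csv"
    fields => ["date", "host", "message"]
  }
}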

Hope this helps!

Hey Dominic,

I am only getting one syslog message in the JSON output file. I changed > to >> to concatenate, but I am still getting only one message; at most I get 3 messages when I use -d '{"from":"zero","size":3000}'.

curl -X GET http://0.0.0.0:9200/index-2018-08-09/_search | sed -e "s/^.*_source\"://" | grep -v "}]}}" | awk '/^}/ {print (NR==1?"":RS)$0;next} {printf "%s",$0}' | grep -v "^}$" | sed -e "s/$/\}/" >> /Users/Shared/logs/test.json

Please advise.

Here is the query:

curl -H "Authorization: Bearer $token" -X GET https://localhost/api/v1/proxy/index-2018-06-11/logs_unix/_search -d'{"size":1000}' --insecure | sed -e "s/^.*_source\"://" | grep -v "}]}}" | awk '/^}/ {print (NR==1?"":RS)$0;next} {printf "%s",$0}' | grep -v "^}$" | sed -e "s/$/\}/" > /data/log.json

and the result, only one line:

{"datetime_log":"2018-06-11T02:55:19.933+02:00","datetime_receive":"2018-06-11T02:55:19.933+02:00","group":"DEFAUT","ip_host":"192.168.33.12","ip_host_pkt":"192.168.33.12","source_msg":"mdd-04935DDDD1 : Jun 11 02:55:19 #NOTIFY# IF: GigabitEthernet 0/2 is DOWN","unix_level":"local12","unix_priority":"notice"},"_type":"logs_unix"}],"max_score":0.0,"total":200603},"timed_out":false,"took":101328}}

Hi @moughrom

I don't have the resources to fully replicate your process offline.

Here are some basic pointers:

The simplest way to "fix" Elasticsearch data is inside the cluster using Painless. You could do this using appropriate requests and the auth tokens, with the limitation that you would not have the added value of the Logstash geoip lookup. However, all the other field operations in your requirements could be performed simply and reliably inside the cluster without extracting the data.
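
As an illustration of that in-cluster approach, here is a sketch using _update_by_query with a Painless script (the host and index name are placeholders; the field names come from the sample document):

# Sketch: rename fields and drop unix_priority in place with a Painless script.
# Host and index name are placeholders.
curl -X POST "http://localhost:9200/index-2018-08-09/_update_by_query" -H 'Content-Type: application/json' -d'
{
  "script": {
    "lang": "painless",
    "source": "ctx._source.date = ctx._source.remove(\"datetime_receive\"); ctx._source.host = ctx._source.remove(\"ip_host_pkt\"); ctx._source.message = ctx._source.remove(\"source_msg\"); ctx._source.remove(\"unix_priority\")"
  }
}'

This rewrites the stored documents in place, so it is only appropriate if modifying the index directly is acceptable.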

If you do need to pull the data out, you may need to do some parsing before resubmitting to Elasticsearch. I would choose the technology that you are most comfortable with to do this.

From your replies above ^^ , it looks like AWK is not matching your pattern. You could fix this by looking more into AWK; however, I would advise against the append operation ">>", as you don't want to write the same document multiple times to the file.

The reason I suggested AWK was to avoid adding more technology; however, there are many third-party open-source scripting tools that you could use to process the data into a format that is ready for ingest via Logstash. I would suggest going with whichever you are most experienced in.
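
For example (a sketch, not part of the original suggestion), jq can collapse the search response into newline-delimited JSON in one step; the request mirrors the query shown earlier and the output path is a placeholder:

# Sketch: flatten each hit's _source (plus its _type) to one JSON object per line.
curl -k --silent -H "Authorization: Bearer $token" -X GET \
     "https://mydomain/api/v1/proxy/index-2018-06-11/logs_unix/_search" \
     -d '{"size": 1000}' \
     | jq -c '.hits.hits[] | ._source + {"type": ._type}' > /data/log.json

Each output line then contains one document's _source merged with its _type, ready for the file input with the json codec shown above.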