Logstash filter to process JSON array fields as separate fields in Elastic index

Hello All,

After several attempts, I'm unable to process a column in an Oracle table that contains JSON data; I need every field in that JSON created as a separate field in the Elastic index. Could someone guide me on what I'm doing wrong here? Even the data I do get is incorrect, and I'm only getting 1 record, not all of them.

package_id is unique here, and I'm getting only 1 record out of the 8 records in the table.

{"status":"Start","errorData":null,"creationDate":1668596975812,"lastModificationDate":null,"dataInfoProcess":[{"RequestId":"123AA","Counter":1,"size":12,"docId":"ED9","processid":"","lastPart":true}]}

What I need in the index:
status, errorData, RequestId, ... each created as a separate field.

In the image above, the package_data column data is stored as JSON, and I need each of its fields to come through as a separate field in the Elastic index.
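
That is, the target would be a document shaped roughly like this (illustrative only, built from the sample above):

{"status":"Start","errorData":null,"creationDate":1668596975812,"lastModificationDate":null,"RequestId":"123AA","Counter":1,"size":12,"docId":"ED9","processid":"","lastPart":true}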

Template

PUT _index_template/tcs-package-details
{
  "template": {
    "settings": {
      "index": {
        "lifecycle": {
          "name": "tcs-policy",
          "rollover_alias": "tcs-package-details"
        },
        "number_of_shards": "1",
        "number_of_replicas": "0"
      }
    },
    "mappings": {
      "_routing": {
        "required": false
      },
      "numeric_detection": false,
      "dynamic_date_formats": [
        "strict_date_optional_time",
        "yyyy/MM/dd HH:mm:ss Z||yyyy/MM/dd Z"
      ],
      "_source": {
        "excludes": [],
        "includes": [],
        "enabled": true
      },
      "dynamic": true,
      "date_detection": true
    }
  },
  "index_patterns": [
    "tcs-package-details-*"
  ],
  "composed_of": []
}

Logstash config:

input {
  
  jdbc { 
	jdbc_connection_string => "jdbc:oracle:thin:@abc:1821/tcs"
	jdbc_user => "random"
	jdbc_password => "random"
    jdbc_driver_library => "../lib/ojdbc8-11.2.0.1.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
	jdbc_paging_enabled => true
	last_run_metadata_path => "../config/lastrun-tcs-p-details.yml"
	schedule => "*/25 * * * * *"
	connection_retry_attempts => 5
	connection_retry_attempts_wait_time => 10
  statement=> "select PACKAGE_ID,PACKAGE_DATA, from_tz(CAST (CREATION_DATE AS TIMESTAMP), 'UTC') as CREATION_DATE, from_tz(CAST (LAST_MOD_DATE AS TIMESTAMP), 'UTC') as LAST_MOD_DATE   from tcs_ar2mig_newpack where LAST_MOD_DATE >= SYS_EXTRACT_UTC(:sql_last_value)"
  type => "tcs-package-details"
  }
  
  
}

filter {
  json {source => "package_data"} 
        split {field => "package_data"}
        mutate {remove_field => [ "package_data"]}
}


output {
#stdout { codec => json_lines }
  if [type] == "tcs-package-details"{
	elasticsearch {
		hosts => "abc:9200"
		ilm_pattern => "{now/d}-000001"
		"doc_as_upsert" => true
		ilm_rollover_alias => "tcs-package-details"
		ilm_policy => "tcs-policy"
		"document_id" => "%{package_id}"
		
	}

  }
	
  
}


Kindly guide me here on how this can be achieved.

Thanks

Hi @PRASHANT_MEHTA

What does the final elasticsearch document look like currently?
Also, what do the Logstash logs have in them when you attempt to ingest this data?

What happens if you remove the lines:

split {field => "package_data"}
mutate {remove_field => [ "package_data"]}

leaving only:

json {source => "package_data"} 

in your filter section.
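
You could also temporarily add a stdout output with the rubydebug codec so you can see exactly what each event looks like after filtering, for example:

output {
  # temporary, for troubleshooting: prints every filtered event to the Logstash console
  stdout { codec => rubydebug }
}

That usually makes it obvious whether the json parsing is doing what you expect before anything reaches Elasticsearch.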

Hello @eMitch ,

Thanks for taking the time to look into this. This is how the data comes into the Elastic index in Discover:


I'll test with your suggested approach as well, try other alternatives, and update with the latest outcome. I need all of this data as separate fields in my index.

Thanks

Hello,

By leaving only: json {source => "package_data"}

In the logs I can see only my SQL statement printed, but no data comes into the Elastic index.
Note: the Oracle column PACKAGE_DATA is just a string, which should be converted into individual JSON fields.
If I'm correct, this is what needs to be done, i.e. stringified JSON to JSON.

Any suggestion would be helpful.

Thanks

Hello @eMitch ,

Now I'm able to parse the data, but the JSON string internally has multiple JSON objects and I want to parse those as separate fields as well. I'm unable to figure out how that can be done, and I'd also need to parse creationDate and lastModDate into UTC format to show in the index.

1) This works, with the image below as output:

filter {
  json { source => "package_data" }
  split { field => "package_data" }
  mutate { remove_field => [ "package_data" ] }
}

image

2) Tried this, but it doesn't work:

filter {
 json {source => "package_data"}
 split {field => "package_data"}
 split {
    field => "[package_data][packagePartDatas]"
  }
  mutate {
    rename => {
      "[packagePartDatas][serviceRequestId]" => "serviceRequestId"
      "[packagePartDatas][partCounter]" => "partCounter"
      "[packagePartDatas][size]" => "partSize"
      "[packagePartDatas][fileId]" => "fileId"
      "[packagePartDatas][packageId]" => "partPackageId"
      "[packagePartDatas][lastPart]" => "lastPart"
    }
  }
 mutate {remove_field => [ "package_data"]}
}

Separate link for the same issue:

Thanks

Hey @PRASHANT_MEHTA, glad it's partially working! Progress is good.
Let's try not to create multiple posts for the same topic if we can help it.

Given the screenshot from this post, it looks as though your data is coming in correctly.

What version of Elasticsearch/Kibana are you running? Kibana can have a hard time displaying nested JSON objects, but the data should still be searchable and available.

For example, with the new data that you've ingested, what happens if you search on packagePartDatas.packageId?

Can you also provide us with an updated index mapping for this index?

Hello @eMitch ,

Thanks for taking the time to look into this.

1) Which version: ELK 7.9.1. Most probably within a few months we will migrate to the latest licensed version of Elastic.

2) What happens if you search on packagePartDatas.packageId?
I am able to get different values for this field, but the challenge is that I need these to be created as separate field values. The image below shows my intention. More than searching, I need to display the data better in Kibana.


A unique packageId goes through different states:

Sample Data in table column PACKAGE_DATA:

{"status":"READY_PROCESSING",
 "errorD":null,"creationDate":1678258723919,"ModDate":1678283083479,"packageId":"1701","platform":"P36","systemId":"TIS_FULL","userId":"?","connectXmlFileId":"","size":0,"packageKind":"FULL",
"packagePartDatas":
[
{
"serviceRequestId":"ADFDF0645EDB486",
"partCounter":1,
"size":126624,
"fileId":"F65F2308E45A757",
"packageId":"1701",
"lastPart":true
}],
"packageStateHistories":
[
{
"status":"READY_PROCESSING",
"errorD":null,
"creationDate":1678258723920,
"ModDate":1678258723920
},
{"status":"COMBINED","errorD":null,"creationDate":1678259010610,"ModDate":1678259010610},
{"status":"ERROR","errorD":
{"errorCode":"BATCH_10","errorMessage":"cannot be resolved","stackTrace":"None"},
"creationDate":1678259017840,
"ModDate":1678259017840
},
{"status":"COMBINED","errorD":null,"creationDate":1678259751703,"ModDateModDateModDate":1678259751703},
{"status":"INTRANSIT","errorD":null,"creationDate":1678260352291,"ModDate":1678260352291},
{"status":"MERGED","errorD":null,"creationDate":1678260671302,"ModDate":1678260671302},
{"status":"CONVERTED","errorD":null,"creationDate":1678269641370,"ModDate":1678269641370},
{"status":"ERROR","errorD":{"errorCode":"BATCH_FRAMEWORK_1041","errorMessage":"to absolute file path",
"stackTrace":"[cap-unknown]"}

Mapping:

Template

PUT _index_template/tcs-package-details
{
  "template": {
    "settings": {
      "index": {
        "lifecycle": {
          "name": "tcs-policy",
          "rollover_alias": "tcs-package-details"
        },
        "number_of_shards": "1",
        "number_of_replicas": "0"
      }
    },
    "mappings": {
      "_routing": {
        "required": false
      },
      "numeric_detection": false,
      "dynamic_date_formats": [
        "strict_date_optional_time",
        "yyyy/MM/dd HH:mm:ss Z||yyyy/MM/dd Z"
      ],
      "_source": {
        "excludes": [],
        "includes": [],
        "enabled": true
      },
      "dynamic": true,
      "date_detection": true
    }
  },
  "index_patterns": [
    "tcs-package-details-*"
  ],
  "composed_of": []
}

Now, after having a close look at the images, you may understand my intention to parse the data in the particular way I require.
Any suggestion would be helpful.

Thanks

Ok, I understand more of what you're attempting to do here. It looks like this dataset may need to be denormalized another layer. Remember, when Elasticsearch indexes objects like this, it flattens the data into the same flat fields, causing the relationship between your packageStateHistories.status and packageStateHistories.creationDate values to be lost.

For example:
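
Roughly, using the sample data above and assuming default (non-nested) mappings, the indexed document ends up with parallel arrays of values per field:

packageStateHistories.status:       ["READY_PROCESSING", "COMBINED", "ERROR", ...]
packageStateHistories.creationDate: [1678258723920, 1678259010610, 1678259017840, ...]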

Once indexed, these values are no longer associated with each other, even though the raw JSON still shows the "relationship". This will cause the views you build in Kibana to be incorrect.

I think you should try using a third split filter to split [package_data][packagePartDatas][packageStateHistories] out into their own events as well. This will increase the volume of your data, but the document granularity will be down to the "history event" level, i.e. each document will pertain to one event in a package's history, so you will be able to group the documents by packageId and then sort them by each historical event (creationDate and status).
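
As a rough, untested sketch (the exact field paths depend on your json filter; without a target, the parsed fields land at the root of the event, where packagePartDatas and packageStateHistories appear to be two top-level arrays in your sample), something like:

filter {
  # parse the JSON string from the Oracle column; parsed fields end up at the root of the event
  json {
    source => "package_data"
    remove_field => [ "package_data" ]
  }
  # one event per package part
  split { field => "packagePartDatas" }
  # one event per history entry
  split { field => "packageStateHistories" }
}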

Hello @eMitch

Thanks for your time and for simplifying the explanation.
Currently I'm facing challenges using the split filter correctly to create separate events, i.e. for package_data.packagePartDatas.packageStateHistories. I have tried several ways but am unable to parse it correctly.
I'd also like creationDate and lastModDate to be converted to the correct UTC time format; I'm unable to parse that correctly either.
Note: package_data is the Oracle column name and is not part of the JSON string itself.
Kindly guide.

With the filter below I'm still getting this:

Filter:

filter {
 if [type] == "tcs-package-details" {
 json {source => "package_data"}
 split {field => "package_data"}
 split {field => "[package_data][packagePartDatas][packageStateHistories]"}
mutate {
    convert => {
      "package_data.packageStateHistories.lastModificationDate" => "integer"
      "package_data.packageStateHistories.creationDate" => "integer"
    }
  }

  date {
    match => ["package_data.packageStateHistories.lastModificationDate", "package_data.packageStateHistories.creationDate"]
    target => ["package_data.packageStateHistories.lastModificationDate", "package_data.packageStateHistories.creationDate"]
    timezone => "UTC"
  }

 mutate {remove_field => [ "package_data"]}
}
}

image

Thanks

Hello @eMitch ,

filter {
  json {
    source => "package_data"
    remove_field => [ "package_data" ]
  }
}
filter {
  split {
    field => "packageStateHistories"
  }
  split {
    field => "packagePartDatas"
  }
}

The above works and the issue is resolved. Thanks @eMitch, and special thanks @leandrojmp.
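
For the remaining piece, converting the epoch-millisecond creationDate / ModDate values into proper UTC dates, a date filter along these lines should do it (sketch only, untested; field names taken from the sample data above):

filter {
  # the top-level creationDate is epoch milliseconds; UNIX_MS parses it into a proper date (stored as UTC)
  date {
    match  => [ "creationDate", "UNIX_MS" ]
    target => "creationDate"
  }
  # after the split, each event carries a single history entry, so its dates can be parsed the same way
  date {
    match  => [ "[packageStateHistories][creationDate]", "UNIX_MS" ]
    target => "[packageStateHistories][creationDate]"
  }
}

The same pattern applies to the ModDate fields.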
