Need help to mask XML elements

Hello,

I'm stuck on implementing the masking and would appreciate your help.
I need to mask some sensitive data (such as card number, account number, etc.) in the input XML message.

I am currently fetching absolute XPATHs, start, and end positions from the Database using the JDBC_static filter for the list of the elements to be masked. I need to apply the masking logic for the field XPath mentioned in the database.

example:
my sample payload without masking is

<?xml version="1.0" encoding="utf-8"?><soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns="http://www.examples.com/wsdl/HelloService.wsdl"> <soapenv:Body> <firstName>Accountholders name</firstName><AccountNumber>12123123123</AccountNumber> </soapenv:Body></soapenv:Envelope>

Let's assume we have received the following 2 records from the DB for masking:

1st record:
XPATH: :Envelope/:Body/:firstName
StartPosition: 3
end Position: 6
2nd record:
XPATH: :Envelope/:Body/:AccountNumber
StartPosition: 3
end Position: 4

The output I am supposed to get is

<?xml version="1.0" encoding="utf-8"?><soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:test="http://www.examples.com/wsdl/HelloService.wsdl"><soapenv:Body> <test:firstName>AccXXXXXXlder's name</test:firstName><test:AccountNumber>121XXXX3123</test:AccountNumber> </soapenv:Body></soapenv:Envelope>

I would appreciate any quick help or suggestions. I'm trying to do this in Logstash.

Please find my sample logstash conf file for more information:

# Receive events from Beats shippers on TCP port 5044.
input {
  beats { port => 5044 }
}

# Parse broker log lines. Lines containing "Event" are split into their quoted
# parameters, enriched with masking rules looked up through jdbc_static, and
# their XML payload is parsed; every other line only gets the header grok.
filter {

  if "Event" in [message] {
    # Strip the log header (timestamp, thread, msg, BIP code, event text) and
    # keep the remainder of the line in "message".
    grok {
      #match => {"message" => "^%{TIMESTAMP_ISO8601:timestamp}\: \[%{GREEDYDATA:thread}\] \(%{GREEDYDATA:msg}\) (?<bip_msg>[(BIP)([0-9]{3})([A-Z]{1}]+)\: %{DATA:event_msg} \: \'1\' \'(?<rfh2>.*(</usr>))\' \'%{GREEDYDATA:message}"}
      match => {"message" => "^%{TIMESTAMP_ISO8601:timestamp}\: \[%{GREEDYDATA:thread}\] \(%{GREEDYDATA:msg}\) (?<bip_msg>[(BIP)([0-9]{3})([A-Z]{1}]+)\: %{DATA:event_msg} \: %{GREEDYDATA:message}"}
      overwrite => [ "message" ]
    }

    # The remainder is ten single-quoted, space-separated values; the second is
    # the RFH2 header and the third replaces "message" with the payload.
    dissect {
      mapping => { 'message' =>  "'%{param1}' '%{rfh2}' '%{message}' '%{param4}' '%{esql_module}' '%{param5}' '%{param6}' '%{param7}' '%{param8}' '%{param9}'"}
    }

    # Parse the RFH2 header only when it carries a <usr> folder.
    if "<usr>" in [rfh2] {
      xml {
        source => "rfh2"
        store_xml => true
        target => "parsed_rfh2"
      }
    }

    # Load the masking reference table into a local lookup table and attach the
    # rows matching this event's search keys under [masking].
    jdbc_static {
      loaders => [
        {
          id => "DB_MW_MASKKEYFIELDS_REFERENCE"
          query => "select SEARCH_KEY1,SEARCH_KEY2,SEARCH_KEY3,LOGPOINT,MASK_FIELDXPATH,START_POINT,END_POINT,REL_FIELDXPATH,MASKING_REQ from db2inst1.MW_MASKKEYFIELDS_REFERENCE order by SEARCH_KEY1"
          local_table => "masking"
        }
      ]
      local_db_objects => [
        {
          name => "masking"
          columns => [
            ["SEARCH_KEY1","VARCHAR(50)"],
            ["SEARCH_KEY2","VARCHAR(50)"],
            ["SEARCH_KEY3","VARCHAR(50)"],
            ["LOGPOINT","VARCHAR(50)"],
            ["MASK_FIELDXPATH","VARCHAR(200)"],
            ["START_POINT","INTEGER"],
            ["END_POINT","INTEGER"],
            ["REL_FIELDXPATH","VARCHAR(200)"],
            ["MASKING_REQ","VARCHAR(3)"]
          ]
        }
      ]
      local_lookups => [
        {
          id => "LOCAL_MW_MASKKEYFIELDS_REFERENCE"
          query => "select SEARCH_KEY1,SEARCH_KEY2,SEARCH_KEY3,LOGPOINT,MASK_FIELDXPATH,START_POINT,END_POINT,REL_FIELDXPATH,MASKING_REQ FROM masking WHERE SEARCH_KEY1 = :SK1 AND (SEARCH_KEY2 = :SK2 OR SEARCH_KEY2 IS NULL) AND (SEARCH_KEY3 = :SK3 OR SEARCH_KEY3 IS NULL)"
          parameters => {SK1 => "%{[parsed_rfh2][key1]}" SK2 => "%{[parsed_rfh2][key2]}" SK3 => "%{[parsed_rfh2][key3]}"}
          target => "masking"
        }
      ]
      # using add_field here to add & rename values to the event root
      # NOTE(review): only the first matching row ([masking][0]) is copied — confirm
      # that is intended when several fields must be masked.
      add_field => { mask_fieldxpath => "%{[masking][0][mask_fieldxpath]}"}
      staging_directory => "/tmp/logstash/jdbc_static/import_data"
      # Minute field is 0 so the loaders run once every 2 hours; a "*" minute
      # field would fire every minute during each matching hour.
      loader_schedule => "0 */2 * * *"
      jdbc_user => "db2inst1"
      jdbc_password => "password"
      jdbc_driver_class => "com.ibm.db2.jcc.DB2Driver"
      jdbc_driver_library => "/usr/share/logstash/config/jars/db2jcc.jar"
      jdbc_connection_string => "jdbc:db2://db2server:50000/TESTDB:retrieveMessagesFromServerOnGetMessage=true;"
    }

    # Parse the payload and extract the text of the element to be masked.
    xml {
      source => "message"
      store_xml => true
      # NOTE(review): "pared_message" looks like a typo for "parsed_message" —
      # confirm nothing downstream depends on the current name before renaming.
      target => "pared_message"
      remove_namespaces => true

      #xpath => ["%{[mask_fieldxpath][0]}","m_mask_fieldxpath"]
      # This needs to be a dynamic XPath; it is hard-coded for now just to see
      # the behaviour.
      xpath => ["/SOAP_Domain_Msg/Body/firstName/text()","m_mask_fieldxpath"]
    }

    # Placeholder transformation on the extracted value: replaces every "1"
    # with "6". This is not yet position-based masking.
    mutate {
      gsub => [ "m_mask_fieldxpath", "1", "6" ]
    }
  }
  else {
    # Non-event lines: strip the log header only.
    grok {
      match => {"message" => "^%{TIMESTAMP_ISO8601:timestamp}\: \[%{GREEDYDATA:thread}\] \(%{GREEDYDATA:msg}\) (?<bip_msg>[(BIP)([0-9]{3})([A-Z]{1}]+)\: %{GREEDYDATA:message}"
      }
      overwrite => [ "message" ]
    }
  }

}

# Ship every event to Elasticsearch, one index per day.
output {
  elasticsearch {
    hosts    => ["http://elasticsearch:9200"]
    user     => "elastic"
    password => "XXXXXXX"
    index    => "logstash-%{+YYYY.MM.dd}"
  }
}

I cannot think of a way to do the xpath dynamically. The following code solves a different problem, but might give you some ideas.

Assuming you have

"message" => "<?xml version=\"1.0\" encoding=\"utf-8\"?><soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\" xmlns:test=\"http://www.examples.com/wsdl/HelloService.wsdl\"><soapenv:Body> <test:firstName>AccountHolder's name</test:firstName><test:AccountNumber>12198763123</test:AccountNumber> </soapenv:Body></soapenv:Envelope>",

Then

    xml {
        source => "message"
        store_xml => true
        force_array => false
        target => "parsed_message"
        remove_namespaces => true
    }
    mutate { add_field => { "m" => '[ { "field": "firstName", "startPosition": 4, "endPosition": 8},
            { "field": "AccountNumber", "startPosition": 2, "endPosition": 4} ]' } }
    json { source => "m" target => "[@metadata][maskings]" remove_field => [ "m" ] }
    ruby {
        code => '
            # Each masking entry names a child of [parsed_message][Body] and a
            # 0-based range; characters from startPosition up to, but not
            # including, endPosition are overwritten with "X".
            maskings = event.get("[@metadata][maskings]")
            # Nothing to mask when the json filter did not produce a list.
            maskings = [] unless maskings.is_a?(Array)
            maskings.each { |x|
                fieldName = x["field"]
                path = "[parsed_message][Body][#{fieldName}]"
                field = event.get(path)
                # Skip elements that are absent or are not plain text.
                next unless field.is_a?(String)
                s = x["startPosition"]
                e = x["endPosition"]
                # Exclusive range (three dots): the number of characters
                # replaced equals the number of X written, so the value keeps
                # its original length.
                covered = field[s...e]
                # A range that starts at or beyond the end of the text selects
                # nothing; leave the value untouched.
                next if covered.nil? || covered.empty?
                masked = field.dup
                masked[s...e] = "X" * covered.length
                event.set(path, masked)
            }
        '
    }

Will result in

"parsed_message" => {
    ...
             "Body" => {
            "firstName" => "AccoXXXXolder's name",
        "AccountNumber" => "12XX8763123"
    }
}

Given that ruby arrays start at [0] you may want

s = x["startPosition"] - 1

instead of

s = x["startPosition"]

Thanks for providing the help, Badger.

I need one more bit of help. After masking, we need the whole XML as a string, just like the original event message. It would be great if you could suggest a way to serialize it back to XML after masking.

Thanks,

I have not tested it but something like

# Store the string form of the parsed structure in "serialized_message".
# NOTE(review): if "parsed_message" is a hash, to_s yields Ruby hash notation
# rather than XML — confirm this is the output you need.
ruby {
    code => '
        parsed = event.get("parsed_message")
        event.set("serialized_message", parsed.to_s)
    '
}

might work.

Thanks for your prompt response.

I tried the same yesterday, but it didn't work. I ended up using "nokogiri" to parse the request and apply the masking.

Masking is working fine now.

Thank you very much.

1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.