Need help to mask XML elements

Hello,

I'm stuck trying to implement masking and would appreciate your help.
I need to mask some sensitive data (such as card numbers, account numbers, etc.) in the input XML message.

I am currently fetching absolute XPATHs, start, and end positions from the Database using the JDBC_static filter for the list of the elements to be masked. I need to apply the masking logic for the field XPath mentioned in the database.

example:
my sample payload without masking is

<?xml version="1.0" encoding="utf-8"?><soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns="http://www.examples.com/wsdl/HelloService.wsdl"> <soapenv:Body> <firstName>Accountholders name</firstName><AccountNumber>12123123123</AccountNumber> </soapenv:Body></soapenv:Envelope>

Let's assume we have received the following 2 records from the DB for masking:

1st record:
XPATH: :Envelope/:Body/:firstName
StartPosition: 3
EndPosition: 6
2nd record:
XPATH: :Envelope/:Body/:AccountNumber
StartPosition: 3
EndPosition: 4

The output I am supposed to get is

<?xml version="1.0" encoding="utf-8"?><soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:test="http://www.examples.com/wsdl/HelloService.wsdl"><soapenv:Body> <test:firstName>AccXXXXXXlder's name</test:firstName><test:AccountNumber>121XXXX3123</test:AccountNumber> </soapenv:Body></soapenv:Envelope>

I would appreciate any quick help or suggestions. I'm trying to do this in Logstash.

Please find my sample logstash conf file for more information:

# Receive events shipped by Beats agents.
input {
  beats {
    # TCP port the beats input listens on
    port => 5044
  }
}

filter {

  # Messages containing "Event" carry a quoted parameter list (RFH2 header, payload, ...);
  # everything else only gets the common log prefix stripped (see the else branch).
  if "Event" in [message] {

    # Strip the log prefix; the remainder of the line replaces [message].
    grok {
      #match => {"message" => "^%{TIMESTAMP_ISO8601:timestamp}\: \[%{GREEDYDATA:thread}\] \(%{GREEDYDATA:msg}\) (?<bip_msg>[(BIP)([0-9]{3})([A-Z]{1}]+)\: %{DATA:event_msg} \: \'1\' \'(?<rfh2>.*(</usr>))\' \'%{GREEDYDATA:message}"}
      match => {"message" => "^%{TIMESTAMP_ISO8601:timestamp}\: \[%{GREEDYDATA:thread}\] \(%{GREEDYDATA:msg}\) (?<bip_msg>[(BIP)([0-9]{3})([A-Z]{1}]+)\: %{DATA:event_msg} \: %{GREEDYDATA:message}"}
      overwrite => [ "message" ]
    }

    # Split the single-quoted, space-separated parameters; the third one becomes the new [message].
    dissect {
      mapping => { 'message' => "'%{param1}' '%{rfh2}' '%{message}' '%{param4}' '%{esql_module}' '%{param5}' '%{param6}' '%{param7}' '%{param8}' '%{param9}'"}
    }

    # Parse the RFH2 header only when it contains a <usr> folder.
    if "<usr>" in [rfh2] {
      xml {
        source => "rfh2"
        store_xml => true
        target => "parsed_rfh2"
      }
    }

    # Load the masking reference table into the local lookup database and fetch the
    # masking rules matching the search keys taken from the parsed RFH2 header.
    jdbc_static {
      loaders => [
        {
          id => "DB_MW_MASKKEYFIELDS_REFERENCE"
          query => "select SEARCH_KEY1,SEARCH_KEY2,SEARCH_KEY3,LOGPOINT,MASK_FIELDXPATH,START_POINT,END_POINT,REL_FIELDXPATH,MASKING_REQ from db2inst1.MW_MASKKEYFIELDS_REFERENCE order by SEARCH_KEY1"
          local_table => "masking"
        }
      ]
      local_db_objects => [
        {
          name => "masking"
          columns => [
            ["SEARCH_KEY1","VARCHAR(50)"],
            ["SEARCH_KEY2","VARCHAR(50)"],
            ["SEARCH_KEY3","VARCHAR(50)"],
            ["LOGPOINT","VARCHAR(50)"],
            ["MASK_FIELDXPATH","VARCHAR(200)"],
            ["START_POINT","INTEGER"],
            ["END_POINT","INTEGER"],
            ["REL_FIELDXPATH","VARCHAR(200)"],
            ["MASKING_REQ","VARCHAR(3)"]
          ]
        }
      ]
      local_lookups => [
        {
          id => "LOCAL_MW_MASKKEYFIELDS_REFERENCE"
          query => "select SEARCH_KEY1,SEARCH_KEY2,SEARCH_KEY3,LOGPOINT,MASK_FIELDXPATH,START_POINT,END_POINT,REL_FIELDXPATH,MASKING_REQ FROM masking WHERE SEARCH_KEY1 = :SK1 AND (SEARCH_KEY2 = :SK2 OR SEARCH_KEY2 IS NULL) AND (SEARCH_KEY3 = :SK3 OR SEARCH_KEY3 IS NULL)"
          parameters => {SK1 => "%{[parsed_rfh2][key1]}" SK2 => "%{[parsed_rfh2][key2]}" SK3 => "%{[parsed_rfh2][key3]}"}
          target => "masking"
        }
      ]
      # using add_field here to add & rename values to the event root
      add_field => { mask_fieldxpath => "%{[masking][0][mask_fieldxpath]}"}
      staging_directory => "/tmp/logstash/jdbc_static/import_data"
      # Minute field is 0 so the loaders run once every 2 hours; with "*" in the
      # minute field the schedule fires every minute of every second hour.
      loader_schedule => "0 */2 * * *"
      jdbc_user => "db2inst1"
      jdbc_password => "password"
      jdbc_driver_class => "com.ibm.db2.jcc.DB2Driver"
      jdbc_driver_library => "/usr/share/logstash/config/jars/db2jcc.jar"
      jdbc_connection_string => "jdbc:db2://db2server:50000/TESTDB:retrieveMessagesFromServerOnGetMessage=true;"
    }

    # Parse the payload and extract the value to be masked.
    xml {
      source => "message"
      store_xml => true
      # NOTE(review): "pared_message" looks like a typo for "parsed_message" - confirm before renaming.
      target => "pared_message"
      remove_namespaces => true

      # TODO: this needs to be a dynamic XPath taken from mask_fieldxpath.
      # It is hard-coded for now just to see the behaviour.
      #xpath => ["%{[mask_fieldxpath][0]}","m_mask_fieldxpath"]
      xpath => ["/SOAP_Domain_Msg/Body/firstName/text()", "m_mask_fieldxpath"]
    }

    # Placeholder substitution on the extracted value (replaces every "1" with "6").
    mutate {
      gsub => [ "m_mask_fieldxpath", "1", "6" ]
    }
  }
  else {
    # Non-event lines: strip the log prefix only.
    grok {
      match => {"message" => "^%{TIMESTAMP_ISO8601:timestamp}\: \[%{GREEDYDATA:thread}\] \(%{GREEDYDATA:msg}\) (?<bip_msg>[(BIP)([0-9]{3})([A-Z]{1}]+)\: %{GREEDYDATA:message}"
      }
      overwrite => [ "message" ]
    }
  }

}

# Ship every event to Elasticsearch, one index per day.
output {
  elasticsearch {
    hosts    => ["http://elasticsearch:9200"]
    user     => "elastic"
    password => "XXXXXXX"
    # daily index derived from the event timestamp
    index    => "logstash-%{+YYYY.MM.dd}"
  }
}

I cannot think of a way to do the xpath dynamically. The following code solves a different problem, but might give you some ideas.

Assuming you have

"message" => "<?xml version=\"1.0\" encoding=\"utf-8\"?><soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\" xmlns:test=\"http://www.examples.com/wsdl/HelloService.wsdl\"><soapenv:Body> <test:firstName>AccountHolder's name</test:firstName><test:AccountNumber>12198763123</test:AccountNumber> </soapenv:Body></soapenv:Envelope>",

Then

    # Parse the SOAP payload into [parsed_message]; namespace prefixes are dropped
    # and single child elements are stored as plain values instead of arrays.
    xml {
        source            => "message"
        target            => "parsed_message"
        store_xml         => true
        remove_namespaces => true
        force_array       => false
    }

    # Stand-in for the masking rules: a JSON array placed in a temporary field "m".
    mutate {
        add_field => {
            "m" => '[ { "field": "firstName", "startPosition": 4, "endPosition": 8},
            { "field": "AccountNumber", "startPosition": 2, "endPosition": 4} ]'
        }
    }

    # Decode the rules into [@metadata][maskings] and drop the temporary field.
    json {
        source       => "m"
        target       => "[@metadata][maskings]"
        remove_field => [ "m" ]
    }
    ruby {
        code => '
            # Overwrite a character range of each configured Body field with "X".
            # startPosition is the index of the first masked character and
            # endPosition is the index just past the last one, so exactly
            # (endPosition - startPosition) characters are replaced by the same
            # number of "X" and the value keeps its original length.
            maskings = event.get("[@metadata][maskings]") || []
            maskings.each { |x|
                fieldName = x["field"]
                path = "[parsed_message][Body][#{fieldName}]"
                field = event.get(path)
                # Skip fields that are missing or are not plain text.
                next unless field.is_a?(String)
                s = x["startPosition"]
                e = x["endPosition"]
                # Clamp to the value so out-of-range positions cannot raise.
                s = 0 if s < 0
                e = field.length if e > field.length
                next if e <= s
                # Work on a copy rather than mutating the string held by the event.
                masked = field.dup
                # Exclusive range: replaces e - s characters with e - s "X".
                masked[s...e] = "X" * (e - s)
                event.set(path, masked)
            }
        '
    }

Will result in

"parsed_message" => {
    ...
             "Body" => {
            "firstName" => "AccoXXXXolder's name",
        "AccountNumber" => "12XX8763123"
    }
}

Given that ruby arrays start at [0] you may want

s = x["startPosition"] - 1

instead of

s = x["startPosition"]

Thanks for providing the help, Badger.

I need help with one more thing. After masking, we need the whole XML as a string, just like the original event message. It would be great if you could suggest a way to serialize the parsed XML back into a string after masking.

Thanks,

I have not tested it but something like

# NOTE(review): [parsed_message] is presumably a Hash (xml filter with store_xml => true), so to_s
# would yield the Ruby inspect text of that hash rather than XML markup - confirm before relying on it.
ruby { code => 'event.set("serialized_message", event.get("parsed_message").to_s)' }

might work.

Thanks for your prompt response.

I tried the same thing yesterday, but it didn't work. I have used "nokogiri" to parse the request and apply the masking.

Masking is working fine now.

Thank you very much.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.