Hello,
I'm stuck on how to perform the masking and would appreciate your help.
I need to mask some sensitive data (such as card numbers, account numbers, etc.) in the input XML message.
I am currently fetching the absolute XPaths and the start and end positions from the database, using the jdbc_static filter, for the list of elements to be masked. I need to apply the masking logic to each field whose XPath is listed in the database.
Example:
My sample payload without masking is:
<?xml version="1.0" encoding="utf-8"?><soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns="http://www.examples.com/wsdl/HelloService.wsdl"> <soapenv:Body> <firstName>Accountholders name</firstName><AccountNumber>12123123123</AccountNumber> </soapenv:Body></soapenv:Envelope>
Let's assume we have received the following 2 records from the DB for masking:
1st record:
XPATH: :Envelope/:Body/:firstName
StartPosition: 3
end Position: 6
2nd record:
XPATH: :Envelope/:Body/:AccountNumber
StartPosition: 3
end Position: 4
The output I am supposed to get is:
<?xml version="1.0" encoding="utf-8"?><soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:test="http://www.examples.com/wsdl/HelloService.wsdl"><soapenv:Body> <test:firstName>AccXXXXXXlders name</test:firstName><test:AccountNumber>121XXXX3123</test:AccountNumber> </soapenv:Body></soapenv:Envelope>
I would appreciate any quick help or suggestions. I'm trying to do this in Logstash.
Please find my sample Logstash conf file below for more information:
# Input stage: receive events from Beats shippers.
input {
  beats {
    # TCP port the beats input listens on.
    port => 5044
  }
}
# Filter stage.
#
# Events whose message contains "Event" are split into a header (grok) and a
# list of single-quoted parameters (dissect). The rfh2 parameter is parsed as
# XML when it contains "<usr>", masking rules are looked up from a local copy
# of MW_MASKKEYFIELDS_REFERENCE (jdbc_static), and the message payload is
# parsed as XML. All other events only get the header grok.
filter {
  if "Event" in [message] {
    grok {
      # Earlier variant that also captured rfh2 directly in the pattern:
      #match => {"message" => "^%{TIMESTAMP_ISO8601:timestamp}\: \[%{GREEDYDATA:thread}\] \(%{GREEDYDATA:msg}\) (?<bip_msg>[(BIP)([0-9]{3})([A-Z]{1}]+)\: %{DATA:event_msg} \: \'1\' \'(?<rfh2>.*(</usr>))\' \'%{GREEDYDATA:message}"}
      match => {"message" => "^%{TIMESTAMP_ISO8601:timestamp}\: \[%{GREEDYDATA:thread}\] \(%{GREEDYDATA:msg}\) (?<bip_msg>[(BIP)([0-9]{3})([A-Z]{1}]+)\: %{DATA:event_msg} \: %{GREEDYDATA:message}"}
      # Replace the original message with the captured remainder.
      overwrite => [ "message" ]
    }

    # Split the remainder into its single-quoted, space-separated parameters.
    # The third parameter replaces [message] again.
    dissect {
      mapping => { 'message' => "'%{param1}' '%{rfh2}' '%{message}' '%{param4}' '%{esql_module}' '%{param5}' '%{param6}' '%{param7}' '%{param8}' '%{param9}'"}
    }

    # Only parse rfh2 as XML when it contains a <usr> element.
    if "<usr>" in [rfh2] {
      xml {
        source => "rfh2"
        store_xml => true
        target => "parsed_rfh2"
      }
    }

    jdbc_static {
      # Remote query whose result is loaded into the local "masking" table.
      loaders => [
        {
          id => "DB_MW_MASKKEYFIELDS_REFERENCE"
          query => "select SEARCH_KEY1,SEARCH_KEY2,SEARCH_KEY3,LOGPOINT,MASK_FIELDXPATH,START_POINT,END_POINT,REL_FIELDXPATH,MASKING_REQ from db2inst1.MW_MASKKEYFIELDS_REFERENCE order by SEARCH_KEY1"
          local_table => "masking"
        }
      ]
      # Schema of the local lookup table.
      local_db_objects => [
        {
          name => "masking"
          columns => [
            ["SEARCH_KEY1","VARCHAR(50)"],
            ["SEARCH_KEY2","VARCHAR(50)"],
            ["SEARCH_KEY3","VARCHAR(50)"],
            ["LOGPOINT","VARCHAR(50)"],
            ["MASK_FIELDXPATH","VARCHAR(200)"],
            ["START_POINT","INTEGER"],
            ["END_POINT","INTEGER"],
            ["REL_FIELDXPATH","VARCHAR(200)"],
            ["MASKING_REQ","VARCHAR(3)"]
          ]
        }
      ]
      # Per-event lookup keyed on the values parsed out of rfh2; the matching
      # rows are stored under [masking].
      # NOTE(review): when rfh2 has no <usr> element, [parsed_rfh2] is never
      # created, so these references will not resolve - confirm that is acceptable.
      local_lookups => [
        {
          id => "LOCAL_MW_MASKKEYFIELDS_REFERENCE"
          query => "select SEARCH_KEY1,SEARCH_KEY2,SEARCH_KEY3,LOGPOINT,MASK_FIELDXPATH,START_POINT,END_POINT,REL_FIELDXPATH,MASKING_REQ FROM masking WHERE SEARCH_KEY1 = :SK1 AND (SEARCH_KEY2 = :SK2 OR SEARCH_KEY2 IS NULL) AND (SEARCH_KEY3 = :SK3 OR SEARCH_KEY3 IS NULL)"
          parameters => {SK1 => "%{[parsed_rfh2][key1]}" SK2 => "%{[parsed_rfh2][key2]}" SK3 => "%{[parsed_rfh2][key3]}"}
          target => "masking"
        }
      ]
      # using add_field here to add & rename values to the event root
      add_field => { mask_fieldxpath => "%{[masking][0][mask_fieldxpath]}"}
      staging_directory => "/tmp/logstash/jdbc_static/import_data"
      # Minute field is 0 so the loaders run once every 2 hours; the previous
      # "* */2 * * *" fired every minute during every second hour.
      loader_schedule => "0 */2 * * *" # run loaders every 2 hours
      jdbc_user => "db2inst1"
      jdbc_password => "password"
      jdbc_driver_class => "com.ibm.db2.jcc.DB2Driver"
      jdbc_driver_library => "/usr/share/logstash/config/jars/db2jcc.jar"
      jdbc_connection_string => "jdbc:db2://db2server:50000/TESTDB:retrieveMessagesFromServerOnGetMessage=true;"
    }

    # Parse the payload as XML with namespaces stripped, and copy the text of
    # the selected node into m_mask_fieldxpath.
    xml {
      source => "message"
      store_xml => true
      # NOTE(review): "pared_message" looks like a typo for "parsed_message";
      # left unchanged because renaming it changes the emitted field name.
      target => "pared_message"
      remove_namespaces => true
      #xpath => ["%{[mask_fieldxpath][0]}","m_mask_fieldxpath"]
      # This needs to be a dynamic XPath (taken from the jdbc_static lookup).
      # It is hard-coded for now just to see the behaviour.
      # NOTE(review): confirm whether the xml filter interpolates %{...} in
      # xpath expressions before relying on the commented-out variant above.
      xpath => { "/SOAP_Domain_Msg/Body/firstName/text()" => "m_mask_fieldxpath" }
    }

    # Replaces every "1" with "6" in m_mask_fieldxpath.
    # NOTE(review): presumably a placeholder for the real position-based masking.
    mutate {
      gsub => [ "m_mask_fieldxpath", "1", "6" ]
    }
  }
  else {
    # Non-"Event" lines: parse the header only and keep the rest as the message.
    grok {
      match => {"message" => "^%{TIMESTAMP_ISO8601:timestamp}\: \[%{GREEDYDATA:thread}\] \(%{GREEDYDATA:msg}\) (?<bip_msg>[(BIP)([0-9]{3})([A-Z]{1}]+)\: %{GREEDYDATA:message}"
      }
      overwrite => [ "message" ]
    }
  }
}
# Output stage: index every event into Elasticsearch.
output {
  elasticsearch {
    # Target node and credentials.
    hosts => ["http://elasticsearch:9200"]
    user => "elastic"
    password => "XXXXXXX"

    # One index per day, named from the event timestamp.
    index => "logstash-%{+YYYY.MM.dd}"
  }
}