Can you denormalize Kafka topics in Logstash before indexing to ES?

My question is whether Logstash is capable of denormalizing Kafka topics.

Can you provide an example of what you are looking to achieve? It is IMO not very clear from your current description.

@Christian_Dahlqvist I have a topic named transactions that contains the following:

> {"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"sender"},
> {"type":"string","optional":true,"field":"receiver"},
> {"type":"string","optional":true,"field":"receiverWalletId"},
> {"type":"string","optional":true,"field":"status"},
> {"type":"string","optional":true,"field":"type"},
> {"type":"int32","optional":true,"field":"amount"},
> {"type":"int32","optional":true,"field":"totalFee"},
> {"type":"int64","optional":true,"field":"createdAt"},
> {"type":"int64","optional":true,"field":"updatedAt"},
> {"type":"int32","optional":true,"field":"__v"},
> {"type":"string","optional":true,"field":"from"},
> {"type":"string","optional":true,"field":"orderId"},
> {"type":"string","optional":true,"field":"id"}],"optional":false,"name":"mongo_conn.digi.transactions"},
> "payload":{"sender":"5cef970ca2e9c273c646c483","receiver":"5cef970ca2e9c273c646c483","receiverWalletId":"5cef970ca2e9c273c646c484",
> "status":"pending_hyperledger_fabric","type":"topup","amount":100,"totalFee":0,"createdAt":1561548725656,"updatedAt":1561548726855,
> "__v":0,"from":"smt","orderId":"23e99026-f915-7d20-a579-0ba300a43714","id":"5d1357b57e7cc0353dc6a233"}}

Notice that everything I need is under "payload", so my fields are [payload][sender], [payload][receiver], and so on. I have another topic named users. I have not pushed that topic yet because I am facing a problem with it, but I can assure you it has the same format as transactions, and under payload I have [payload][userName], [payload][lastname], etc. I want to denormalize my topics into one single topic that has [payload][userName] and [payload][lastname] nested under [payload][sender] and [payload][receiver] in transactions.

@Christian_Dahlqvist is that clear enough?

OK. So the events coming into Logstash look like this:

{
	"schema": {
		"type": "struct",
		"fields": [{
				"type": "string",
				"optional": true,
				"field": "sender"
			},
			{
				"type": "string",
				"optional": true,
				"field": "receiver"
			},
			{
				"type": "string",
				"optional": true,
				"field": "receiverWalletId"
			},
			{
				"type": "string",
				"optional": true,
				"field": "status"
			},
			{
				"type": "string",
				"optional": true,
				"field": "type"
			},
			{
				"type": "int32",
				"optional": true,
				"field": "amount"
			},
			{
				"type": "int32",
				"optional": true,
				"field": "totalFee"
			},
			{
				"type": "int64",
				"optional": true,
				"field": "createdAt"
			},
			{
				"type": "int64",
				"optional": true,
				"field": "updatedAt"
			},
			{
				"type": "int32",
				"optional": true,
				"field": "__v"
			},
			{
				"type": "string",
				"optional": true,
				"field": "from"
			},
			{
				"type": "string",
				"optional": true,
				"field": "orderId"
			},
			{
				"type": "string",
				"optional": true,
				"field": "id"
			}
		],
		"optional": false,
		"name": "mongo_conn.digi.transactions"
	},
	"payload": {
		"sender": "5cef970ca2e9c273c646c483",
		"receiver": "5cef970ca2e9c273c646c483",
		"receiverWalletId": "5cef970ca2e9c273c646c484",
		"status": "pending_hyperledger_fabric",
		"type": "topup",
		"amount": 100,
		"totalFee": 0,
		"createdAt": 1561548725656,
		"updatedAt": 1561548726855,
		"__v": 0,
		"from": "smt",
		"orderId": "23e99026-f915-7d20-a579-0ba300a43714",
		"id": "5d1357b57e7cc0353dc6a233"
	}
}

What is the expected output for this event?

Alright, so the other event coming into Logstash is users:

{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"role"},{"type":"string","optional":true,"field":"status"},{"type":"boolean","optional":true,"field":"isPhoneVerified"},{"type":"array","items":{"type":"string","optional":true},"optional":true,"field":"personalDocuments"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"firstName"},{"type":"string","optional":true,"field":"lastName"},{"type":"string","optional":true,"field":"phoneNumber"},{"type":"string","optional":true,"field":"password"},{"type":"string","optional":false,"field":"created_at"},{"type":"string","optional":false,"field":"updated_at"},{"type":"int32","optional":true,"field":"__v"},{"type":"string","optional":true,"field":"verifyPhoneCode"},{"type":"string","optional":true,"field":"pushToken"},{"type":"string","optional":true,"field":"id"}],"optional":false,"name":"mongo_conn.digi.users"},"payload":{"role":"client","status":"pending","isPhoneVerified":true,"personalDocuments":[],"email":"siabdfatteh@gmail.com","firstName":"siabdelfattehupdated","lastName":"test","phoneNumber":"*******","password":"$2a$10$PC0u2lhm.6t7zxgwXmKYeCJv/3uqzz6.mlO7wzJdMZfIvQeoa/Nq","created_at":"2019-05-29 14:59:43 +0000","updated_at":"2019-05-29 15:14:26 +0000","__v":0,"verifyPhoneCode":null,"pushToken":"d-Ro_TQAjDI:APljlGykedMFVCmoBiNlbYU9Mh1o-1FcS7mSpqHxOw6A1V_6preRBK_vfl4PDWMN666asCtBgIX-_SEj53DT2YRyzSR0b7xk_K9WGmBqKZywmQ-jvlLi89l3lUE1LC0rxGZRluY47XZ","id":"5d1d19a4e690300f31780c7b"}}

I want to embed the users [payload] inside the transactions [payload][sender].

Are you saying that you want to do a lookup of [payload][sender] from a transaction message and replace it with the [payload] from a user message that contains a matching [payload][id]? It is still really unclear what you are trying to do.

How are you linking these two events?

Yes, that is exactly what I want to do. I want to link the receiver and sender IDs in transactions with the user IDs, so that all the info for that user sits under the receiver or sender in transactions. That way I can analyze transactions by age or by user name.

That is a very difficult problem if it is unconstrained. Can the same sender or receiver be found in multiple transactions? How big is the set of senders and receivers? Can you process all the senders and receivers before going through the transactions?

Ideally you would process the senders and receivers into some sort of database (elasticsearch, or memcached, or an http service, or something that supports jdbc) then use a filter to do lookups when processing transactions.

Yes, every transaction contains a sender and a receiver with their IDs, and I have another index in Elasticsearch that contains all the sender and receiver info with the same IDs.

OK, so you can do the lookups with an elasticsearch filter.
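
A lookup along these lines could be sketched with the elasticsearch filter plugin roughly as follows. This is only a sketch under assumptions that are not stated in this thread: that the users index is called `users`, that the cluster is reachable at `localhost:9200`, that the user documents keep the same `payload` wrapper as the Kafka messages, and that the Kafka input decodes messages as JSON. The target fields `[payload][senderInfo]` and `[payload][receiverInfo]` are made-up names for illustration.

```
filter {
  # Look up the user whose payload.id matches the transaction's sender ID
  # and copy that user's whole payload object onto the transaction event.
  elasticsearch {
    hosts  => ["localhost:9200"]                          # assumption: local cluster
    index  => "users"                                     # assumption: name of the users index
    query  => "payload.id:%{[payload][sender]}"
    fields => { "payload" => "[payload][senderInfo]" }    # hypothetical target field
  }

  # Same lookup for the receiver ID.
  elasticsearch {
    hosts  => ["localhost:9200"]
    index  => "users"
    query  => "payload.id:%{[payload][receiver]}"
    fields => { "payload" => "[payload][receiverInfo]" }  # hypothetical target field
  }

  # The user payload shown earlier includes a password hash, which probably
  # should not be copied into every transaction document.
  mutate {
    remove_field => [
      "[payload][senderInfo][password]",
      "[payload][receiverInfo][password]"
    ]
  }
}
```

Each transaction event triggers one query per lookup, so this adds some latency per event, and the users index needs to be populated before the transactions are processed.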

Please bear with me, I'm new to this. I want to make a visualization (chart) in Kibana of transactions by certain user info, such as age, where one dataset is already in an index in Elasticsearch and the other is in a different index. In normal SQL cases we would join these tables by ID. I want to do the same thing, but the only possible way to do that is by denormalizing my topics and putting them all in one index.
