Dec 4th, 2025: [EN] Let's migrate this expense tool from n8n to Elastic One Workflow

Building a Conversational Expense Assistant with Elastic One Workflow, Gemini, and Telegram

I recently stumbled on an excellent walkthrough by Som:
"Build a Conversational Expense Assistant Using Elasticsearch Agent Builder with Telegram, n8n, and Bedrock"

The idea is elegant: Telegram as the chat UI, n8n orchestrating everything, speech-to-text for voice notes, Bedrock for intent and extraction, and Elasticsearch + Agent Builder for storage and queries. Super clean, surprisingly smooth, and it instantly inspired me, which doesn't happen often with technical walkthroughs.

But since I spend most of my days inside Elastic's Workflow Engine, I started wondering:

"What if I re-created a simplified version of Som's assistant, but used Elastic One Workflow as the orchestrator instead of n8n?"

Same idea, same "talk to your expenses" magic, but with the entire flow living natively inside Elastic. Which seemed like it should work. And it did, mostly.

One Workflow is still early and doesn't have n8n's full power yet. I made some simplifications: using Gemini for LLM tasks (partly for LLM diversity, partly because I wanted to try it) and polling Telegram instead of webhooks. The polling works fine, though webhooks would be better.

So I built it. It's a compact expense assistant. Handles voice and text. Classifies intent. Indexes structured expenses with semantic embeddings. Uses ES|QL tools for analytics. Replies directly inside Telegram. That's the gist of it.

The main differences from Som's version? Elastic One Workflow replaces n8n, Gemini replaces Bedrock, and scheduled polling stands in for Telegram webhooks. Everything happens inside Elastic, which was the whole point. I wasn't trying to build a 1:1 clone, just to see how the same idea holds up when the entire flow lives inside the ELK stack.

It polls Telegram every 15 seconds. Converts voice to text — I used Deepgram, but any STT service works. Uses Gemini to classify intent as either INGEST or QUERY. Then either indexes the expense or runs ES|QL queries via Agent Builder tools. Low confidence? It asks for clarification. The whole thing is pretty straightforward once you get past the setup.



You'll need Elasticsearch with Agent Builder and inference endpoints, GCP/Gemini set up with Vertex AI, a Telegram bot token (get one from @BotFather if you don't have it), and an STT provider. I used Deepgram because it was easy to set up, but there are others.


Setting Up the Index

Create the Elasticsearch index first. Nothing fancy here:

PUT /expenses
{
  "mappings": {
    "properties": {
      "@timestamp": { "type": "date" },
      "amount":       { "type": "float" },
      "merchant":     { "type": "keyword" },
      "category":     { "type": "keyword" },
      "payment_method": { "type": "keyword" },
      "raw_transcript": { "type": "text" },
      "user_id": { "type": "keyword" },
      "telegram_chat_id": { "type": "keyword" },
      "semantic_text": {
        "type": "semantic_text",
        "inference_id": "gemini_embeddings"
      }
    }
  }
}


Setting Up Gemini

This part took me longer than expected; GCP setup is tedious. You know the drill: create a project, enable the Vertex AI API, create a service account with the Vertex AI User role, and download its JSON key. Pick a region where Vertex AI is available; I used us-central1 simply because that's what I already had set up.

Then in Kibana, go to Stack Management → Connectors and create a Gemini connector. Name it whatever you like, set the API URL to your region's endpoint, fill in your project ID and region, and pick gemini-2.5-pro as the model (or whichever version you prefer). Paste the entire service account JSON into the credentials field: the whole JSON, not just parts of it. I made that mistake the first time.
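
If you'd rather script it than click through the UI, the equivalent connector API call looks roughly like this (the config field names below reflect the .gemini connector schema as I understand it; double-check against your Kibana version):

POST kbn://api/actions/connector
{
  "name": "gemini-expense-assistant",
  "connector_type_id": ".gemini",
  "config": {
    "apiUrl": "https://us-central1-aiplatform.googleapis.com",
    "defaultModel": "gemini-2.5-pro",
    "gcpRegion": "us-central1",
    "gcpProjectID": "my-gemini-project-12345"
  },
  "secrets": {
    "credentialsJson": "<paste the entire service account JSON here>"
  }
}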

For embeddings, you need an inference endpoint. Create it like this:

PUT _inference/text_embedding/gemini_embeddings
{
  "service": "googlevertexai",
  "service_settings": {
    "project_id": "my-gemini-project-12345",
    "location": "us-central1",
    "model_id": "text-embedding-004"
  }
}

Then create the .inference connector in Kibana: Stack Management → Connectors → AI Connector. Set the task type to text_embedding, the provider to googlevertexai, and the inference ID to gemini_embeddings (it must match the ID used in the index mapping, otherwise it won't work). Paste the same service account JSON into the secrets section. The connector should automatically create or update the inference endpoint when you save; in my case I had to refresh a couple of times before it showed up.

Important: the inference_id must match what's in your index mapping, and the endpoint needs to exist before you can index documents with semantic_text fields. Once it does, Elasticsearch auto-generates embeddings at index time. I hit a few hiccups here initially, but it worked once everything lined up.
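
To confirm the endpoint actually exists before indexing anything, query the inference API; it should return the endpoint configuration, and a 404 means the connector didn't create it:

GET _inference/text_embedding/gemini_embeddings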

Example document:

POST /expenses/_doc
{
  "@timestamp": "2025-01-15T10:30:00Z",
  "amount": 250.0,
  "merchant": "cafe",
  "category": "food",
  "payment_method": "credit_card",
  "raw_transcript": "Spent 250 on lunch at the cafe",
  "semantic_text": "Spent 250 on lunch at the cafe",
  "user_id": "user123",
  "telegram_chat_id": "chat456"
}

The semantic_text field auto-generates embeddings. You can check whether it worked by pulling the _inference_fields metadata back in a search; in my case, populated results (and working semantic queries) were confirmation enough:

GET /expenses/_search
{
  "_source": {
    "includes": ["*", "_inference_fields"]
  },
  "query": {
    "match_all": {}
  },
  "size": 1
}


Creating the ES|QL Tools

I created two ES|QL tools for Agent Builder; these two covered everything I needed, though it would be easy to add more later. The first one aggregates expenses over a date range:

POST kbn://api/agent_builder/tools
{
  "id": "search_expenses_by_date",
  "type": "esql",
  "description": "Search expenses within a date range. Returns amount, merchant, category, and payment method.",
  "tags": ["expenses", "analytics"],
  "configuration": {
    "query": "FROM expenses | WHERE @timestamp >= ?start_date AND @timestamp <= ?end_date | WHERE category == ?category | STATS total = SUM(amount) BY category, payment_method | SORT total DESC",
    "params": {
      "start_date": {
        "type": "date",
        "description": "Start date in ISO format (e.g., 2025-01-01)"
      },
      "end_date": {
        "type": "date",
        "description": "End date in ISO format (e.g., 2025-01-31)"
      },
      "category": {
        "type": "keyword",
        "description": "Category filter (optional - use empty string for all categories)",
        "optional": true,
        "defaultValue": ""
      }
    }
  }
}

ES|QL tools use the ?param_name syntax. The category parameter is optional: the ?category == "" guard in the query means the default empty string skips the category filter entirely and returns all categories.
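
Before registering a query as a tool, it's worth sanity-checking it directly against the ES|QL query API with named parameters (this assumes an Elasticsearch version recent enough to support named params):

POST /_query
{
  "query": "FROM expenses | WHERE @timestamp >= ?start_date AND @timestamp <= ?end_date | WHERE ?category == \"\" OR category == ?category | STATS total = SUM(amount) BY category, payment_method | SORT total DESC",
  "params": [
    { "start_date": "2025-01-01" },
    { "end_date": "2025-01-31" },
    { "category": "" }
  ]
}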

The second tool does semantic search. It turned out to be more useful than I expected, and it works surprisingly well:

POST kbn://api/agent_builder/tools
{
  "id": "semantic_search_expenses",
  "type": "esql",
  "description": "Semantically search expenses using natural language query. Useful for finding expenses by description, merchant name, or context.",
  "tags": ["expenses", "semantic-search"],
  "configuration": {
    "query": "FROM expenses METADATA _score | WHERE MATCH(semantic_text, ?query) | SORT _score DESC, @timestamp DESC | LIMIT ?limit",
    "params": {
      "query": {
        "type": "text",
        "description": "Natural language search query"
      },
      "limit": {
        "type": "integer",
        "description": "Maximum number of results",
        "optional": true,
        "defaultValue": 10
      }
    }
  }
}

The MATCH function does semantic search on semantic_text fields using the inference endpoint from the mapping. You need METADATA _score to sort by relevance — I learned this the hard way after wondering why my results weren't sorted properly. The error message wasn't super helpful either.
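
The same dry-run trick works here; a quick direct test with a hardcoded limit looks like this:

POST /_query
{
  "query": "FROM expenses METADATA _score | WHERE MATCH(semantic_text, ?query) | SORT _score DESC, @timestamp DESC | LIMIT 5",
  "params": [
    { "query": "coffee near the office" }
  ]
}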


The Workflow

The workflow runs every 15 seconds. Polls Telegram. Processes messages. It's long because it handles voice vs text, intent classification, confidence checking, routing. Probably could be shorter but I didn't optimize it much. Here it is:

name: "Expense Assistant Workflow"
description: "Scheduled workflow that polls Telegram and processes expense messages"
enabled: true

triggers:
  - type: scheduled
    with:
      every: "15s"

consts:
  telegram_bot_token: "<TELEGRAM_BOT_TOKEN>"
  telegram_api_url: "https://api.telegram.org/bot"
  last_update_id: 0  # unused placeholder; the real offset is persisted in the telegram-bot-state index

steps:
    # Step 0: Read the stored last_update_id from the state index.
    # Step 1 references steps.get_last_update_id, so this step has to exist.
    # Using a plain http step against Elasticsearch is an assumption here;
    # adjust the URL and auth for your deployment.
    - name: get_last_update_id
      type: http
      with:
        url: "http://localhost:9200/telegram-bot-state/_doc/last_update_id"
        method: GET
        headers:
          Authorization: "ApiKey YOUR_ES_API_KEY"
      on-failure:
        continue: true  # first run: the state document doesn't exist yet

    # Step 1: Poll Telegram for new messages
    - name: poll_telegram
      type: http
      with:
        url: "{{ consts.telegram_api_url }}{{ consts.telegram_bot_token }}/getUpdates"
        method: GET
        body:
          offset: "{{ steps.get_last_update_id.output._source.last_update_id | default: 0 | plus: 1 }}"
      on-failure:
        continue: true  # Skip if there's a conflict, will retry on next run
    
    # Step 2: Check if there are new messages
    - name: check_new_messages
      type: if
      condition: "{{ steps.poll_telegram.output.result }}"
      steps:
        # Step 3: Process each message
        - name: process_messages
          type: foreach
          foreach: "{{ steps.poll_telegram.output.result | json: 2 }}"
          steps:
            # Extract message data from Telegram update
            - name: extract_message_data
              type: console
              with:
                message: "Processing message from user {{ foreach.item.message.from.id | json: 2 }}"
            
            # Check if message has text or voice
            - name: check_message_type
              type: if
              condition: "{{ foreach.item.message.voice != null or foreach.item.message.audio != null }}"
              steps:
                # Voice message - get file and transcribe
                - name: get_voice_file
                  type: http
                  with:
                    url: "{{ consts.telegram_api_url }}{{ consts.telegram_bot_token }}/getFile"
                    method: GET
                    body:
                      file_id: "{{ foreach.item.message.voice.file_id | default: foreach.item.message.audio.file_id }}"
                
                - name: transcribe_voice
                  type: http
                  with:
                    url: "https://api.deepgram.com/v1/listen"
                    method: POST
                    headers:
                      Authorization: "Token YOUR_DEEPGRAM_KEY"
                      Content-Type: "application/json"
                    body:
                      url: "https://api.telegram.org/file/bot{{ consts.telegram_bot_token }}/{{ steps.get_voice_file.output.result.file_path }}"
                  on-failure:
                    fallback:
                      - name: fallback_transcription
                        type: http
                        with:
                          url: "http://localhost:8000/transcribe"
                          method: POST
                          body:
                            audio_url: "https://api.telegram.org/file/bot{{ consts.telegram_bot_token }}/{{ steps.get_voice_file.output.result.file_path }}"
              else:
                # Text message
                - name: use_text_directly
                  type: console
                  with:
                    message: "Processing text message: {{ foreach.item.message.text | json: 2 }}"
            
            # Extract transcript (from voice or text)
            - name: get_transcript
              type: console
              with:
                message: "Transcript: {{ steps.transcribe_voice.output.results?.channels[0]?.alternatives[0]?.transcript or foreach.item.message.text }}"
            
            # Intent Classification
            - name: classify_intent
              type: .gemini
              connector-id: "gemini-expense-assistant-connector-id"
              with:
                subAction: invokeAI
                subActionParams:
                  model: "gemini-2.5-pro"
                  messages:
                    - role: user
                      content: "{{ steps.transcribe_voice.output.results?.channels[0]?.alternatives[0]?.transcript or foreach.item.message.text }}"
                  systemInstruction: |
                    You are an intent classifier for an expense assistant.
                    Classify the user's message as either:
                    - INGEST: User wants to add/record an expense (e.g., "Spent 250 on lunch", "Add dinner for 350")
                    - QUERY: User wants to query/search expenses (e.g., "How much did I spend last week?", "Show my food expenses")
                    
                    Always respond with valid JSON only:
                    {
                      "intent": "INGEST" or "QUERY",
                      "confidence": 0.0-1.0,
                      "reasoning": "brief explanation"
                    }
    
            - name: check_confidence
              type: if
              # TODO: This condition needs to be adjusted based on your workflow structure.
              # The template extracts the confidence, but KQL needs a field name to compare.
              # Consider restructuring to extract the confidence value first, then reference it.
              condition: "confidence < 0.7"
              steps:
                - name: request_clarification
                  type: http
                  with:
                    url: "{{ consts.telegram_api_url }}{{ consts.telegram_bot_token }}/sendMessage"
                    method: POST
                    headers:
                      Content-Type: "application/json"
                    body:
                      chat_id: "{{ foreach.item.message.from.id | json: 2 }}"
                      text: "{{ steps.classify_intent.output.message.reasoning }}. Could you please clarify: Are you trying to add an expense or ask about existing expenses?"
              else:
                - name: route_by_intent
                  type: if
                  # TODO: This condition needs to be adjusted. KQL needs a field name.
                  # Consider restructuring to extract the intent value first, then reference it.
                  condition: "intent: INGEST"
                  steps:
                    # INGEST BRANCH
                    - name: extract_expense_data
                      type: .gemini
                      connector-id: "gemini-expense-assistant-connector-id"
                      with:
                        subAction: invokeAI
                        subActionParams:
                          model: "gemini-2.5-pro"
                          messages:
                            - role: user
                              content: "{{ steps.transcribe_voice.output.results?.channels[0]?.alternatives[0]?.transcript or foreach.item.message.text }}"
                          systemInstruction: |
                            Extract expense information from the user's message.
                            Return valid JSON with:
                            {
                              "amount": number,
                              "merchant": string,
                              "category": string (food, transport, entertainment, etc.),
                              "payment_method": string (credit_card, cash, debit, etc.),
                              "date": string (ISO format, default to today if not specified)
                            }
                    
                    - name: index_expense
                      type: elasticsearch.index
                      with:
                        index: "expenses"
                        document:
                          "@timestamp": "{{ steps.extract_expense_data.output.message.date | default: 'now' }}"
                          amount: "{{ steps.extract_expense_data.output.message.amount }}"
                          merchant: "{{ steps.extract_expense_data.output.message.merchant }}"
                          category: "{{ steps.extract_expense_data.output.message.category }}"
                          payment_method: "{{ steps.extract_expense_data.output.message.payment_method }}"
                          raw_transcript: "{{ steps.transcribe_voice.output.results?.channels[0]?.alternatives[0]?.transcript or foreach.item.message.text }}"
                          semantic_text: "{{ steps.transcribe_voice.output.results?.channels[0]?.alternatives[0]?.transcript or foreach.item.message.text }}"
                          user_id: "{{ foreach.item.message.from.id | json: 1 }}"
                          telegram_chat_id: "{{ foreach.item.message.chat.id | json: 1 }}"
                    
                    - name: send_ingest_response
                      type: http
                      with:
                        url: "{{ consts.telegram_api_url }}{{ consts.telegram_bot_token }}/sendMessage"
                        method: POST
                        headers:
                          Content-Type: "application/json"
                        body:
                              chat_id: "{{ foreach.item.message.from.id | json: 1 }}"
                              text: "✅ Added expense: {{ steps.extract_expense_data.output.message.amount }} at {{ steps.extract_expense_data.output.message.merchant }} ({{ steps.extract_expense_data.output.message.category }})"
                      
                  else:
                    # QUERY BRANCH
                    - name: query_agent
                      type: http
                      with:
                        url: "http://localhost:5601/api/agent_builder/mcp"
                        method: POST
                        headers:
                          Authorization: "ApiKey YOUR_API_KEY"
                          Content-Type: "application/json"
                        body:
                          method: "tools/call"
                          params:
                            name: "semantic_search_expenses"
                            arguments:
                              query: "{{ steps.transcribe_voice.output.results?.channels[0]?.alternatives[0]?.transcript or foreach.item.message.text }}"
                              limit: 10
                    
                    - name: send_query_response
                      type: http
                      with:
                        url: "{{ consts.telegram_api_url }}{{ consts.telegram_bot_token }}/sendMessage"
                        method: POST
                        headers:
                          Content-Type: "application/json"
                        body:
                          chat_id: "{{ foreach.item.message.from.id | json: 2 }}"
                          text: "{{ steps.query_agent.output.content[0].text | default: 'I found your expense information.' }}"
            
            # Step 4: Update last_update_id (store in Elasticsearch for persistence)
            # Update on every iteration - the last one will be the final value
            # This avoids needing array[length-1] syntax which LiquidJS doesn't support
            - name: update_last_update_id
              type: elasticsearch.index
              with:
                index: "telegram-bot-state"
                id: "last_update_id"
                document:
                  last_update_id: "{{ foreach.item.update_id }}"
                  updated_at: "now"


Testing

I sent "Spent 250 on lunch" (should ingest) and "How much did I spend on food last week?" (should query).

And it failed :sweat_smile:. As I mentioned, One Workflow is still at a very early stage, and not everything goes smoothly yet. Luckily, I've already tracked down most of the bugs, and the workflow now runs in my local environment. I just need a bit of time to turn the fixes into a PR and get it merged; hopefully you'll be able to run everything yourself very soon. I'll update this article to keep you posted =)


What I Learned

Re-implementing Som's expense assistant wasn't just a technical experiment. It was a chance to see what happens when orchestration, search, semantics, and AI reasoning all live inside the same platform. And honestly? It felt refreshingly cohesive. Like everything was supposed to work together, which is rare.

Elastic has always been strong at storing and searching data. But watching a single workflow pull messages from Telegram, process voice or text, classify intent with Gemini, enrich and index documents, run ES|QL analytics, and deliver conversational replies — all without leaving the Elastic ecosystem — that was interesting. Elasticsearch is clearly evolving into something more than just a search engine. It's becoming a platform where AI agents and operational workflows can actually blend together without fighting each other.

This assistant is hardly the end state. With the foundation in place, there's a lot you could build on top of it. Spending insights. Anomaly detection ("hey, this charge looks weird…"). Monthly summaries. Conversational dashboards. Proactive notifications. Budgets. Multi-user bots. The usual stuff. All powered by the same engine, which is the part that makes it interesting.

If you're curious about what AI agents look like when paired with real observable data — not just chat completions floating in the cloud — this kind of project is a surprisingly tangible way to explore that. More tangible than I expected, anyway.

Maybe your expenses will finally start answering your questions. Mine haven't yet, but they're getting closer.
