Best practice for adding additional fields to transform

Hi community,

I've created a custom transform job to extract Fortigate VPN events into a custom index so I can get tunnel start/stop times. I'm sharing it below for reference. The transform reads data from the Fortigate integration, so the source data already has the geo fields.

What is the best practice for adding geoip fields for client.ip to the transformed data?

Is it recommended to add a new custom ingest pipeline that uses the geoip processor on the client.ip field?

PUT _transform/fortivpntunnels
{
  "source": {
    "index": [
      "logs-*"
    ],
    "query": {
      "bool": {
        "filter": [
          {
            "terms": {
              "fortinet.firewall.action": [
                "tunnel-down",
                "tunnel-up",
                "tunnel-stats"
              ]
            }
          },
          {
            "exists": {
              "field": "fortinet.firewall.tunnelip"
            }
          },
          {
            "exists": {
              "field": "fortinet.firewall.xauthuser"
            }
          }
        ]
      }
    }
  },
  "pivot": {
    "group_by": {
      "observer.name": {
        "terms": {
          "field": "observer.name"
        }
      },
      "tunnel.id": {
        "terms": {
          "field": "fortinet.firewall.tunnelid"
        }
      },
      "tunnel.ip": {
        "terms": {
          "field": "fortinet.firewall.tunnelip"
        }
      },
      "client.ip": {
        "terms": {
          "field": "destination.ip"
        }
      },
      "username": {
        "terms": {
          "field": "fortinet.firewall.xauthuser"
        }
      }
    },
    "aggregations": {
      "tunnel.last.time": {
        "max": {
          "field": "@timestamp"
        }
      },
      "tunnel.first.time": {
        "min": {
          "field": "@timestamp"
        }
      }
    }
  },
  "description": "Transformation process that aggregates information about the start and end of VPN connections based on Fortinet events into an index",
  "frequency": "20s",
  "dest": {
    "index": "fortinet-vpn-tunnels"
  },
  "sync": {
    "time": {
      "delay": "10s",
      "field": "@timestamp"
    }
  },
  "retention_policy": {
    "time": {
      "field": "tunnel.first.time",
      "max_age": "180d"
    }
  }
}

Hello,
It may be worth checking whether the Fortigate integration installed any ingest pipelines that already geo-enrich client.ip during the original ingest. You can check with GET _ingest/pipeline/*fortinet* or something similar.
If not, then I'd recommend creating your own pipeline:

PUT _ingest/pipeline/add-client-geo
{
  "description": "Add GeoIP data to client.ip",
  "processors": [
    {
      "geoip": {
        "field": "client.ip",
        "target_field": "client.geo",
        "ignore_missing": true
      }
    }
  ]
}
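
Before wiring it into the transform, it may be worth testing the pipeline with the simulate API. This is only a minimal sketch: it assumes the GeoIP databases are available on your cluster, and the sample IP is just a placeholder value, not something taken from your data.

POST _ingest/pipeline/add-client-geo/_simulate
{
  "docs": [
    {
      "_source": {
        "client": {
          "ip": "8.8.8.8"
        }
      }
    }
  ]
}

If the processor works, the returned document should contain a client.geo object next to client.ip.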

Then add it to your transform:

POST _transform/fortivpntunnels/_update
{
  "dest": {
    "index": "fortinet-vpn-tunnels",
    "pipeline": "add-client-geo"
  }
}
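
Two optional follow-ups, as a rough sketch rather than a required step. The pipeline only runs on documents the transform writes from now on, so documents already in fortinet-vpn-tunnels would not get client.geo unless you re-run them through it. Also, if you want client.geo.location to be usable as a geo_point (for maps, for example), I'd map it explicitly first. I'm assuming here that the field has not been dynamically mapped yet; if it already exists with another type, the mapping request will be rejected and you'd need to recreate the destination index instead.

PUT fortinet-vpn-tunnels/_mapping
{
  "properties": {
    "client": {
      "properties": {
        "geo": {
          "properties": {
            "location": {
              "type": "geo_point"
            }
          }
        }
      }
    }
  }
}

POST fortinet-vpn-tunnels/_update_by_query?pipeline=add-client-geo

The second request re-indexes the existing documents in place through the add-client-geo pipeline.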

Thanks for the quick reply!