Fleet apply pipelines

Hello,

I have two questions about ingest pipelines on Fleet.

I want to enrich a field in an existing integration (IIS).

  1. I know it's not recommended to edit the integrations' ingest pipelines directly, but the advanced settings only let me interact with the data before the pipeline is applied, so I would have to extract the field from the message twice. I would prefer to just edit the ingest pipeline and add my processors. Is there a way to use the advanced settings after the pipeline runs? That way I could use the already-extracted field.

  2. How can I apply this change in the pipeline to my current Fleet logs without having to manually run a reindex operation?

Thanks!!

There's kind of a tricky way to do what I think you might want to do.

If what you want to add to the ingest pipeline can run at the end (in other words, it's additive to the existing pipeline), you can define the index template with a setting called final_pipeline, and that pipeline will always be called on your index after the module pipeline but before the document is indexed:

index.final_pipeline

It's part of the index settings.
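
For example, something roughly like this (just a sketch; the data stream name and "my-final-pipeline" are placeholders, and for a data stream you would normally put the setting in the index template so new backing indices pick it up as well):

PUT logs-iis.error-default/_settings
{
  "index.final_pipeline": "my-final-pipeline"
}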

Perhaps that would work.

Thanks @stephenb. I noticed the final_pipeline setting, but it already has a value and I don't want to break the system.

This is what I see in the logs-iis.error index template (I'm using Elastic Cloud):

"final_pipeline": ".fleet_final_pipeline-1"

Should I set final_pipeline somewhere else, or just replace that one?

It seems like replacing the Fleet final pipeline is not an option; it is doing a lot of stuff:

{
  ".fleet_final_pipeline-1" : {
    "version" : 1,
    "description" : """Final pipeline for processing all incoming Fleet Agent documents.
""",
    "processors" : [
      {
        "set" : {
          "description" : "Add time when event was ingested.",
          "field" : "event.ingested",
          "copy_from" : "_ingest.timestamp"
        }
      },
      {
        "script" : {
          "description" : "Remove sub-seconds from event.ingested to improve storage efficiency.",
          "tag" : "truncate-subseconds-event-ingested",
          "source" : "ctx.event.ingested = ctx.event.ingested.withNano(0).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);",
          "ignore_failure" : true
        }
      },
      {
        "remove" : {
          "description" : "Remove any pre-existing untrusted values.",
          "field" : [
            "event.agent_id_status",
            "_security"
          ],
          "ignore_missing" : true
        }
      },
      {
        "set_security_user" : {
          "field" : "_security",
          "properties" : [
            "authentication_type",
            "username",
            "realm",
            "api_key"
          ]
        }
      },
      {
        "script" : {
          "description" : """Add event.agent_id_status based on the API key metadata and the agent.id contained in the event.
""",
          "tag" : "agent-id-status",
          "source" : """boolean is_user_trusted(def ctx, def users) {
  if (ctx?._security?.username == null) {
    return false;
  }

  def user = null;
  for (def item : users) {
    if (item?.username == ctx._security.username) {
      user = item;
      break;
    }
  }

  if (user == null || user?.realm == null || ctx?._security?.realm?.name == null) {
    return false;
  }

  if (ctx._security.realm.name != user.realm) {
    return false;
  }

  return true;
}

String verified(def ctx, def params) {
  // No agent.id field to validate.
  if (ctx?.agent?.id == null) {
    return "missing";
  }

  // Check auth metadata from API key.
  if (ctx?._security?.authentication_type == null
      // Agents only use API keys.
      || ctx._security.authentication_type != 'API_KEY'
      // Verify the API key owner before trusting any metadata it contains.
      || !is_user_trusted(ctx, params.trusted_users)
      // Verify the API key has metadata indicating the assigned agent ID.
      || ctx?._security?.api_key?.metadata?.agent_id == null) {
    return "auth_metadata_missing";
  }

  // The API key can only be used to represent the agent.id it was issued to.
  if (ctx._security.api_key.metadata.agent_id != ctx.agent.id) {
    // Potential masquerade attempt.
    return "mismatch";
  }

  return "verified";
}

if (ctx?.event == null) {
  ctx.event = [:];
}

ctx.event.agent_id_status = verified(ctx, params);""",
          "params" : {
            "trusted_users" : [
              {
                "username" : "elastic/fleet-server",
                "realm" : "_service_account"
              },
              {
                "username" : "cloud-internal-agent-server",
                "realm" : "found"
              },
              {
                "username" : "elastic",
                "realm" : "reserved"
              }
            ]
          }
        }
      },
      {
        "remove" : {
          "field" : "_security",
          "ignore_missing" : true
        }
      }
    ],
    "on_failure" : [
      {
        "remove" : {
          "field" : "_security",
          "ignore_missing" : true,
          "ignore_failure" : true
        }
      },
      {
        "append" : {
          "field" : "error.message",
          "value" : [
            "failed in Fleet agent final_pipeline: {{ _ingest.on_failure_message }}"
          ]
        }
      }
    ]
  }
}

No, don't do that. I didn't realize there was already a final pipeline.

So maybe there is another way to do it:

Create a top-level pipeline that calls the actual IIS pipeline and then calls your additional pipeline (see the sketch below).

That way you don't end up editing the actual IIS pipeline.
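
Roughly like this (just a sketch; both pipeline names are placeholders, and the managed IIS pipeline has a versioned name you would need to look up under Stack Management > Ingest Pipelines):

PUT _ingest/pipeline/my-iis-wrapper
{
  "description": "Call the stock IIS pipeline first, then a custom enrichment pipeline.",
  "processors": [
    {
      "pipeline": {
        "name": "logs-iis.error-1.0.0"
      }
    },
    {
      "pipeline": {
        "name": "my-iis-enrichment"
      }
    }
  ]
}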

Then you override the pipeline by setting input.pipeline in the Fleet settings to the name of your new top-level pipeline.

EDIT: Darn, that is the old-style Filebeat approach... looking into how to do this with the new Agent.

  1. Apologies... at this time it looks like the only option is to actually edit the IIS pipeline. I do not see a direct way to override the pipeline in the new IIS Agent settings... Hmmm.

  2. If you have already ingested logs, there is no easy way to add those fields to the existing documents without a reindex.

Thanks again @stephenb, I'm figuring out how to do this. Should I run an "update_by_query" with match_all: {} against the data stream using the new pipeline?
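
Something like this is what I have in mind (just a sketch; the data stream and pipeline names are placeholders):

POST logs-iis.error-default/_update_by_query?pipeline=my-new-pipeline
{
  "query": {
    "match_all": {}
  }
}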

Or is it a better option to wipe and ingest everything again? Related to that, my question is how I can do that, because I tried deleting the integration and then the related data stream, but after a few seconds the data stream appears again. I tried this with the nginx integration.

This one specifically, [metrics-nginx.stubstatus-default]: I removed the integration and uninstalled the assets, but when I delete it, it appears again.
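
For reference, this is roughly what I'm running to delete it:

DELETE _data_stream/metrics-nginx.stubstatus-default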

Thanks