Ingest Azure Sign-in logs (array of JSON objects) using http_poller

I am trying to use the Logstash http_poller input plugin to ingest Azure Sign-in logs (without an Event Hub), then pass each document to the logs-azure.signinlogs-1.22.0 ingest pipeline for proper parsing.

My current logstash config looks like this:

input {
  http_poller {
    urls => {
      test1 => "https://endpoint"
    }
    request_timeout => 250
    schedule => { "every" => "5m" }
    codec => json
  }
}
filter {}
output {
  elasticsearch {
    cloud_id => "id"
    cloud_auth => "pass"
    data_stream => "true"
    data_stream_type => "a"
    data_stream_dataset => "b"
    data_stream_namespace => "c"
    validate_after_inactivity => 0
    pipeline => "logs-azure.signinlogs-1.22.0"
  } 
}

The data coming from the endpoint looks like this:

[
    {
        "id": "uuid",
        "createdDateTime": "2023-01-13T10:4:27Z",
        "userDisplayName": "John Doe",
        "userPrincipalName": "jdoe@me.com",
        "userId": "uuid",
        "appId": "uuid",
        "riskEventTypes": [],
        "riskEventTypes_v2": [],
        "resourceDisplayName": "AppName",
        "resourceId": "uuid",
        "status": {
            "errorCode": 0,
            "failureReason": "Other.",
            "additionalDetails": "MFA"
        },
        "deviceDetail": {
            "deviceId": "",
            "isManaged": false,
            "trustType": ""
        },
        "appliedConditionalAccessPolicies": []
    },
    {
        "id": "uuid",
        "createdDateTime": "2023-10-10T11:33:12Z",
        "userDisplayName": "John Doe",
        "userPrincipalName": "jdoe@me.com",
        "userId": "uuid",
        "appId": "uuid",
        "riskEventTypes": [],
        "riskEventTypes_v2": [],
        "resourceDisplayName": "AppName",
        "resourceId": "uuid",
        "status": {
            "errorCode": 0,
            "failureReason": "Other.",
            "additionalDetails": "MFA"
        },
        "deviceDetail": {
            "deviceId": "",
            "isManaged": false,
            "trustType": ""
        },
        "appliedConditionalAccessPolicies": []
    }
]

When I run the pipeline, I get this error:

field [message] not present as part of path [message]

Then I updated the http_poller input plugin to include:

codec => json { target => "message" }
ecs_compatibility => v1

This essentially nests the entire parsed document under the message field. When I rerun Logstash, I now get errors that look like this:

Unexpected character ('r' (code 114)): was expecting double-quote to start field name\n at [Source: (String)"{resourceDisplayName=Value, resourceId=uuid, appliedConditionalAccessPolicies=, riskEventTypes=, deviceDetail={displayName=, browser=Value 7.32.91, trustType=, deviceId=, operatingSystem=Value, isCompliant=false, isManaged=false}, appDisplayName=Azure Virtual Desktop Client, isInteractive=true, riskLevelDuringSignIn=none, conditionalAccessStatus=success, ipAddress=IPv6Addr, createdDateTime=2025-03-11T03:10:31Z, u"[truncated 714 chars]; line: 1, column: 2]

I'm not entirely sure what the issue is, since what I get from the endpoint is valid JSON.

Any suggestion is appreciated.

Thanks,
O_O_O

I've never used this, but my guess is that the ingest pipeline unconditionally expects to get JSON in the message field (ignore_missing is not set). The first error message you show indicates that the message field does not exist, so the json processor fails.
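
For reference, the failing step is presumably a json processor along these lines (illustrative only; the exact definition is in the integration package):

{
  "json": {
    "field": "message",
    "target_field": "azure.signinlogs"
  }
}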

You do not need to parse the logs with a json codec; just send the raw message to Elasticsearch and the pipeline will parse them.
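
Something like this should work (a minimal sketch; note that http_poller defaults to a json codec, so it has to be overridden explicitly):

input {
  http_poller {
    urls => {
      test1 => "https://endpoint"
    }
    schedule => { "every" => "5m" }
    # the plain codec leaves the raw response body in [message],
    # instead of parsing it the way the default json codec does
    codec => plain
  }
}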

Right. When I remove the json codec and simply use target => "message" so that message is available, I still get the second error.

Do not modify the [message] field. You need the [message] field to contain the unparsed JSON received by the http_poller.

If you use a json codec then no [message] field is created and you get that first error message. If you use a json filter and overwrite the [message] string with a parsed object then I am not surprised you get some other error.

Do you even need to parse the [message] field? The ingest pipeline in Elasticsearch will, as I understand it, do that for you. If you do need to, then set the target to some other field, perhaps under [@metadata], so that you can work with fields from the parsed message.
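
A minimal sketch of that idea, with [message] left intact:

filter {
  # parse a working copy under [@metadata]; it is never sent to the
  # output, and [message] stays untouched for the ingest pipeline
  json {
    source => "message"
    target => "[@metadata][parsed]"
  }
}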

Any specific reason to not use an Event Hub and the Elastic Agent integration?

Those Azure logs are pretty annoying to parse in Logstash alone; if you try to mix Logstash and ingest pipelines that expect the raw message, it gets even more complicated.

Where is this log coming from?

One big issue here is that those logs are an array of events, and the Elastic Agent input will split the array into multiple events.

To do the same in Logstash you would need to parse the event and use a split filter, but this may change the document and break the parsing in the ingest pipeline.
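
A rough sketch of that approach, assuming the raw array arrives in [message] (the re-serialization step is what keeps the ingest pipeline happy, but field order and formatting may still differ from what it expects):

filter {
  # parse the raw JSON array into a temporary field
  json {
    source => "message"
    target => "[tmp]"
  }
  # emit one Logstash event per array element
  split {
    field => "[tmp]"
  }
  # re-serialize the single element back into [message] so the
  # ingest pipeline still receives a raw JSON string
  ruby {
    code => 'event.set("message", event.get("[tmp]").to_json)'
  }
  mutate {
    remove_field => ["[tmp]"]
  }
}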

Event Hubs are out of the question for me, unfortunately. :frowning:
The logs are coming from a Python-based API endpoint that calls Microsoft Graph, so it returns a list of dicts.

What pseudo-logic could I apply to get close enough to doing this with Logstash?

You need to validate that the document your API is returning has exactly the same format that the integration expects.

In the GitHub repository for the integrations you have examples of the expected format: integrations/packages/azure/data_stream/signinlogs/_dev/test/pipeline at main · elastic/integrations · GitHub

Can you share your Logstash output?

Add a file or stdout output and share a sample of some events to make it clear how they look.
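
For example (a minimal sketch):

output {
  stdout { codec => rubydebug }
}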

Here is a sample of the output:

{
  "riskState": "none",
  "resourceId": "uuid",
  "createdDateTime": "2025-03-11T19:12:10Z",
  "appliedConditionalAccessPolicies": [],
  "userId": "uuid",
  "correlationId": "uuid",
  "id": "uuid",
  "isInteractive": true,
  "@version": "1",
  "userDisplayName": "FN LN",
  "userPrincipalName": "email",
  "appId": "uuid",
  "appDisplayName": "Prod",
  "resourceDisplayName": "Windows Azure Active Directory",
  "@timestamp": "2025-03-11T19:17:11.810760859Z",
  "riskEventTypes": [],
  "clientAppUsed": "Browser",
  "conditionalAccessStatus": "success",
  "deviceDetail": {
    "browser": "Edge 134.0.0",
    "isCompliant": false,
    "displayName": "",
    "trustType": "",
    "isManaged": false,
    "deviceId": "",
    "operatingSystem": "Windows10"
  },
  "status": {
    "errorCode": 0,
    "additionalDetails": "MFA requirement satisfied by claim in the token",
    "failureReason": "Other."
  },
  "location": {
    "state": "NY",
    "city": "City",
    "countryOrRegion": "US",
    "geoCoordinates": {
      "longitude": -8.96,
      "latitude": 1.73,
      "altitude": null
    }
  },
  "riskEventTypes_v2": [],
  "riskDetail": "none",
  "riskLevelAggregated": "none",
  "riskLevelDuringSignIn": "none",
  "ipAddress": "ipv6"
}

When I use the beta version of the endpoint, I get a few more key-value pairs:

{
  "federatedCredentialId": null,
  "crossTenantAccessType": "none",
  "incomingTokenType": "none",
  "createdDateTime": "2025-03-11T18:59:19Z",
  "servicePrincipalId": "uuid",
  "correlationId": "uuid",
  "homeTenantId": "uuid",
  "homeTenantName": "",
  "appliedEventListeners": [],
  "userPrincipalName": "email",
  "appId": "uuid",
  "signInEventTypes": [
    "interactiveUser"
  ],
  "isTenantRestricted": false,
  "clientAppUsed": "Authenticated SMTP",
  "conditionalAccessStatus": "notApplied",
  "authenticationAppPolicyEvaluationDetails": [],
  "sessionId": "",
  "deviceDetail": {
    "browser": "",
    "isCompliant": false,
    "displayName": "",
    "trustType": "",
    "isManaged": false,
    "deviceId": "",
    "operatingSystem": ""
  },
  "authenticationMethodsUsed": [],
  "authenticationProtocol": "none",
  "authenticationAppDeviceDetails": null,
  "userType": "member",
  "conditionalAccessAudiences": [],
  "status": {
    "errorCode": 50126,
    "additionalDetails": "The user didn't enter the right credentials. It's expected to see some number of these errors in your logs due to users making mistakes.",
    "failureReason": "Error validating credentials due to invalid username or password."
  },
  "processingTimeInMilliseconds": 70,
  "originalRequestId": "",
  "riskEventTypes_v2": [],
  "riskLevelAggregated": "none",
  "autonomousSystemNumber": 11111,
  "riskLevelDuringSignIn": "none",
  "uniqueTokenIdentifier": "token",
  "servicePrincipalCredentialThumbprint": "",
  "tokenIssuerName": "",
  "riskState": "none",
  "servicePrincipalName": null,
  "resourceOwnerTenantId": "uuid",
  "authenticationRequirementPolicies": [],
  "managedServiceIdentity": {
    "federatedTokenIssuer": null,
    "msiType": "none",
    "federatedTokenId": null,
    "associatedResourceId": null
  },
  "resourceId": "uuid",
  "clientCredentialType": "clientAssertion",
  "signInIdentifierType": null,
  "signInTokenProtectionStatus": "unbound",
  "appliedConditionalAccessPolicies": [],
  "tokenIssuerType": "AzureAD",
  "userId": "uuid",
  "authenticationProcessingDetails": [
    {
      "value": "0",
      "key": "Certificate authorities CRL enforcement status"
    }
  ],
  "networkLocationDetails": [],
  "authenticationDetails": [
    {
      "authenticationMethod": "Password",
      "succeeded": false,
      "authenticationStepResultDetail": "Invalid username or password or Invalid on-premise username or password.",
      "authenticationStepDateTime": "2025-03-11T18:59:19Z",
      "authenticationStepRequirement": "",
      "authenticationMethodDetail": "Password Hash Sync"
    }
  ],
  "resourceTenantId": "uuid",
  "privateLinkDetails": {
    "policyId": "",
    "policyName": "",
    "policyTenantId": "",
    "resourceId": ""
  },
  "isThroughGlobalSecureAccess": false,
  "mfaDetail": null,
  "sessionLifetimePolicies": [],
  "id": "uuid",
  "userAgent": "ASASAS",
  "isInteractive": true,
  "userDisplayName": "Ward01",
  "resourceServicePrincipalId": "uuid",
  "tokenProtectionStatusDetails": {
    "signInSessionStatusCode": 0,
    "signInSessionStatus": "unbound"
  },
  "appDisplayName": "Office 365 Exchange Online",
  "resourceDisplayName": "Office 365 Exchange Online",
  "appOwnerTenantId": "uuid",
  "@version": "1",
  "@timestamp": "2025-03-11T19:04:26.461495347Z",
  "ipAddressFromResourceProvider": null,
  "signInIdentifier": "email",
  "servicePrincipalCredentialKeyId": null,
  "globalSecureAccessIpAddress": "",
  "location": {
    "state": "",
    "city": "",
    "countryOrRegion": "SG",
    "geoCoordinates": {
      "longitude": 10.5,
      "latitude": 13.2,
      "altitude": null
    }
  },
  "authenticationRequirement": "singleFactorAuthentication",
  "riskDetail": "none",
  "flaggedForReview": false,
  "authenticationContextClassReferences": [],
  "originalTransferMethod": "none",
  "ipAddress": "1.14.20.6"
}

According to this: https://github.com/elastic/integrations/blob/main/packages/azure/data_stream/signinlogs/_dev/test/pipeline/test-signinlogs-raw.log-expected.json#L70, it does not look like there is a perfect match.

You need to compare with the raw/sample files. The expected JSON file is how the event would look in Elasticsearch after being processed by the ingest pipeline; it is used only by the tests in Elastic's CI/CD tooling for that repository.

As mentioned, you need to make the log that your API is getting look like the log in the sample/raw files: same fields, same data types, etc. That is the message the ingest pipeline expects to receive in order to parse it.

This integration is built to get the logs directly from an Event Hub. If you cannot do that, then you will need to see what is different and change the message to match the examples in the raw and sample files.

Another option is to forget the ingest pipeline and do the parsing directly in Logstash. For this you would need to see what processors the pipeline uses and whether you can replicate them with Logstash filters.
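
A very rough sketch of what that could look like, assuming the array has already been split into one event per element; the renames below are illustrative guesses, and the authoritative mappings are the processors in the integration's ingest pipeline:

filter {
  # illustrative renames only -- check the ingest pipeline
  # definition for the real field mappings
  mutate {
    rename => {
      "userPrincipalName" => "[azure][signinlogs][properties][user_principal_name]"
      "ipAddress" => "[source][ip]"
    }
  }
  # set @timestamp from the event's own creation time
  date {
    match => ["createdDateTime", "ISO8601"]
  }
}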


Thank you! Those are reasonable choices.

Another thing: the event format can also be completely different depending on whether you get the information from the Graph API or export it to an Event Hub.

So it may take a lot of changes for this to work.

I had some issues when I tried to send logs to a Log Analytics workspace; the format was not compatible with the parsing, so exporting to Event Hubs was required.
