Automatic Index Creation

Do you want the output section of filebeat.yml? Actually, I am new to the world of Elastic, so I apologize for asking these questions.

Thanks,
Debasis

You need to share what this event looks like in your Elasticsearch, in Kibana.

But you shared two different filebeat.yml files; which one are you running?

Also, I do not use Filebeat for parsing, so I'm not sure your document is really getting parsed. Looking at the documentation, decode_csv_fields will not create any named fields; it will create an array, so you will not have any field named timestamp.

Please share what this document looks like inside your Elasticsearch and Kibana.

@leandrojmp Currently I am using the second filebeat.yml (the other one is the initial version). Below is one record after it was inserted into the index by the Filebeat process.

    "_index": "sfw230301v1",
    "_id": "8eMyeo4B3GeqyTEgKyyX",
    "_score": 1,
    "_source": {
      "log": {
        "file": {
          "path": "/data/elastic/ss7-edr-generator/result/2023-03-01_1711349485171.csv"
        },
        "offset": 1178726611
      },
      "sms_hub_type": "A2P",
      "msg_source": "IND19",
      "traffic_type": "2",
      "pid": "0",
      "cg_gt": "919968881474",
      "imsi": "404956772952693",
      "opcode": "44",
      "rule_type": "59",
      "cg_operator": "IND19",
      "action_id": "7",
      "sms_content": "I'm not going to stay there in the house in Goulburn alone with him he thinks he can control me he can't control himself he doesn't have any fear if something happened to Divina child protection would have been notified last night I'm already walking on thin ice they will grab the baby n take her away n it will be months before we would b able to get her back once child protection services gets involved.",
      "dcs": "0",
      "service_id": "33",
      "cd_plan": "1",
      "tenant": "INDAT",
      "timestamp": "1677666172000",
      "cg_plan": "1",
      "smsc": "919968881474",
      "edr_version": "V2",
      "sender_type": "ALPHANUMERIC",
      "session_id": "1171134947836087159480957",
      "cd_gt": "919008474416",
      "message_id": "60803",
      "message_type": "MT",
      "cd_ssn": "8",
      "rule_id": "7208",
      "sequence": "171134947836087159480957",
      "component": "31",
      "cd_operator": "INDAT",
      "@timestamp": "2024-03-26T09:58:04.366Z",
      "prov_version": "542",
      "tpoa": "FACEBOOK",
      "fragment_number": "1",
      "cg_ssn": "8"
    }
  }

Thanks,
Debasis

@carly.richmond I tried the option you mentioned, but it did not help. Could you please let me know if I am missing anything?

Thanks,
Debasis

Hi @Debasis_Mallick,

Can you explain what you mean by the processor not helping? It would be good to understand what you need the output to look like, and how that differs from the ingested document you've shared.

Let us know!

Hi @carly.richmond ,

As I mentioned earlier, below is a sample record from the CSV file (GENERATED_EDR_SS7_1733656032000.csv) which we receive from the customer.

sequence,component,tenant,service_id,session_id,timestamp,edr_version
1711807094739562106743,31,INDAT,33,11711807094739562106740,1689182220000,V2

We want Filebeat to read the timestamp, which is in epoch (milliseconds) format, and create the index on that basis.
For example, in the above record the timestamp converts to 12-JULY-2023, so we want the index to be sfwindex-12072023.
The record should then be inserted into that index (sfwindex-12072023), and when querying the index the output should look like the one below.

{
        "_index": "sfwindex-12072023",
        "_id": "Tx0y1Y0B2HV3ao-3ugBi",
        "_score": 4.4236135,
        "_source": {
          "agent": {
            "name": "cb-1",
            "id": "419beb27-818d-45b2-9aa9-685b7574403d",
            "ephemeral_id": "78b64bde-c774-4572-9692-9a07df46a6ec",
            "type": "filebeat",
            "version": "8.9.2"
          },
          "log": {
            "file": {
              "path": "/disk2/ss7-edr-generator/result/cb4transv1/GENERATED_EDR_SS7_1733656032000.csv"
            },
            "offset": 1129063480
          },
          "sequence": "1711807094739562106743",
          "component": "31",
          "tenant": "INDAT",
          "service_id": "33",
          "session_id": "11711807094739562106740",
          "timestamp": "1689182220000",
          "edr_version": "V2"
        }
}

Thanks,
Debasis

Please show the full Filebeat processor config as well as the Elasticsearch output configuration, together with a document that was indexed into Elasticsearch using this exact configuration. If you have an ingest pipeline that processes the data before it is indexed, it would be good if you shared what it does too.

All of this would be a lot easier to analyse if we had a consistent snapshot and not pieces of data provided at different points in time with potentially different configuration.

Are you referring to the Elasticsearch output section in filebeat.yml, or to how the output looks after ingestion when querying through Kibana?

Thanks,
Debasis

I would like to see the following, all captured in a consistent manner:

  • Elasticsearch output configured in Filebeat
  • The full processor configuration in Filebeat
  • Any ingest pipeline that may alter the event
  • A sample event from Elasticsearch that was processed through the configuration above

Not sure how to capture a sample event.

Please find the other details below.


# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["https://10.10.18.174:9200","https://10.10.18.215:9200"]
  worker: 8
  bulk_max_size: 3000
  # Protocol - either `http` (default) or `https`.
  protocol: "https"
  index: "sfwindex-%{+yyyy.MM.dd}"

  # Authentication credentials - either API key or username/password.
  # api_key: "id:api_key"
  username: "elastic"
  password: "elastic"
  ssl:
    enabled: true
    certificate_authorities: ["/etc/filebeat/certs/cert.pem"]

# ================================= Processors =================================
processors:
  - timestamp:
      field: "timestamp"
      target_field: "@timestamp"
      layouts:
        - "UNIX_MS"

Below is the ingest pipeline:


PUT _ingest/pipeline/parse_elastic_data_v1
{
  "processors": [
    {
      "csv": {
        "description": "Parse elastic Data From CSV Files",
        "field": "message",
        "target_fields": ["sequence",
        "component","tenant",
        "service_id","session_id",
        "timestamp","edr_version",
        "prov_version","action_id",
        "action_extra_info","rule_type",
        "rule_id", "traffic_type",
        "ac_group","ac_version",
        "opcode","cg_plan", "cg_gt",
        "cg_ssn","cg_operator",
        "cd_plan","cd_gt",
        "cd_ssn", "cd_operator",
        "imsi", "location_operator",
        "location_timestamp","sms_content",
        "message_id","fragment_number",
        "sms_analytics_case_id","msisdn",
        "msisdn_ton","msisdn_npi","tpoa",        
        "sender_type","sms_hub_type",      
        "embedded_url","domain",      
        "msg_source","smsc",      
        "pid","dcs","message_type"],
        "separator": ",",
        "ignore_missing":true,
        "trim":true
      }
    }
  ]
}

Thanks,
Debasis

The issue is that you are not parsing the CSV data in Filebeat so the field your timestamp processor is based on does not exist at that point. The expected timestamp is therefore not available for your Elasticsearch output, which results in the wrong index being targeted.

If you change to perform the CSV parsing in Filebeat (see the decode_csv_fields processor) ahead of the timestamp processor, instead of doing it in the ingest pipeline, it should work correctly.

@Christian_Dahlqvist So you are saying that instead of the ingest pipeline I should use the decode_csv_fields processor (Decode CSV fields | Filebeat Reference [8.14] | Elastic) to parse the CSV data.

Thanks,
Debasis

Yes, the steps need to be done in the right order, and the ingest pipeline is only run once the data has reached Elasticsearch. I guess you could also do it all in the ingest pipeline (parse CSV and after that process the timestamp and change the index name), but processing in Filebeat is likely easier and more standard.

No, there is no need for this; the decode_csv_fields processor will transform your CSV message into an array where each index will hold one of the columns.

It will output the values as an array of strings. This processor is available for Filebeat.

You will not have named fields the way you do with the ingest pipeline; currently there is no way to get named fields in Filebeat without adding a rename processor for each index in the array.
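
Just to illustrate, a minimal decode_csv_fields configuration would look something like this (the target field name "csv" here is only an example, not something from your config):

processors:
  - decode_csv_fields:
      # decode the raw CSV line stored in "message" into a new array field named "csv"
      fields:
        message: csv
      separator: ","
      ignore_missing: true
      trim_leading_space: true

The resulting event would then contain something like csv: ["1711807094739562106743", "31", "INDAT", "33", ...], so the timestamp would only be reachable by position, not by name.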

Since you already have an ingest pipeline parsing your CSV, you just need to add a date processor to that pipeline to parse your date field.

The documentation for the date processor is here.

You probably just need this:

{
  "description" : "...",
  "processors" : [
    {
      "date" : {
        "field" : "timestamp",
        "target_field" : "@timestamp",
        "formats" : ["UNIX_MS"]
      }
    }
  ]
}
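
If you want to check the result before reindexing anything, the pipeline can be run against a test document with the simulate API; the document below just reuses the short sample line from earlier in this thread:

POST _ingest/pipeline/parse_elastic_data_v1/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "1711807094739562106743,31,INDAT,33,11711807094739562106740,1689182220000,V2"
      }
    }
  ]
}

The response should show the parsed CSV fields together with @timestamp set from the epoch value once the date processor has been added.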

In this case it is easier on the ingest pipeline side. Filebeat does not really parse the CSV; it just transforms the CSV message into an array, and you cannot have named fields without adding a lot of rename processors after the decode_csv_fields processor.

OK. Then the ingest pipeline solution is clearly the better path. Just setting the @timestamp field will not change the index name though, will it? Would he not also need to find a way to change the index name set by Filebeat based on the processed timestamp?

Yeah, this can also be done on the ingest pipeline side; to change the index name, the date_index_name processor would need to be used.
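
Something along these lines should work (the prefix and the ddMMyyyy format are assumptions based on the sfwindex-12072023 name mentioned above, so adjust as needed):

{
  "date_index_name": {
    "field": "timestamp",
    "date_formats": ["UNIX_MS"],
    "index_name_prefix": "sfwindex-",
    "date_rounding": "d",
    "index_name_format": "ddMMyyyy"
  }
}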

Yes, I was referring to doing this from the ingest pipeline.

What you need to do is:

  • Parse your data using the ingest pipeline you are already using
  • Add a date processor to the ingest pipeline to parse your date field
  • Add a date_index_name processor to change the index name.

With this you will be able to change the index name based on the date of your documents, if this is what you want.
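
On the Filebeat side, apart from making sure the events actually go through this pipeline, nothing else should need to change. If the pipeline is not already applied as a default pipeline on the index, it can be referenced directly from the output; this is just a sketch reusing the pipeline name and output settings from earlier in the thread:

output.elasticsearch:
  hosts: ["https://10.10.18.174:9200","https://10.10.18.215:9200"]
  # send every event through the ingest pipeline that parses the CSV,
  # sets @timestamp and rewrites the index name
  pipeline: "parse_elastic_data_v1"
  # this index only acts as a fallback; the date_index_name processor
  # overrides it per document
  index: "sfwindex-%{+yyyy.MM.dd}"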

@leandrojmp Let me create the pipeline as you suggested and I will update the thread in case of any error.

Thanks,
Debasis