Filebeat: load a JSON file like _bulk mode

Hey everybody,
I have worked with Elasticsearch, inserting data through the _bulk API (JSON), and now I want to insert data with Filebeat by loading a file in the same JSON format I used with bulk mode. Here is the data I want to insert with Filebeat.
Please help!

{
    "index": {
        "_id": "d03eed2a-f981-417f-a7ac-a5d5c0255ed8",
        "_index": "dev__job_instance_info"
    }
}
{
    "endDateTime": "2022-04-15T11:42:28.212644300",
    "env": "DEV",
    "executionId": "d03eed2a-f981-417f-a7ac-a5d5c0255ed8",
    "inputCount": 1,
    "jobName": "CustomerServicePortType_CustomerServiceOperation",
    "jobStatus": "SUCCESS",
    "jobType": "Service",
    "jobVersion": "0.1",
    "outputCount": 1,
    "rejectCount": 0,
    "source": "Customer",
    "startDateTime": "2022-04-15T11:42:27.039734200",
    "target": "Kibana"
}
{
    "index": {
        "_id": "CustomerServicePortType_CustomerServiceOperation-4245555",
        "_index": "dev__job_instance_detail"
    }
}
{
    "dateTime": "2022-04-15T11:42:27.416730",
    "detailsType": null,
    "endDateTime": "2022-04-15T11:42:28.212644300",
    "env": "DEV",
    "executionId": "d03eed2a-f981-417f-a7ac-a5d5c0255ed8",
    "functionalId": "4245555",
    "inputCount": 1,
    "inputData": "{\"country\":\"fdf\",\"firstname\":\"sdf\",\"id\":\"4245555\",\"email\":\"dsf\",\"lastname\":\"dsd\"}",
    "inputName": null,
    "jobName": "CustomerServicePortType_CustomerServiceOperation",
    "jobStatus": "SUCCESS",
    "jobType": "Service",
    "jobVersion": "0.1",
    "message": null,
    "outputCount": 1,
    "outputData": null,
    "rejectCount": 0,
    "source": "Customer",
    "startDateTime": "2022-04-15T11:42:27.039734200",
    "status": "SUCCESS",
    "target": "Kibana"
}
{
    "index": {
        "_index": "dev__job_instance_detail_history"
    }
}
{
    "dateTime": "2022-04-15T11:42:27.416730",
    "detailsType": null,
    "endDateTime": "2022-04-15T11:42:28.212644300",
    "env": "DEV",
    "executionId": "d03eed2a-f981-417f-a7ac-a5d5c0255ed8",
    "functionalId": "4245555",
    "inputCount": 1,
    "inputData": "{\"country\":\"fdf\",\"firstname\":\"sdf\",\"id\":\"4245555\",\"email\":\"dsf\",\"lastname\":\"dsd\"}",
    "inputName": null,
    "jobName": "CustomerServicePortType_CustomerServiceOperation",
    "jobStatus": "SUCCESS",
    "jobType": "Service",
    "jobVersion": "0.1",
    "message": null,
    "outputCount": 1,
    "outputData": null,
    "rejectCount": 0,
    "source": "Customer",
    "startDateTime": "2022-04-15T11:42:27.039734200",
    "status": "SUCCESS",
    "target": "Kibana"
}

Hi @walid_louis, welcome to the community.

Filebeat expects single-line, newline-delimited JSON (ndjson), not pretty-printed JSON.

You could reformat your data using jq (see here) after it is written, or have the source write the data as ndjson in the first place.

Then you will need to drop the "_index" lines; they are no longer needed because you will define the destination index in Filebeat. You can do that when you rewrite the data or with a drop processor.
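
For the drop-processor route, something like this in filebeat.yml might work (an untested sketch, assuming the ndjson parser decodes the action lines into a top-level "index" object on the event):

processors:
  # drop the bulk action lines, i.e. events that decoded to {"index": {"_index": ...}}
  - drop_event:
      when:
        has_fields: ["index._index"]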

Then Filebeat will read it automatically using the ndjson parser configuration (see here).
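
For example, a minimal filestream input sketch (the path is a placeholder; target: "" puts the decoded JSON keys at the root of the event):

filebeat.inputs:
  - type: filestream
    enabled: true
    paths:
      - /path/to/sample.ndjson
    parsers:
      - ndjson:
          # decode each line as JSON and place the keys at the event root
          target: ""
          add_error_key: true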

If you want to use the _ids, you will need to write them into the main JSON body.

cat sample-pretty.json | jq -c . | grep -v _index > sample.ndjson

results in

{"endDateTime":"2022-04-15T11:42:28.212644300","env":"DEV","executionId":"d03eed2a-f981-417f-a7ac-a5d5c0255ed8","inputCount":1,"jobName":"CustomerServicePortType_CustomerServiceOperation","jobStatus":"SUCCESS","jobType":"Service","jobVersion":"0.1","outputCount":1,"rejectCount":0,"source":"Customer","startDateTime":"2022-04-15T11:42:27.039734200","target":"Kibana"}
{"dateTime":"2022-04-15T11:42:27.416730","detailsType":null,"endDateTime":"2022-04-15T11:42:28.212644300","env":"DEV","executionId":"d03eed2a-f981-417f-a7ac-a5d5c0255ed8","functionalId":"4245555","inputCount":1,"inputData":"{\"country\":\"fdf\",\"firstname\":\"sdf\",\"id\":\"4245555\",\"email\":\"dsf\",\"lastname\":\"dsd\"}","inputName":null,"jobName":"CustomerServicePortType_CustomerServiceOperation","jobStatus":"SUCCESS","jobType":"Service","jobVersion":"0.1","message":null,"outputCount":1,"outputData":null,"rejectCount":0,"source":"Customer","startDateTime":"2022-04-15T11:42:27.039734200","status":"SUCCESS","target":"Kibana"}
{"dateTime":"2022-04-15T11:42:27.416730","detailsType":null,"endDateTime":"2022-04-15T11:42:28.212644300","env":"DEV","executionId":"d03eed2a-f981-417f-a7ac-a5d5c0255ed8","functionalId":"4245555","inputCount":1,"inputData":"{\"country\":\"fdf\",\"firstname\":\"sdf\",\"id\":\"4245555\",\"email\":\"dsf\",\"lastname\":\"dsd\"}","inputName":null,"jobName":"CustomerServicePortType_CustomerServiceOperation","jobStatus":"SUCCESS","jobType":"Service","jobVersion":"0.1","message":null,"outputCount":1,"outputData":null,"rejectCount":0,"source":"Customer","startDateTime":"2022-04-15T11:42:27.039734200","status":"SUCCESS","target":"Kibana"}

So it's impossible to take the _index from the input file like this, right?

And another thing: here I have 3 ndjson lines and I want each line to go to its own destination, so I should define 3 destination indices in the Filebeat config. How can I do this and make sure each single line ends up in its destination?

dev__job_instance_info <===={"endDateTime":"2022-04-15T11:42:28.212644300","env":"DEV","executionId":"d03eed2a-f981-417f-a7ac-a5d5c0255ed8","inputCount":1,"jobName":"CustomerServicePortType_CustomerServiceOperation","jobStatus":"SUCCESS","jobType":"Service","jobVersion":"0.1","outputCount":1,"rejectCount":0,"source":"Customer","startDateTime":"2022-04-15T11:42:27.039734200","target":"Kibana"}
dev__job_instance_detail <==== {"dateTime":"2022-04-15T11:42:27.416730","detailsType":null,"endDateTime":"2022-04-15T11:42:28.212644300","env":"DEV","executionId":"d03eed2a-f981-417f-a7ac-a5d5c0255ed8","functionalId":"4245555","inputCount":1,"inputData":"{\"country\":\"fdf\",\"firstname\":\"sdf\",\"id\":\"4245555\",\"email\":\"dsf\",\"lastname\":\"dsd\"}","inputName":null,"jobName":"CustomerServicePortType_CustomerServiceOperation","jobStatus":"SUCCESS","jobType":"Service","jobVersion":"0.1","message":null,"outputCount":1,"outputData":null,"rejectCount":0,"source":"Customer","startDateTime":"2022-04-15T11:42:27.039734200","status":"SUCCESS","target":"Kibana"}
dev__job_instance_history <==== {"dateTime":"2022-04-15T11:42:27.416730","detailsType":null,"endDateTime":"2022-04-15T11:42:28.212644300","env":"DEV","executionId":"d03eed2a-f981-417f-a7ac-a5d5c0255ed8","functionalId":"4245555","inputCount":1,"inputData":"{\"country\":\"fdf\",\"firstname\":\"sdf\",\"id\":\"4245555\",\"email\":\"dsf\",\"lastname\":\"dsd\"}","inputName":null,"jobName":"CustomerServicePortType_CustomerServiceOperation","jobStatus":"SUCCESS","jobType":"Service","jobVersion":"0.1","message":null,"outputCount":1,"outputData":null,"rejectCount":0,"source":"Customer","startDateTime":"2022-04-15T11:42:27.039734200","status":"SUCCESS","target":"Kibana"}

You need to add some logic to the output section (see here), based on a field, etc.
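
For example, assuming you add a field such as name to each event identifying its destination (the host is a placeholder), an output sketch could look like this:

output.elasticsearch:
  hosts: ["localhost:9200"]
  # route each event to an index based on the value of its "name" field
  indices:
    - index: "dev__job_instance_info"
      when.equals:
        name: "dev__job_instance_info"
    - index: "dev__job_instance_detail"
      when.equals:
        name: "dev__job_instance_detail"
    - index: "dev__job_instance_history"
      when.equals:
        name: "dev__job_instance_history"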

Thank you very much @stephenb, you helped me a lot.

You're welcome... take a look at it and then come back with specific questions as you proceed.

Hey @stephenb, I tried to create an index with a data stream but it didn't work; it only created regular indices. So what is the basic Filebeat configuration to create a data stream (dynamically) with Filebeat?

###################### Filebeat Configuration Example #########################

# This file is an example configuration file highlighting only the most common
# options. The filebeat.reference.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.

# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/filebeat/index.html

# For more available modules and options, please see the filebeat.reference.yml sample
# configuration file.

# ============================== Filebeat inputs ===============================

filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

# filestream is an input for collecting log messages from files.
- type: filestream

  # Change to true to enable this input configuration.
  enabled: true
  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    #- /var/log/*.log
    #- /home/centos/*
    - C:\Users\Bureau\log\az.log

  parsers:
    - ndjson:
      # Decode JSON options. Enable this if your logs are structured in JSON.
      # JSON key on which to apply the line filtering and multiline settings. This key
      # must be top level and its value must be a string, otherwise it is ignored. If
      # no text key is defined, the line filtering and multiline features cannot be used.
      message_key: message

      # By default, the decoded JSON is placed under a "json" key in the output document.
      # If you enable this setting, the keys are copied to the top level of the output document.
      keys_under_root: true

      # If keys_under_root and this setting are enabled, then the values from the decoded
      # JSON object overwrite the fields that Filebeat normally adds (type, source, offset, etc.)
      # in case of conflicts.
      #overwrite_keys: false

      # If this setting is enabled, then keys in the decoded JSON object will be recursively
      # de-dotted, and expanded into a hierarchical object structure.
      # For example, `{"a.b.c": 123}` would be expanded into `{"a":{"b":{"c":123}}}`.
      #expand_keys: false

      # If this setting is enabled, Filebeat adds an "error.message" and "error.key: json" key in case of JSON
      # unmarshaling errors or when a text key is defined in the configuration but cannot
      # be used.
      #add_error_key: true
      #document_id: "id"


  

  # Exclude lines. A list of regular expressions to match. It drops the lines that are
  # matching any regular expression from the list.
  #exclude_lines: ['^DBG']

  # Include lines. A list of regular expressions to match. It exports the lines that are
  # matching any regular expression from the list.
  #include_lines: ['^ERR', '^WARN']

  # Exclude files. A list of regular expressions to match. Filebeat drops the files that
  # are matching any regular expression from the list. By default, no files are dropped.
  #prospector.scanner.exclude_files: ['.gz$']

  # Optional additional fields. These fields can be freely picked
  # to add additional information to the crawled log files for filtering
  #fields:
  #  level: debug
  #  review: 1

# ============================== Filebeat modules ==============================

filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml

  # Set to true to enable config reloading
  reload.enabled: false

  # Period on which files under path should be checked for changes
  #reload.period: 10s


# =================================== Kibana ===================================

# Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API.
# This requires a Kibana endpoint configuration.
setup.kibana:

  # Kibana Host
  # Scheme and port can be left out and will be set to the default (http and 5601)
  # In case you specify and additional path, the scheme is required: http://localhost:5601/path
  # IPv6 addresses should always be defined as: https://[2001:db8::1]:5601
  host: "https://25.5.12.56"
  ssl.verification_mode : none

  # Kibana Space ID
  # ID of the Kibana Space into which the dashboards should be loaded. By default,
  # the Default Space will be used.
  #space.id:

# ======================= Elasticsearch template setting =======================

setup.template.settings:
  index.number_of_shards: 1
#  index.codec: best_compression
#  _source.enabled: false
#setup.template.enabled: true
setup.template.name: "my-index"
setup.template.pattern: "my-index"

# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["25.5.12.56:9200"]

  # Protocol - either `http` (default) or `https`.
  protocol: "https"
  ssl:
    enabled: true
    verification_mode : none
  #timeout: 600

  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  username: "elastic"
  password: "azdae"
  index: "filebeat-8.1.2-%{[name]}"

# ====================== Index Lifecycle Management (ILM) ======================

# Configure index lifecycle management (ILM) to manage the backing indices
# of your data streams.

# Enable ILM support. Valid values are true, false.
#setup.ilm.enabled: true

# Set the lifecycle policy name. The default policy name is
# 'beatname'.
#setup.ilm.policy_name: "mypolicy"

# The path to a JSON file that contains a lifecycle policy configuration. Used
# to load your own lifecycle policy.
#setup.ilm.policy_file:

# Disable the check for an existing lifecycle policy. The default is true. If
# you disable this check, set setup.ilm.overwrite: true so the lifecycle policy
# can be installed.
#setup.ilm.check_exists: true

# Overwrite the lifecycle policy at startup. The default is false.
#setup.ilm.overwrite: true



and my input looks like this:

{"name":"dev__job_instance_info","endDateTime":"2022-04-15T11:42:28.212644300","id":"9ffc24b0-e5a9-4ea6-add5-5e6461f7e52b","env":"DEV","executionId":"d03eed2a-f981-417f-a7ac-a5d5c0255ed8","inputCount":1,"jobName":"CustomerServicePortType_CustomerServiceOperation","jobStatus":"SUCCESS","jobType":"Service","jobVersion":"0.1","outputCount":1,"rejectCount":0,"source":"Customer","startDateTime":"2022-04-15T11:42:27.039734200","target":"Kibana"}
{"name":"dev__job_instance_detail","dateTime":"2022-04-16T11:42:27.416730","detailsType":null,"endDateTime":"2022-04-15T11:42:28.212644300","id":"CustomerServicePortType_CustomerServiceOperation-4245554","env":"DEV","executionId":"d03eed2a-f981-417f-a7ac-a5d5c0255ed8","functionalId":"4245555","inputCount":1,"inputData":"{\"country\":\"fdf\",\"firstname\":\"sdf\",\"id\":\"4245555\",\"email\":\"dsf\",\"lastname\":\"dsd\"}","inputName":null,"jobName":"CustomerServicePortType_CustomerServiceOperation","jobStatus":"SUCCESS","jobType":"Service","jobVersion":"0.1","message":null,"outputCount":1,"outputData":null,"rejectCount":0,"source":"Customer","startDateTime":"2022-04-15T11:42:27.039734200","status":"SUCCESS","target":"Kibana"}
{"name":"dev__job_instance_history","dateTime":"2022-04-15T11:42:27.416730","detailsType":null,"endDateTime":"2022-04-15T11:42:28.212644300","env":"DEV","executionId":"d03eed2a-f981-417f-a7ac-a5d5c0255ed8","functionalId":"4245555","inputCount":1,"inputData":"{\"country\":\"fdf\",\"firstname\":\"sdf\",\"id\":\"4245555\",\"email\":\"dsf\",\"lastname\":\"dsd\"}","inputName":null,"jobName":"CustomerServicePortType_CustomerServiceOperation","jobStatus":"SUCCESS","jobType":"Service","jobVersion":"0.1","message":null,"outputCount":1,"outputData":null,"rejectCount":0,"source":"Customer","startDateTime":"2022-04-15T11:42:27.039734200","status":"SUCCESS","target":"Kibana"}

Take this out.. just to get it working.

setup.template.settings:
  index.number_of_shards: 1
#  index.codec: best_compression
#  _source.enabled: false
#setup.template.enabled: true
setup.template.name: "my-index"
setup.template.pattern: "my-index"

In general, people often try to set up the template etc. from Filebeat... not my favorite way. Just set it all up in Elasticsearch per the docs below.

Take this out too

index: "filebeat-8.1.2-%{[name]}"

See what you get...

Also remember that if Filebeat has already read the file, it will not read it again unless you clear / rm the Filebeat data directory. That's where it keeps track of which files have been read.

If you want to use your own data streams, you should probably read this...
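
For reference, a minimal index template sketch that makes matching index names resolve to a data stream (run it in Kibana Dev Tools; the template name, pattern, and priority are placeholders):

PUT _index_template/dev__job_instance_info
{
  "index_patterns": ["dev__job_instance_info*"],
  "data_stream": {},
  "priority": 200
}

With a template like that in place, writes from Filebeat to dev__job_instance_info should then go into a data stream instead of a plain index.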
