Logstash doesn't do anything but receive lines

Hello cyberbrothers. I've run into a situation where Logstash successfully reads the lines but then does nothing further. I've spent two sleepless days on this, please take a look at it with fresh eyes.

My logstash conf

input {
  file {
    path => ["/usr/local/airflow/logs/*/*/*/*.log"]
    codec => "line"
  }
}
filter {
  grok {
    match => { "path" => "/usr/local/airflow/logs/(?<dag_id>.*?)/(?<run_id>.*?)/(?<task_id>.*?)/(?<try_number>.*?).log$" }
  }
  mutate {
      add_field => {
        "log_id" => "%{[dag_id]}-%{[run_id]}-%{[task_id]}-%{[try_number]}"
      }
  }
}
output {
  elasticsearch {
    hosts => ["http://xx.xx.x.xxx:9200"]
    index => "airflow-logs"
    user => 'elastic'
    password => 'default'
  }
}

Logstash’s log:

[2025-08-22T07:16:29,669][DEBUG][logstash.inputs.file     ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-22T07:13:29.991136+00:00/task_id=abonent1_3-pipeline.tsv_to_incremental/attempt=1.log", :text=>"PROCESS STARTED - CONVERT TSV-FILES FROM HDFS://DATA/ABONENT_INFO_DELIVERY_JANISSARY/TSV/\xD0\x9C\xD0\xA2\xD0\xA1_3/2025-08-22/1078317 TO HBASE AND SAVE TO HDFS://DATA/ABONENT_INFO_DELIVERY_JANISSARY/INCREMENTAL/\xD0\x9C\xD0\xA2\xD0\xA1_3/2025-08-22/1078317"}

But that's it! It only receives the lines and nothing else. The Grok debugger in Kibana says everything is fine.

My mental state depends on your help….

logstash compose

  logstash:
    image: logstash:7.1.1
    container_name: log
    environment:
      LS_JAVA_OPTS: "-Xms512m -Xmx512m"
      XPACK_MONITORING_ENABLED: 'false'
      LOG_LEVEL: 'trace'
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
      - '/usr/local/airflow/logs:/usr/local/airflow/logs'
    ports:
      - "5000:5000/tcp"
      - "5000:5000/udp"
      - "5044:5044"
      - "9600:9600"
    command: logstash -f /usr/share/logstash/pipeline/logstash.conf
    networks:
      airflow-internal:
        ipv4_address: 10.192.3.10

Hello @trenhard

Looking at the post, could you please confirm whether this understanding is correct?

You are not reading the content of the log files but instead reading the log file names and trying to ingest those into Elastic, so if there are 100 log files in the path you are expecting 100 entries in Elastic?

And if the above understanding is not right, could you please share a few lines from a log file under

/usr/local/airflow/logs/*/*/*/*.log

Thanks!!

Hello @Tortoise !

When a DAG is launched, its full cycle includes 9 stages. For Airflow to distinguish the logs of stage 2 from those of stage 8, it uses the parameter log_id_template={dag_id}-{task_id}-{run_id}-{try_number} when it queries Elastic.
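For reference, that template lives in the [elasticsearch] section of airflow.cfg; a rough sketch (the host value is just a placeholder for my Elastic address):

[elasticsearch]
host = xx.xx.x.xxx:9200
log_id_template = {dag_id}-{task_id}-{run_id}-{try_number}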

The folder structure is shown below; the final file in it is attempt=1.log. For example, the content of /usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T07:57:54.336244+00:00/task_id=validate_input_tasks/attempt=1.log:

[2025-08-23T11:09:36.891+0300] {local_task_job_runner.py:123} INFO - ::group::Pre task execution logs
[2025-08-23T11:09:36.927+0300] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: abonent_info_delivery_janissary.validate_input_tasks manual__2025-08-23T08:08:46.090558+00:00 [queued]>
[2025-08-23T11:09:36.941+0300] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: abonent_info_delivery_janissary.validate_input_tasks manual__2025-08-23T08:08:46.090558+00:00 [queued]>
[2025-08-23T11:09:36.941+0300] {taskinstance.py:2867} INFO - Starting attempt 1 of 1
[2025-08-23T11:09:36.963+0300] {taskinstance.py:2890} INFO - Executing <Task(PythonOperator): validate_input_tasks> on 2025-08-23 11:08:46.090558+03:00
[2025-08-23T11:09:36.969+0300] {standard_task_runner.py:72} INFO - Started process 1975 to run task
[2025-08-23T11:09:36.974+0300] {standard_task_runner.py:104} INFO - Running: ['***', 'tasks', 'run', 'abonent_info_delivery_janissary', 'validate_input_tasks', 'manual__2025-08-23T08:08:46.090558+00:00', '--job-id', '12', '--raw', '--subdir', 'DAGS_FOLDER/abonent_info_delivery_janissary.dag.py', '--cfg-path', '/tmp/tmp6jcq0mdz']
[2025-08-23T11:09:36.977+0300] {standard_task_runner.py:105} INFO - Job 12: Subtask validate_input_tasks
[2025-08-23T11:09:37.089+0300] {task_command.py:467} INFO - Running <TaskInstance: abonent_info_delivery_janissary.validate_input_tasks manual__2025-08-23T08:08:46.090558+00:00 [running]> on host bigdata-03.main01.hadoop.apps.prod.int.nt-ias.ru
[2025-08-23T11:09:37.296+0300] {taskinstance.py:3134} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='abonent_info_delivery_janissary' AIRFLOW_CTX_TASK_ID='validate_input_tasks' AIRFLOW_CTX_EXECUTION_DATE='2025-08-23T11:08:46.090558+03:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2025-08-23T08:08:46.090558+00:00'
[2025-08-23T11:09:37.297+0300] {taskinstance.py:732} INFO - ::endgroup::
[2025-08-23T11:09:37.340+0300] {python.py:240} INFO - Done. Returned value was: None
[2025-08-23T11:09:37.363+0300] {taskinstance.py:341} INFO - ::group::Post task execution logs
[2025-08-23T11:09:37.364+0300] {taskinstance.py:353} INFO - Marking task as SUCCESS. dag_id=abonent_info_delivery_janissary, task_id=validate_input_tasks, run_id=manual__2025-08-23T08:08:46.090558+00:00, execution_date=20250823T110846, start_date=20250823T110936, end_date=20250823T110937
[2025-08-23T11:09:37.467+0300] {local_task_job_runner.py:266} INFO - Task exited with return code 0
[2025-08-23T11:09:37.492+0300] {local_task_job_runner.py:245} INFO - ::endgroup::

And here is how Logstash handles this file. I expect it to assemble, piece by piece, a full log in Elastic for a specific Airflow stage.

[2025-08-23T08:09:51,321][DEBUG][logstash.inputs.file     ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:36.941+0300] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: abonent_info_delivery_janissary.validate_input_tasks manual__2025-08-23T08:08:46.090558+00:00 [queued]>"}
[2025-08-23T08:09:51,321][DEBUG][logstash.inputs.file     ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:36.941+0300] {taskinstance.py:2867} INFO - Starting attempt 1 of 1"}
[2025-08-23T08:09:51,322][DEBUG][logstash.inputs.file     ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:36.963+0300] {taskinstance.py:2890} INFO - Executing <Task(PythonOperator): validate_input_tasks> on 2025-08-23 11:08:46.090558+03:00"}
[2025-08-23T08:09:51,322][DEBUG][logstash.inputs.file     ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:36.969+0300] {standard_task_runner.py:72} INFO - Started process 1975 to run task"}
[2025-08-23T08:09:51,322][DEBUG][logstash.inputs.file     ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:36.974+0300] {standard_task_runner.py:104} INFO - Running: ['***', 'tasks', 'run', 'abonent_info_delivery_janissary', 'validate_input_tasks', 'manual__2025-08-23T08:08:46.090558+00:00', '--job-id', '12', '--raw', '--subdir', 'DAGS_FOLDER/abonent_info_delivery_janissary.dag.py', '--cfg-path', '/tmp/tmp6jcq0mdz']"}
[2025-08-23T08:09:51,322][DEBUG][logstash.inputs.file     ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:36.977+0300] {standard_task_runner.py:105} INFO - Job 12: Subtask validate_input_tasks"}
[2025-08-23T08:09:51,323][DEBUG][logstash.inputs.file     ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:37.089+0300] {task_command.py:467} INFO - Running <TaskInstance: abonent_info_delivery_janissary.validate_input_tasks manual__2025-08-23T08:08:46.090558+00:00 [running]> on host bigdata-03.main01.hadoop.apps.prod.int.nt-ias.ru"}
[2025-08-23T08:09:51,323][DEBUG][logstash.inputs.file     ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:37.296+0300] {taskinstance.py:3134} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='abonent_info_delivery_janissary' AIRFLOW_CTX_TASK_ID='validate_input_tasks' AIRFLOW_CTX_EXECUTION_DATE='2025-08-23T11:08:46.090558+03:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2025-08-23T08:08:46.090558+00:00'"}
[2025-08-23T08:09:51,323][DEBUG][logstash.inputs.file     ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:37.297+0300] {taskinstance.py:732} INFO - ::endgroup::"}
[2025-08-23T08:09:51,323][DEBUG][logstash.inputs.file     ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:37.340+0300] {python.py:240} INFO - Done. Returned value was: None"}
[2025-08-23T08:09:51,324][DEBUG][logstash.inputs.file     ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:37.363+0300] {taskinstance.py:341} INFO - ::group::Post task execution logs"}
[2025-08-23T08:09:51,324][DEBUG][logstash.inputs.file     ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:37.364+0300] {taskinstance.py:353} INFO - Marking task as SUCCESS. dag_id=abonent_info_delivery_janissary, task_id=validate_input_tasks, run_id=manual__2025-08-23T08:08:46.090558+00:00, execution_date=20250823T110846, start_date=20250823T110936, end_date=20250823T110937"}
[2025-08-23T08:09:51,324][DEBUG][logstash.inputs.file     ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:37.467+0300] {local_task_job_runner.py:266} INFO - Task exited with return code 0"}
[2025-08-23T08:09:51,324][DEBUG][logstash.inputs.file     ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:37.492+0300] {local_task_job_runner.py:245} INFO - ::endgroup::"}

At the moment, in the Airflow GUI, the logs of the validate_input_tasks stage show the following

*** Log abonent_info_delivery_janissary-validate_input_tasks-manual__2025-08-23T08:08:46.090558+00:00-1 not found in Elasticsearch. If your task started recently, please wait a moment and reload this page. Otherwise, the logs for this task instance may have been removed.

And in Elastic itself, I don't even see the creation of the airflow-logs index.
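A quick sanity check (assuming the same host and credentials as in the Logstash output section) is to list the indices directly:

curl -u elastic:default 'http://xx.xx.x.xxx:9200/_cat/indices/airflow-*?v'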

I slightly tweaked the grok pattern, but I still don't see any signs of sending to ES: no errors, no success logs, as if nothing is happening.

And the generated log_id field now has to match exactly the Airflow template it uses when querying Elastic:

abonent_info_delivery_janissary-validate_input_tasks-manual__2025-08-23T08:37:44.686735+00:00-1

It seems the line codec was not suitable for my case. After changing the codec to plain and adding a debug output, things finally started moving towards Elastic! My config now looks like this, and this is what Logstash prints:

input {
  file {
    path => ["/usr/local/airflow/logs/*/*/*/*.log"]
    codec => "plain"
  }
}
filter {
  grok {
    match => { "path" => "/usr/local/airflow/logs/dag_id=(?<dag_id>.*?)/run_id=(?<run_id>.*?)/task_id=(?<task_id>.*?)/attempt=(?<try_number>.*?).log$" }
    add_field => {
        "log_id" => "%{dag_id}-%{run_id}-%{task_id}-%{try_number}"
      }
  }
}
output {
  stdout {
    codec => rubydebug {
      metadata => true
    }
  }
  elasticsearch {
    hosts => ["http://xx.xx.x.xxx:9200"]
    index => "airflow-logs"
    user => 'elastic'
    password => 'default'
  }
}
{
          "path" => "/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T11:03:50.742564+00:00/task_id=validate_input_tasks/attempt=1.log",
        "log_id" => "abonent_info_delivery_janissary-manual__2025-08-23T11:03:50.742564+00:00-validate_input_tasks-1",
        "run_id" => "manual__2025-08-23T11:03:50.742564+00:00",
    "@timestamp" => 2025-08-23T11:04:56.813Z,
     "@metadata" => {
        "host" => "ddc07cdee6c4",
        "path" => "/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T11:03:50.742564+00:00/task_id=validate_input_tasks/attempt=1.log"
    },
          "host" => "ddc07cdee6c4",
      "@version" => "1",
       "task_id" => "validate_input_tasks",
    "try_number" => "1",
        "dag_id" => "abonent_info_delivery_janissary",
       "message" => "[2025-08-23T14:04:41.510+0300] {standard_task_runner.py:72} INFO - Started process 263 to run task"
}
[2025-08-23T11:04:58,587][ERROR][logstash.outputs.elasticsearch] Encountered a retryable error. Will Retry with exponential backoff  {:code=>400, :url=>"http://xx.xx.x.xxx:9200/_bulk", :body=>"{\"error\":{\"root_cause\":[{\"type\":\"illegal_argument_exception\",\"reason\":\"Action/metadata line [1] contains an unknown parameter [_type]\"}],\"type\":\"illegal_argument_exception\",\"reason\":\"Action/metadata line [1] contains an unknown parameter [_type]\"},\"status\":400}"}

What versions of logstash and elasticsearch are you using?


Badger, as you correctly noted, the major versions differed: Logstash 7 was sending the deprecated _type parameter in its bulk requests, which Elasticsearch 8 rejects. After downgrading the ES version, I finally saw the coveted airflow-logs index! But the webserver GUI still does not find the logs in Elastic. I'm digging further and will report back here as soon as it can pull them up (I hope so). Perhaps some Airflow geek will find this case useful.

Oh yes, I got it working.

Each document in Elastic must have an offset field, by which Airflow sorts the log lines before displaying them in the GUI. There are several ways to set offset. The first is to add Filebeat to the stack, which automatically puts an offset on each message. The second is to generate it with Ruby code inside Logstash:

filter {
  ruby {
    code => '
      # Initialize the counter if not already set
      if !defined?(@@line_counter)
        @@line_counter = 0
      end

      # Increment the counter and set the line number
      @@line_counter += 1
      event.set("offset", @@line_counter)
    '
  }
}

For each Airflow task there must also be a message carrying the end-of-log marker. In Airflow this marker is "end_of_log" by default. If there is no such message, the webserver loops endlessly waiting for more of the log. I emit this extra message from a Ruby filter (see the config below).
And the last thing is to strip as many as possible of the extra fields that Filebeat adds, done with mutate.remove_field.
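On the Airflow side, both the marker text and the offset field name are configurable; if I read the Elasticsearch provider settings correctly, the relevant airflow.cfg keys (with their defaults) are:

[elasticsearch]
end_of_log_mark = end_of_log
offset_field = offset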

Logstash config for the first option of setting offset, i.e. running a Filebeat container next to Logstash:

input {
  beats {
    port => 5044
  }
}
filter {
  grok {
    match => { "path" => "/usr/local/airflow/logs/dag_id=(?<dag_id>.*?)/run_id=(?<run_id>.*?)/task_id=(?<task_id>.*?)/attempt=(?<try_number>.*?).log$" }
    add_field => {
        "log_id" => "%{dag_id}-%{task_id}-%{run_id}-%{try_number}"
      }
  }
  mutate {
    remove_field => ["agent", "ecs", "host", "@version", "tags", "log", "input"]
  }
  ruby {
    code => "
      message = event.get('message')

      if message
        # The last line Airflow writes for a task is
        # '{local_task_job_runner.py:245} INFO - ::endgroup::'.
        # When it shows up, emit one extra event carrying the end-of-log marker
        # so the webserver knows the log is complete.
        if message['{local_task_job_runner.py:245}'] and message['::endgroup::']
          finish_event = LogStash::Event.new
          finish_event.set('message', 'end_of_log')
          finish_event.set('log_id', event.get('log_id'))
          finish_event.set('offset', event.get('offset') + 1)

          new_event_block.call(finish_event)
        end
      end
    "
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["http://xx.xx.x.xxx:9200"]
    index => "airflow-logs"
    user => 'elastic'
    password => 'default'
  }
}

filebeat.yml

filebeat.inputs:
  - type: filestream
    enabled: true
    id: my-filestream-id
    paths:
      - /usr/local/airflow/logs/*/*/*/*.log
    processors:
      - rename:
          fields:
            - from: "log.offset"
              to: "offset"
            - from: "log.file.path"
              to: "path"
output.logstash:
  hosts: ["10.192.3.10:5044"]

The topic can be closed. Thank you all!