Hello @Tortoise !
When the DAG is launched, its full cycle includes 9 stages. In order for Airflow to distinguish the logs of stage 2 from stage 8, it uses the parameter log_id_template={dag_id}-{task_id}-{run_id}-{try_number}, which sends a request to Elastic. The folder structure is shown below, the final file in which is attempt=1.log.
For example, the content /usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T07:57:54.336244+00:00/task_id=validate_input_tasks/attempt=1.log
[2025-08-23T11:09:36.891+0300] {local_task_job_runner.py:123} INFO - ::group::Pre task execution logs
[2025-08-23T11:09:36.927+0300] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: abonent_info_delivery_janissary.validate_input_tasks manual__2025-08-23T08:08:46.090558+00:00 [queued]>
[2025-08-23T11:09:36.941+0300] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: abonent_info_delivery_janissary.validate_input_tasks manual__2025-08-23T08:08:46.090558+00:00 [queued]>
[2025-08-23T11:09:36.941+0300] {taskinstance.py:2867} INFO - Starting attempt 1 of 1
[2025-08-23T11:09:36.963+0300] {taskinstance.py:2890} INFO - Executing <Task(PythonOperator): validate_input_tasks> on 2025-08-23 11:08:46.090558+03:00
[2025-08-23T11:09:36.969+0300] {standard_task_runner.py:72} INFO - Started process 1975 to run task
[2025-08-23T11:09:36.974+0300] {standard_task_runner.py:104} INFO - Running: ['***', 'tasks', 'run', 'abonent_info_delivery_janissary', 'validate_input_tasks', 'manual__2025-08-23T08:08:46.090558+00:00', '--job-id', '12', '--raw', '--subdir', 'DAGS_FOLDER/abonent_info_delivery_janissary.dag.py', '--cfg-path', '/tmp/tmp6jcq0mdz']
[2025-08-23T11:09:36.977+0300] {standard_task_runner.py:105} INFO - Job 12: Subtask validate_input_tasks
[2025-08-23T11:09:37.089+0300] {task_command.py:467} INFO - Running <TaskInstance: abonent_info_delivery_janissary.validate_input_tasks manual__2025-08-23T08:08:46.090558+00:00 [running]> on host bigdata-03.main01.hadoop.apps.prod.int.nt-ias.ru
[2025-08-23T11:09:37.296+0300] {taskinstance.py:3134} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='abonent_info_delivery_janissary' AIRFLOW_CTX_TASK_ID='validate_input_tasks' AIRFLOW_CTX_EXECUTION_DATE='2025-08-23T11:08:46.090558+03:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2025-08-23T08:08:46.090558+00:00'
[2025-08-23T11:09:37.297+0300] {taskinstance.py:732} INFO - ::endgroup::
[2025-08-23T11:09:37.340+0300] {python.py:240} INFO - Done. Returned value was: None
[2025-08-23T11:09:37.363+0300] {taskinstance.py:341} INFO - ::group::Post task execution logs
[2025-08-23T11:09:37.364+0300] {taskinstance.py:353} INFO - Marking task as SUCCESS. dag_id=abonent_info_delivery_janissary, task_id=validate_input_tasks, run_id=manual__2025-08-23T08:08:46.090558+00:00, execution_date=20250823T110846, start_date=20250823T110936, end_date=20250823T110937
[2025-08-23T11:09:37.467+0300] {local_task_job_runner.py:266} INFO - Task exited with return code 0
[2025-08-23T11:09:37.492+0300] {local_task_job_runner.py:245} INFO - ::endgroup::
And how this file is handled by logstash. I expect that it forms a full-fledged log for a specific stage of the Airflow in Elastic piece by piece.
[2025-08-23T08:09:51,321][DEBUG][logstash.inputs.file ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:36.941+0300] {taskinstance.py:2614} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: abonent_info_delivery_janissary.validate_input_tasks manual__2025-08-23T08:08:46.090558+00:00 [queued]>"}
[2025-08-23T08:09:51,321][DEBUG][logstash.inputs.file ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:36.941+0300] {taskinstance.py:2867} INFO - Starting attempt 1 of 1"}
[2025-08-23T08:09:51,322][DEBUG][logstash.inputs.file ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:36.963+0300] {taskinstance.py:2890} INFO - Executing <Task(PythonOperator): validate_input_tasks> on 2025-08-23 11:08:46.090558+03:00"}
[2025-08-23T08:09:51,322][DEBUG][logstash.inputs.file ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:36.969+0300] {standard_task_runner.py:72} INFO - Started process 1975 to run task"}
[2025-08-23T08:09:51,322][DEBUG][logstash.inputs.file ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:36.974+0300] {standard_task_runner.py:104} INFO - Running: ['***', 'tasks', 'run', 'abonent_info_delivery_janissary', 'validate_input_tasks', 'manual__2025-08-23T08:08:46.090558+00:00', '--job-id', '12', '--raw', '--subdir', 'DAGS_FOLDER/abonent_info_delivery_janissary.dag.py', '--cfg-path', '/tmp/tmp6jcq0mdz']"}
[2025-08-23T08:09:51,322][DEBUG][logstash.inputs.file ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:36.977+0300] {standard_task_runner.py:105} INFO - Job 12: Subtask validate_input_tasks"}
[2025-08-23T08:09:51,323][DEBUG][logstash.inputs.file ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:37.089+0300] {task_command.py:467} INFO - Running <TaskInstance: abonent_info_delivery_janissary.validate_input_tasks manual__2025-08-23T08:08:46.090558+00:00 [running]> on host bigdata-03.main01.hadoop.apps.prod.int.nt-ias.ru"}
[2025-08-23T08:09:51,323][DEBUG][logstash.inputs.file ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:37.296+0300] {taskinstance.py:3134} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='abonent_info_delivery_janissary' AIRFLOW_CTX_TASK_ID='validate_input_tasks' AIRFLOW_CTX_EXECUTION_DATE='2025-08-23T11:08:46.090558+03:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2025-08-23T08:08:46.090558+00:00'"}
[2025-08-23T08:09:51,323][DEBUG][logstash.inputs.file ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:37.297+0300] {taskinstance.py:732} INFO - ::endgroup::"}
[2025-08-23T08:09:51,323][DEBUG][logstash.inputs.file ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:37.340+0300] {python.py:240} INFO - Done. Returned value was: None"}
[2025-08-23T08:09:51,324][DEBUG][logstash.inputs.file ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:37.363+0300] {taskinstance.py:341} INFO - ::group::Post task execution logs"}
[2025-08-23T08:09:51,324][DEBUG][logstash.inputs.file ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:37.364+0300] {taskinstance.py:353} INFO - Marking task as SUCCESS. dag_id=abonent_info_delivery_janissary, task_id=validate_input_tasks, run_id=manual__2025-08-23T08:08:46.090558+00:00, execution_date=20250823T110846, start_date=20250823T110936, end_date=20250823T110937"}
[2025-08-23T08:09:51,324][DEBUG][logstash.inputs.file ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:37.467+0300] {local_task_job_runner.py:266} INFO - Task exited with return code 0"}
[2025-08-23T08:09:51,324][DEBUG][logstash.inputs.file ] Received line {:path=>"/usr/local/airflow/logs/dag_id=abonent_info_delivery_janissary/run_id=manual__2025-08-23T08:08:46.090558+00:00/task_id=validate_input_tasks/attempt=1.log", :text=>"[2025-08-23T11:09:37.492+0300] {local_task_job_runner.py:245} INFO - ::endgroup::"}
At the moment, in the Airflow GUI, in the logs of the validate_tasks stage, I have the following
*** Log abonent_info_delivery_janissary-validate_input_tasks-manual__2025-08-23T08:08:46.090558+00:00-1 not found in Elasticsearch. If your task started recently, please wait a moment and reload this page. Otherwise, the logs for this task instance may have been removed.
And in Elastic itself, I don't even see the creation of the airflow-logs index.