Airflow XCom not retrieving values between tasks in DAG

  • Thread starter: daniel guo (Guest)
I'm experiencing an issue with Apache Airflow where values pushed to XCom in one task are not retrievable in a subsequent task within the same DAG. Here is a minimal example of my code:

Code:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from pathlib import Path
from airflow.utils.log.logging_mixin import LoggingMixin

task_logger = LoggingMixin().log

def create_folders_with_subfolders(base_path, **kwargs):
    execution_date = kwargs['execution_date']
    dir_name = execution_date.strftime("%Y%m%d")
    main_folder = Path(base_path) / dir_name
    main_folder.mkdir(parents=True, exist_ok=True)

    subfolders = ['sftp_raw_data', 'validate_delimiter', 'lowercase', 'data_quality', 'process_files']
    paths = {}
    for subfolder in subfolders:
        subfolder_path = main_folder / subfolder
        subfolder_path.mkdir(exist_ok=True)
        paths[subfolder] = str(subfolder_path)
        kwargs['ti'].xcom_push(key=subfolder, value=str(subfolder_path))
        task_logger.info(f"XCom push for {subfolder}: {str(subfolder_path)}")
    task_logger.info(f"paths {paths}")

def validate_delimiter_and_load_csv(ti, **kwargs):
    task_logger.info(f"ti {ti}")
    zip_path = ti.xcom_pull(task_ids='create_dirs_task', key='sftp_raw_data')
    output_dir = ti.xcom_pull(task_ids='create_dirs_task', key='validate_delimiter')

    task_logger.info(f"zip_path pulled from XCom: {zip_path}")
    task_logger.info(f"output_dir pulled from XCom: {output_dir}")

with DAG(
    dag_id="example_dag",
    start_date=days_ago(1),
    catchup=False,
    schedule_interval="0 6,18 * * *",
    tags=["example"],
) as dag:

    create_dirs_task = PythonOperator(
        task_id="create_dirs_task",
        python_callable=create_folders_with_subfolders,
        op_kwargs={'base_path': '/path/to/directory'},
        provide_context=True,
    )

    validate_task = PythonOperator(
        task_id="validate_delimiter_and_load_csv",
        python_callable=validate_delimiter_and_load_csv,
        provide_context=True,
    )

    create_dirs_task >> validate_task

Description: The DAG above has two tasks: create_dirs_task, which creates the directories and pushes their paths to XCom, and validate_delimiter_and_load_csv, which is supposed to pull those paths from XCom. However, the pull in validate_delimiter_and_load_csv does not return the paths: the logs show None for both zip_path and output_dir.

The XCom push seems to work fine as verified by the logs and the Airflow UI showing the XCom entries. Yet, when attempting to pull these entries in the subsequent task, they return None. Both tasks use provide_context=True.

I would appreciate any suggestions on what might be causing this issue and how to resolve it. Thank you!
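
One way to narrow this down, offered as a sketch and not as a confirmed fix: have the first callable return the whole paths dict instead of pushing one key per subfolder. A PythonOperator pushes its callable's return value to XCom under the key return_value, and xcom_pull with only task_ids reads that key, so the second task has a single value to look up and can fail loudly when nothing is found. The FakeTI class below is a hypothetical stand-in for Airflow's task instance, included only so the two callables can be exercised outside a scheduler; it is not an Airflow API. The SUBFOLDERS constant and the ValueError check are also my additions.

```python
from datetime import datetime
from pathlib import Path
import tempfile

SUBFOLDERS = ['sftp_raw_data', 'validate_delimiter', 'lowercase',
              'data_quality', 'process_files']


def create_folders_with_subfolders(base_path, **kwargs):
    """Create the dated folder tree and return {subfolder: path}.

    When run by a PythonOperator, the returned dict is pushed to XCom
    under the default key 'return_value'.
    """
    main_folder = Path(base_path) / kwargs['execution_date'].strftime("%Y%m%d")
    paths = {}
    for subfolder in SUBFOLDERS:
        subfolder_path = main_folder / subfolder
        subfolder_path.mkdir(parents=True, exist_ok=True)
        paths[subfolder] = str(subfolder_path)
    return paths


def validate_delimiter_and_load_csv(ti, **kwargs):
    """Pull the dict returned by create_dirs_task and pick out two paths."""
    paths = ti.xcom_pull(task_ids='create_dirs_task')
    if paths is None:
        # Fail loudly instead of carrying None into later steps.
        raise ValueError("No XCom value found for task 'create_dirs_task'")
    return paths['sftp_raw_data'], paths['validate_delimiter']


class FakeTI:
    """Hypothetical stand-in for a task instance (local testing only)."""

    def __init__(self, store):
        # store maps (task_id, key) -> value
        self._store = store

    def xcom_pull(self, task_ids, key='return_value'):
        return self._store.get((task_ids, key))


# Local dry run: no scheduler or metadata database involved.
with tempfile.TemporaryDirectory() as tmp:
    pushed = create_folders_with_subfolders(
        tmp, execution_date=datetime(2024, 1, 2))
    fake_ti = FakeTI({('create_dirs_task', 'return_value'): pushed})
    zip_path, output_dir = validate_delimiter_and_load_csv(fake_ti)
    print(zip_path)
    print(output_dir)
```

If the callables behave in this dry run but the pull still returns None inside Airflow, the problem is more likely in how the tasks are executed (for example, whether both tasks belong to the same DAG run) than in the callables themselves.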