Fetch Data from API

The task fetch_data_from_api sends a GET request to the API endpoint https://api-sekolah-indonesia.vercel.app/sekolah?page=1&perPage=2000 and returns the response, which is in JSON format.
Transform Data

The task transform_data pulls the JSON payload fetched by the previous task using Airflow's XCom mechanism. It extracts the relevant records from the nested structure (dataSekolah) and converts them into a pandas DataFrame. Placeholder comments mark where further transformations could be applied with pandas.
Load Data to Database

The task load_data_to_database pulls the transformed DataFrame from the previous task and connects to a PostgreSQL database using Airflow's PostgresHook. Every DataFrame column is mapped to the TEXT type for simplicity, and a SQL query creates the target table in the public schema if it doesn't already exist. The transformed data is then loaded into this table with the pandas to_sql method.
Detailed Components

1. Fetch Data from API

```python
import requests

def fetch_data_from_api():
    # Request the first 2000 school records from the public API
    response = requests.get('https://api-sekolah-indonesia.vercel.app/sekolah?page=1&perPage=2000')
    data = response.json()
    return data
```

Purpose: This function sends an HTTP GET request to the API to retrieve data.

Data Structure: The data is expected to be in JSON format, with a key dataSekolah containing the school data.
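The snippet above assumes the request always succeeds. A minimal hardening sketch, not part of the original DAG, would add a timeout and raise on HTTP errors so a bad response fails the task instead of passing empty data downstream:

```python
import requests

API_URL = 'https://api-sekolah-indonesia.vercel.app/sekolah?page=1&perPage=2000'

def fetch_data_from_api():
    # The 30-second timeout is an assumed value; tune it to the API's behavior.
    response = requests.get(API_URL, timeout=30)
    response.raise_for_status()  # fail the Airflow task on 4xx/5xx responses
    return response.json()
```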
2. Transform Data

```python
import pandas as pd

def transform_data(**kwargs):
    # Pull the JSON payload produced by the fetch task via XCom
    data = kwargs['ti'].xcom_pull(task_ids='fetch_data_from_api')
    sekolah_data = data['dataSekolah']
    df = pd.DataFrame(sekolah_data)
    # Transformation 1 and 2 placeholders
    transformed_df = df
    return transformed_df
```

Purpose: This function converts the JSON data into a pandas DataFrame and performs any data transformations.

Data Structure: The DataFrame (df) holds the records extracted from dataSekolah.
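As an illustration of what could replace the placeholders, the sketch below deduplicates rows and normalizes a text column. The column name sekolah is an assumption about the API payload, not a guaranteed field:

```python
import pandas as pd

def apply_transformations(df: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical cleanup steps; check column names against the real payload.
    df = df.drop_duplicates()
    if 'sekolah' in df.columns:
        df['sekolah'] = df['sekolah'].str.strip().str.title()
    return df
```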
3. Load Data to Database

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

def load_data_to_database(**kwargs):
    # Pull the transformed DataFrame from the transform task via XCom
    transformed_data = kwargs['ti'].xcom_pull(task_ids='transform_data')
    postgres_hook = PostgresHook(postgres_conn_id='postgres')
    # Map every column to TEXT for simplicity
    data_types = {col: 'TEXT' for col in transformed_data.columns}
    # Column names are interpolated directly, so they must be valid SQL identifiers
    create_query = f"""
        CREATE TABLE IF NOT EXISTS public.target_table (
            {', '.join([f'{col} {data_types[col]}' for col in transformed_data.columns])}
        );
    """
    postgres_hook.run(create_query)
    transformed_data.to_sql('target_table', postgres_hook.get_sqlalchemy_engine(),
                            schema='public', if_exists='append', index=False)
```

Purpose: This function creates the table if it doesn't exist and loads the transformed data into the database.

Data Structure: Every DataFrame column is mapped to the TEXT type.
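Because the DAG runs daily and to_sql appends, each run inserts another copy of the same rows. If a full refresh is the intended semantic (an assumption; the original leaves this open), one way to keep the load idempotent is to truncate before inserting:

```python
def load_full_refresh(transformed_data, postgres_hook):
    # Assumes the table already exists and should hold only the latest snapshot
    postgres_hook.run('TRUNCATE TABLE public.target_table;')
    transformed_data.to_sql('target_table', postgres_hook.get_sqlalchemy_engine(),
                            schema='public', if_exists='append', index=False)
```

Passing if_exists='replace' to to_sql would also avoid duplicates, but it lets pandas recreate the table and discard the explicitly created schema.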
Airflow DAG Definition

```python
with DAG('api_to_database_dag', default_args=default_args,
         schedule_interval='@daily', catchup=False) as dag:
    extract_task = PythonOperator(
        task_id='fetch_data_from_api',
        python_callable=fetch_data_from_api,
    )
    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
        provide_context=True,
    )
    load_task = PythonOperator(
        task_id='load_data_to_database',
        python_callable=load_data_to_database,
        provide_context=True,
    )
    extract_task >> transform_task >> load_task
```

DAG Name: api_to_database_dag

Schedule: The DAG is scheduled to run daily (@daily).
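The code references a default_args dict that is not shown in this section. A typical definition, given here as an assumption about the omitted setup (owner, start date, and retry policy are placeholder values), would be:

```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2-style import

# Assumed defaults; the original article's values are not shown here.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
```

Note that on Airflow 2 the PythonOperator passes the task context automatically, so provide_context=True is only needed on Airflow 1.x.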
Tasks:
- fetch_data_from_api: Fetches data from the API.
- transform_data: Transforms the fetched data.
- load_data_to_database: Loads the transformed data into the PostgreSQL database.

Task Dependencies: The tasks are chained so that fetch_data_from_api runs first, followed by transform_data, and finally load_data_to_database.
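The >> chaining is equivalent to Airflow's chain helper, which can read more cleanly as the pipeline grows; a minimal sketch:

```python
from airflow.models.baseoperator import chain

# Equivalent to: extract_task >> transform_task >> load_task
chain(extract_task, transform_task, load_task)
```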