
Apache Airflow: Streamlined pipeline orchestration with Arena workflows

Team Zaloni, July 29th, 2020

Create complex ETL Pipelines

Arena's Workflow feature has a plethora of actions that can be used to create complex ETL pipelines with simple drag-and-drop controls. Once a workflow is created, it can be scheduled to run at specific intervals or on demand. Workflows can also be chained together as Sub Workflows to create an even more complex pipeline. The power of Arena Workflows can be further augmented by using Apache Airflow, an open-source workflow management platform, as the orchestrator. Airflow can interact with Arena Workflows through Arena's REST APIs to create, start, monitor, and stop workflows. In this blog post, I will describe how easily this can be done.

 

Challenge: A common reporting ETL use case is to execute reporting jobs only after all data has been loaded into the data lake. This requires the ability to execute the data load workflows first, followed by the reporting workflows. A naive approach would be to build one single workflow with both the data loading and reporting steps in it. This approach, however, quickly falls apart as the number of sources and reporting queries increases. A better solution is to create individual workflows that cater to specific tasks in the pipeline: individual workflows for downloading data from different sources and for reporting can be created and then chained together with Airflow, which triggers each workflow once its dependencies are met. The diagram below shows a simple reporting use case with 3 source and 2 reporting tables. Reporting table 1 depends on data from source tables 1 and 2, while reporting table 2 depends on data from source table 3 and reporting table 1.

[Diagram: the example pipeline, with three source tables feeding two reporting tables as described above]
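To make the dependency structure concrete before building anything, it can be written down as plain data. The snippet below is purely illustrative; the table names are placeholders, not Arena identifiers:

# Illustrative only: the dependency structure of the example pipeline.
# Each reporting table lists the tables that must be ready before it can be built.
dependencies = {
  'reporting_table_1': ['source_table_1', 'source_table_2'],
  'reporting_table_2': ['source_table_3', 'reporting_table_1'],
}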

Solution: We will now see how this use case can be implemented using Arena Workflows and Apache Airflow. We will start by creating Arena workflows that pull data out of the source database using Arena Workflow's DB Import action, which can import one or more tables from any JDBC-compliant database. We will create three such workflows, one for each of the three source tables. Next, we will create the two reporting workflows using Arena Workflow's Hive Action, which allows execution of custom SQL queries on Hive tables.

 

Creating Arena workflows is simple: drag and drop the actions you need, fill in the necessary details, and join the actions together. Make sure that you include the Start and Stop actions that mark the beginning and end of a workflow. Once all the details have been filled in and the actions joined, save the workflow with a meaningful name. You can always refer to Arena's documentation for help with any particular feature.

Workflow for Importing Table

Workflow for executing Reporting Query

 

Once the 5 workflows have been created, it is time for some "Airflow". We will start by defining two Python functions: one for logging into Arena and another for executing workflows and monitoring them.

 

import json
import requests
import time

# ipPort, user, password and executedBy are configuration values defined elsewhere in the DAG file
def login():
  login_url = 'https://' + ipPort + '/bedrock-app/services/rest/login'
  payload = {"username": user, "password": password}
  data_json = json.dumps(payload)
  # Use a Session so the authentication cookie is reused by subsequent calls
  session = requests.Session()
  response = session.post(login_url, data=data_json)
  if response.status_code != 200:
    response.raise_for_status()
  return session

 

The login() function authenticates by calling Arena's login REST API endpoint. The session token it obtains is held by the requests.Session object, so the successive REST calls made with that session are authenticated.

 

def execute_workflow(payload, wf_id, project_id, **kwargs):
  # Start the workflow via Arena's execute-workflow REST endpoint
  url1 = 'https://' + ipPort + '/bedrock-app/services/rest/workflows/' + wf_id + '/execute?projectIds=' + project_id
  data_json = json.dumps(payload)
  headers = {'Content-type': 'application/json'}
  session = login()
  response = session.post(url1, data=data_json, headers=headers)
  if response.status_code != 200:
    response.raise_for_status()
    return
  # The response message contains the numeric instance id of the workflow run
  instance_id = '0'
  for x in response.json()['responseMessage'].split():
    if x.isdigit():
      instance_id = x
      break
  # Poll the workflow-status endpoint every 30 seconds until the run finishes
  status = 'IN PROGRESS'
  url2 = 'https://' + ipPort + '/bedrock-app/services/rest/workflows/instances/' + instance_id + '/status?projectIds=' + project_id
  while status == 'IN PROGRESS':
    time.sleep(30)
    response = session.get(url2)
    if response.json()['result'] is not None:
      status = response.json()['result']['wfStatus']
      if status in ('FAILED', 'STOPPED'):
        raise Exception('Task Failed')
  return 'workflow status ' + status

 

The execute_workflow() function starts an individual workflow by passing the workflow id and project id to the execute-workflow REST API endpoint. This endpoint returns the workflow instance id, which is then used to poll the workflow-status REST API endpoint at regular intervals to keep track of the workflow's status.

 

Once the functions are in place, we build the Airflow DAG. We use Airflow's PythonOperator to define a task for each workflow, customizing each operator with that workflow's id and project id. The operators call the execute_workflow() function internally, passing in the required parameters, the request payload, workflow id (wf_id) and project id (project_id), via the op_kwargs field.

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

args = {'owner': 'Bedrock',
  'start_date': datetime(2020, 2, 2),
}
dag = DAG(dag_id='Reporting_DAG_V1', schedule_interval='30 0 * * *', catchup=False, default_args=args)

Defines the Airflow DAG, scheduled by the '30 0 * * *' cron expression to run daily at 00:30.

 

Import_Table_1_WF = PythonOperator(
  task_id='Import_Table_1_WF',
  python_callable=execute_workflow,
  provide_context=True,
  op_kwargs={'payload': {
      "wfName": "Import_Table_1_WF",
      "globalParameter": "ex_pass,,,,,,,",
      "executedBy": executedBy,
      "wfLevelParameterList": [],
      "wfNamespaceList": [],
      "adminCapacityQueues": {
        "queueName": "default"
      }
    },
    'wf_id': '10',
    'project_id': '10'
  },
  dag=dag)

Python operator for the workflow that imports data.

 

Reporting_Query_1_WF = PythonOperator(
  task_id='Reporting_Query_1_WF',
  python_callable=execute_workflow,
  provide_context=True,
  op_kwargs={'payload': {
      "wfName": "Reporting_Query_1_WF",
      "globalParameter": "ex_pass,,,,,,,",
      "executedBy": executedBy,
      "wfLevelParameterList": [],
      "wfNamespaceList": [],
      "adminCapacityQueues": {
        "queueName": "default"
      }
    },
    'wf_id': '20',
    'project_id': '10'
  },
  dag=dag)

Python operator for the workflow that executes the reporting query.

 

The next two lines of code are where the magic of specifying the dependencies happens.

Reporting_Query_1_WF.set_upstream([Import_Table_1_WF, Import_Table_2_WF])

Reporting_Query_2_WF.set_upstream([Import_Table_3_WF, Reporting_Query_1_WF])

With these two lines we have specified that the Reporting_Query_1_WF workflow depends on the successful completion of the Import_Table_1_WF and Import_Table_2_WF workflows. Similarly, the Reporting_Query_2_WF workflow depends on the successful completion of the Import_Table_3_WF and Reporting_Query_1_WF workflows.
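The same dependencies can also be written with Airflow's bitshift operators, which some find more readable; the following two lines are equivalent to the set_upstream() calls above:

# Equivalent dependency declarations using Airflow's bitshift syntax
[Import_Table_1_WF, Import_Table_2_WF] >> Reporting_Query_1_WF
[Import_Table_3_WF, Reporting_Query_1_WF] >> Reporting_Query_2_WF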


View of the DAG in Airflow

 

Once the DAG is turned on, Apache Airflow will execute it at the scheduled interval, maintaining the dependencies between the different workflows.
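If you want to test the pipeline without waiting for the scheduled run, you can trigger it manually from the Airflow CLI once the DAG file has been placed in Airflow's dags folder. The commands below assume Airflow 1.10, which this example targets; on Airflow 2.x the equivalents are airflow dags list and airflow dags trigger.

airflow list_dags                      # confirm Reporting_DAG_V1 has been picked up
airflow trigger_dag Reporting_DAG_V1   # start a manual run outside the schedule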

 

Conclusion: This blog post demonstrated how easy it is to use Arena Workflows in conjunction with Apache Airflow to create complex ETL pipelines with dependencies between different workflows. In the next blog post, we will look at other useful Arena features that streamline your data pipelines.


 

About the author

This team of authors from Team Zaloni provides their expertise, best practices, tips and tricks, and use cases across varied topics including: data governance, data catalog, DataOps, observability, and so much more.