Starting an Azure Data Factory Pipeline from C# .NET

Introduction

Azure Data Factory (ADF) does an amazing job orchestrating data movement and transformation activities between cloud sources.  Sometimes you may also need to reach into your on-premises systems to gather data, which is also possible with ADF through data management gateways.  However, you may run into a situation where you already have local processes running, or you cannot run a specific process in the cloud, but you still want an ADF pipeline to depend on the data being processed locally.  For example, you may have an ETL process that begins with a locally run process that stores data in Azure Data Lake.  Once that process is completed, you want the ADF pipeline to begin processing that data, along with any other activities or pipelines that follow.  The key is starting the ADF pipeline only after the local process has completed.  This post highlights how to accomplish this through the Data Factory Management API.

Prerequisites

Azure Data Factory

The purpose of this post is not to reiterate how to create ADF pipelines.  There are already plenty of resources out there to learn about ADF and accomplish that task.  So I assume you know how to create an ADF pipeline and already have a particular pipeline in mind for this.  If not, please follow the detailed instructions here to create your own ADF pipeline.  With that being said, there are three pieces of information you will need from the pipeline: the Resource Group Name, the Data Factory Name, and the Pipeline Name.  One other note: set the isPaused property to true in the JSON pipeline definition file, so that the pipeline does not run until we start it from code.

Example Pipeline Definition:

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-08-01/Microsoft.DataFactory.Pipeline.json",
  "name": "Pipeline_Copy_Date_Dimension",
  "properties": {
    "description": "",
    "activities": [
      {
        "name": "Copy_Date_D_ASDB_To_ASDW",
        "type": "Copy",
        "inputs": [
          { "name": "DataSet_ASDB_Date_D" }
        ],
        "outputs": [
          { "name": "DataSet_ASDW_Date_D" }
        ],
        "typeProperties": {
          "source": {
            "type": "SqlSource"
          },
          "sink": {
            "type": "SqlDWSink"
          }
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 3,
          "timeout": "01:00:00"
        },
        "scheduler": {
          "frequency": "Day",
          "interval": 1
        }
      }
    ],
    "start": "2015-12-07T00:00:00Z",
    "end": "2015-12-07T23:59:59Z",
    "isPaused": true
  }
}

Service Principal

After you have defined your ADF pipeline, the next piece is being able to authenticate with the Data Factory Management API.  As in my post Accessing Azure Data Lake Store from an Azure Data Factory Custom .Net Activity, the service principal is the key to utilizing the Data Factory Management API from .NET.  As stated in that earlier post, you can find instructions here on how to create an Azure Active Directory Application and Service Principal.  Once you have an Azure Active Directory Application created, you need the following pieces of information to get an authorization token from Azure Active Directory: the Azure Tenant Id, Subscription Id, Client Id, and Key.
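
One way to keep those four values out of source code is to read them from the environment at startup.  The following is only a minimal sketch and not part of the sample application: the AdfCredentials class and the ADF_* environment variable names are hypothetical, chosen here for illustration.

```csharp
using System;

// Hypothetical holder for the four values needed to request a token.
// The class name and the ADF_* variable names are assumptions for this sketch.
public sealed class AdfCredentials
{
    public string TenantId { get; }
    public string SubscriptionId { get; }
    public string ClientId { get; }
    public string ClientKey { get; }

    public AdfCredentials(string tenantId, string subscriptionId, string clientId, string clientKey)
    {
        TenantId = tenantId;
        SubscriptionId = subscriptionId;
        ClientId = clientId;
        ClientKey = clientKey;
    }

    // Reads all four settings from environment variables and fails fast if any is missing.
    public static AdfCredentials FromEnvironment()
    {
        return new AdfCredentials(
            Require("ADF_TENANT_ID"),
            Require("ADF_SUBSCRIPTION_ID"),
            Require("ADF_CLIENT_ID"),
            Require("ADF_CLIENT_KEY"));
    }

    private static string Require(string name)
    {
        var value = Environment.GetEnvironmentVariable(name);
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException($"Environment variable '{name}' is not set.");
        }
        return value;
    }
}
```

Calling AdfCredentials.FromEnvironment() once at startup would surface a missing setting before any call to Azure Active Directory is attempted.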

Visual Studio

Now that the pieces are in place, we can begin using the Data Factory Management API to start the ADF pipeline.  There is no “execute” function to begin an ADF pipeline.  Instead, we will use the API to modify the existing pipeline configuration.  Specifically, we will set isPaused to false and set the start/end attributes to our desired running schedule.

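Full source code of the sample is available on GitHub.  Create a new console app and use NuGet to import the packages the code below depends on: the Data Factory management library (Microsoft.Azure.Management.DataFactories) and the Active Directory Authentication Library (Microsoft.IdentityModel.Clients.ActiveDirectory).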

The sample application contains a DataFactoryHelper class which encapsulates the authentication and utilization of the Data Factory Management API.

        private void create_adf_client()
        {
            // Request a token from Azure Active Directory for the tenant,
            // authenticating as the service principal (client id + key).
            var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenant_id}");
            var credential = new ClientCredential(clientId: client_id, clientSecret: client_key);
            var result = authenticationContext.AcquireToken(resource: "https://management.core.windows.net/", clientCredential: credential);

            if (result == null)
            {
                throw new InvalidOperationException("Failed to obtain the JWT token");
            }

            var token = result.AccessToken;

            // Wrap the subscription id and token in credentials and create the management client.
            var _credentials = new TokenCloudCredentials(subscription_id, token);
            inner_client = new DataFactoryManagementClient(_credentials);
        }

The first step is to create the DataFactoryManagementClient.  Here I am utilizing the Azure Active Directory Client Id and Key to create the credentials needed to authenticate with the Data Factory API.  If you read my earlier post about accessing the Azure Data Lake Store API from .NET, you will notice this code looks identical, except that I am creating a DataFactoryManagementClient instead of a DataLakeStoreFileSystemManagementClient.

        public void StartPipeline(string resourceGroup, string dataFactory, string pipelineName, DateTime slice)
        {
            // Fetch the current pipeline definition.
            var pipeline = inner_client.Pipelines.Get(resourceGroup, dataFactory, pipelineName);

            // Set the active period to the full day of the requested slice and unpause the pipeline.
            pipeline.Pipeline.Properties.Start = DateTime.Parse($"{slice.Date:yyyy-MM-dd}T00:00:00Z");
            pipeline.Pipeline.Properties.End = DateTime.Parse($"{slice.Date:yyyy-MM-dd}T23:59:59Z");
            pipeline.Pipeline.Properties.IsPaused = false;

            // Save the modified definition back to the data factory.
            inner_client.Pipelines.CreateOrUpdate(resourceGroup, dataFactory, new PipelineCreateOrUpdateParameters()
            {
                Pipeline = pipeline.Pipeline
            });
        }

Now that the client is initialized, we can begin using it to modify our existing ADF pipeline.  The StartPipeline method invokes the client's Pipelines.Get() method to get the full JSON definition of the pipeline.  With this information we can modify the pipeline's properties to the desired running schedule, set the isPaused property to false to enable the pipeline, and finally save the updated pipeline definition.  For this post and sample purposes I kick off the pipeline and end the job.  However, the ADF pipeline story does not need to end there.  You can use the Data Factory Management API to programmatically monitor the pipeline to ensure completion and then continue with other work if so inclined.
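
The start and end values in StartPipeline are built by formatting and re-parsing a string.  By default, DateTime.Parse converts a value with a trailing "Z" to local time, so you may prefer to build the one-day window directly as UTC values.  The following is a minimal sketch of that alternative, not the code from the sample; the SliceWindow class and its method names are hypothetical.

```csharp
using System;

// Hypothetical helper (not part of the sample application) that computes the
// one-day window for a slice without formatting and re-parsing strings.
public static class SliceWindow
{
    // 00:00:00 on the slice's calendar date, marked explicitly as UTC.
    // Like the sample, this uses the date as given and applies no time zone conversion.
    public static DateTime StartOfDayUtc(DateTime slice)
    {
        return new DateTime(slice.Year, slice.Month, slice.Day, 0, 0, 0, DateTimeKind.Utc);
    }

    // 23:59:59 on the same calendar date, also marked as UTC.
    public static DateTime EndOfDayUtc(DateTime slice)
    {
        return StartOfDayUtc(slice).AddDays(1).AddSeconds(-1);
    }
}
```

With this helper, the two assignments in StartPipeline could instead use SliceWindow.StartOfDayUtc(slice) and SliceWindow.EndOfDayUtc(slice).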

As a side note on monitoring Azure Data Factory Pipelines, there was a recent release of a new Management and Monitoring App for Azure Data Factory.

Conclusion

Azure Data Factory is a great service for orchestrating data transformation and movement activities between other cloud services.  However, you may find yourself dependent on a locally running process that prevents you from running the entire routine in the cloud.  Hopefully this post sheds some light on how to create a hybrid approach by starting an ADF pipeline from code when a dependent local job completes.