Accessing Azure Data Lake Store from an Azure Data Factory Custom .Net Activity

04/05/2016 Update: If you are looking to use the latest version of the Azure Data Lake Store SDK (Microsoft.Azure.Management.DataLake.Store 0.10.1-preview) please see my post Upgrading to Microsoft.Azure.Management.DataLake.Store 0.10.1-preview to Access Azure Data Lake Store Using C# for what needs to be done to update the DataLakeHelper class.

Introduction

When working with Azure Data Factory (ADF), having the ability to take advantage of Custom .Net Activities greatly expands the ADF use case. One example where a Custom .Net Activity is necessary is when you need to pull data from an API on a regular basis. For example, you may want to pull sales leads from the Salesforce API on a daily basis, or run a search query against the Twitter API every hour. Instead of having a console application scheduled on some VM or local machine, this can be accomplished with ADF and a Custom .Net Activity.

With the data extraction portion complete, the next question is where the raw data should land for continued processing. Azure Data Lake Store, of course! Utilizing the Azure Data Lake Store (ADLS) SDK, we can land the raw data in ADLS, allowing for continued processing down the pipeline. This post focuses on an end-to-end solution that does just that: using Azure Data Factory and a Custom .Net Activity to pull data from the Salesforce API and land it in ADLS for further processing. The solution will run inside a Custom .Net Activity, but the steps for connecting to ADLS from .Net are universal and can be used in any .Net application.

Prerequisites

Service Principal

The first step to accessing the Azure Data Lake Store API from an Azure Data Factory custom activity is to create a service principal. Creating a service principal is the key piece to get this communication working. I originally followed the example here: Data Lake Store .Net SDK, which walks through utilizing the SDK to access the data lake store from .Net. The problem with that example is that the authentication code requires a username and password to be entered interactively. An ADF custom activity is run within an ADF pipeline in Azure by an HDInsight on-demand cluster or the Azure Batch service, and you would not be able to complete a manual authentication workflow on those services. This is where creating an Azure Active Directory Application and Service Principal comes into play.

The article provided here does a great job explaining what needs to be done to set up an Active Directory Application and Service Principal through the Azure Portal, so I will not be covering it in detail. The key pieces of information from that article that you will need later on are the name of the AD Application created, the Azure Tenant Id, the Client Id, and the Key. All of these are required for accessing the data lake from a custom activity.

Azure Data Lake Store

Now that you have completed the task of setting up an AD Application, you will need to give that application access to your Azure Data Lake Store account. If you skip this step, the service principal credentials will not have permissions on the ADLS file system to create and modify files through the API.

You will receive an error like the following:

{"RemoteException":{"exception":"AccessControlException","message":"FsOpenStream failed with error 0x83090aa2 (). [4cfa8eeb2c924ee099799473a55aa7d4]","javaClassName":"org.apache.hadoop.security.AccessControlException"}}

Through the Azure portal Data Lake Store blade it is easy to give the AD Application permissions to the ADLS file system. In the Azure portal, open the ADLS blade and select Data Explorer.


With the Data Explorer open, there will be a key icon that brings you to the Access blade where you define the permissions. It is important to note you must have the root folder selected for the access icon to be active.


Once on the Access blade, click the Add button to be presented with the option to search for a user or group to grant access to.


Selecting that option will display a search box; this is where you type in the name of the AD Application you created in the previous section.


Once selected, give it Read/Write/Execute permissions, or whatever level of access you would like it to have.


After completing these steps and committing the changes, the Client Id/Key combination will allow programmatic access to the ADLS file system.

Visual Studio – IDotNetActivity & Data Lake Store SDK

There is already a great walkthrough of the entire process of creating a custom activity here. My goal is not to reiterate the same steps, but to provide an example utilizing the ADLS .Net SDK; the full source code of the sample is available on GitHub. A custom activity is simply a library with classes that implement the IDotNetActivity interface. With that being said, you will need to create a new class library in Visual Studio and add the required NuGet packages: the Azure Data Factory SDK (which provides IDotNetActivity), the Active Directory Authentication Library (ADAL, which provides AuthenticationContext), and the Azure Data Lake Store SDK.
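
For orientation, the interface itself is small. Here is a minimal skeleton of what the CustomActivity.SampleActivity class referenced later in the pipeline definition might look like (the body is illustrative, not the full sample):

using System.Collections.Generic;
using Microsoft.Azure.Management.DataFactories.Models;
using Microsoft.Azure.Management.DataFactories.Runtime;

namespace CustomActivity
{
    // The assembly and class names must match the assemblyName/entryPoint
    // values in the pipeline definition shown later in this post.
    public class SampleActivity : IDotNetActivity
    {
        public IDictionary<string, string> Execute(
            IEnumerable<LinkedService> linkedServices,
            IEnumerable<Dataset> datasets,
            Activity activity,
            IActivityLogger logger)
        {
            logger.Write("Custom activity started.");

            // Pull data from the Salesforce API and write it to ADLS here...

            return new Dictionary<string, string>();
        }
    }
}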

In the sample application I have wrapped the usage of the Data Lake SDK in a DataLakeHelper class. This class creates the data lake file system client utilizing the service principal credentials created above; the client id acts as the username and the key as the password.

private void create_adls_client()
{
    // Authenticate against Azure AD with the service principal (client id + key)
    // instead of prompting a user for credentials.
    var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenant_id}");
    var credential = new ClientCredential(clientId: client_id, clientSecret: client_key);
    var result = authenticationContext.AcquireToken(resource: "https://management.core.windows.net/", clientCredential: credential);

    if (result == null)
    {
        throw new InvalidOperationException("Failed to obtain the JWT token");
    }

    string token = result.AccessToken;

    // Wrap the bearer token in credentials the Data Lake Store file system client understands.
    var _credentials = new TokenCloudCredentials(subscription_id, token);
    inner_client = new DataLakeStoreFileSystemManagementClient(_credentials);
}

The create_adls_client method follows the same pattern as the Microsoft article; however, instead of needing to prompt the user with an OAuth window, we can provide the client_id and client_key to create a credential object that authenticates with the data lake.
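
For completeness, the tenant_id, client_id, client_key and subscription_id values the method references are simply fields on the DataLakeHelper class, captured from the service principal setup earlier, along with the ADLS account name used by the file system calls below. A rough outline of how the class might be wired together (the constructor here is hypothetical; the actual sample on GitHub may differ):

public class DataLakeHelper
{
    // Values captured during the AD Application / Service Principal setup,
    // plus the target subscription and Data Lake Store account.
    private readonly string tenant_id;
    private readonly string client_id;
    private readonly string client_key;
    private readonly string subscription_id;
    private readonly string adls_account_name;

    private DataLakeStoreFileSystemManagementClient inner_client;

    public DataLakeHelper(string tenantId, string clientId, string clientKey,
                          string subscriptionId, string adlsAccountName)
    {
        tenant_id = tenantId;
        client_id = clientId;
        client_key = clientKey;
        subscription_id = subscriptionId;
        adls_account_name = adlsAccountName;

        create_adls_client();
    }

    // create_adls_client, StoreData, execute_create and execute_append
    // are shown in the snippets throughout this post.
}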

In this particular case I am receiving an unknown number of records from a Salesforce SOQL query, so I will not be using the upload file method from the SDK; instead I create a file directly in the data lake file system and then continually append data to it in chunks. My DataLakeHelper exposes a StoreData method, which takes the path where the file should be stored, a list of comma-delimited strings to write as rows to the file, and finally a flag that determines whether we are creating the file or appending to it.

public void StoreData(string path, List<string> rows, bool append)
{
	try
	{
		var buffer = new MemoryStream();
		var sw = new StreamWriter(buffer);

		foreach (var row in rows)
		{
			//Ensure the request is below 4mb in size to avoid column alignment issue
			if (buffer.Length + Encoding.UTF8.GetByteCount(row) > 3500000)
			{
				buffer.Position = 0;
				if (append)
				{
					execute_append(path, buffer);
				}
				else
				{
					execute_create(path, buffer);
					append = true;
				}

				buffer = new MemoryStream();
				sw = new StreamWriter(buffer);
			}
			sw.WriteLine(row);
			sw.Flush();
		}

		if (buffer.Length <= 0) return;

		buffer.Position = 0;
		if (append)
		{
			execute_append(path, buffer);
		}
		else
		{
			execute_create(path, buffer);
		}
	}
	catch (Exception)
	{
		// Rethrow so the caller (the custom activity) surfaces the failure
		throw;
	}

}

private AzureOperationResponse execute_create(string path, MemoryStream ms)
{
	// Two-step pattern: BeginCreate returns the location to upload to,
	// then Create sends the actual stream contents to that location.
	var beginCreateResponse = inner_client.FileSystem.BeginCreate(path, adls_account_name, new FileCreateParameters());
	var createResponse = inner_client.FileSystem.Create(beginCreateResponse.Location, ms);
	Console.WriteLine("File Created");
	return createResponse;
}

private AzureOperationResponse execute_append(string path, MemoryStream ms)
{
	// Same pattern for appends: BeginAppend to get the location, then Append the stream.
	var beginAppendResponse = inner_client.FileSystem.BeginAppend(path, adls_account_name, null);
	var appendResponse = inner_client.FileSystem.Append(beginAppendResponse.Location, ms);
	Console.WriteLine("Data Appended");
	return appendResponse;
}

You will notice that my StoreData method implements a buffering scheme because the create and append API request size must not exceed 4 MB. Please refer to the post here for a more detailed explanation of why I needed to do this and what problems it may present.

Here is a snippet from the sample custom .net activity, utilizing the DataLakeHelper:

var rows = new List<string>();
var count = 0;
var is_file_created = false;

foreach (var row in sf_helper.GetLeads())
{
	rows.Add(row.ToString());
	count++;

	if (count != 2000) continue;

	adl_helper.StoreData(current_dir + file_name, rows, is_file_created);

	if (!is_file_created) is_file_created = true;

	count = 0;
	rows = new List<string>();
}

if (count <= 0) return new Dictionary<string, string>();

adl_helper.StoreData(current_dir + file_name, rows, is_file_created);

As you can see from the usage of the DataLakeHelper above, there is really no indication that the code is being used in a custom activity in Azure Data Factory. The point is that it can be used in any application that needs to interact with the data lake file system.
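
For example, a plain console application could use the same helper to write a file to ADLS, assuming the hypothetical constructor sketched earlier (all values below are placeholders):

using System;
using System.Collections.Generic;

class Program
{
    static void Main()
    {
        // Placeholder values; use the IDs captured during the service principal setup.
        var helper = new DataLakeHelper(
            tenantId: "<azure-tenant-id>",
            clientId: "<ad-application-client-id>",
            clientKey: "<ad-application-key>",
            subscriptionId: "<azure-subscription-id>",
            adlsAccountName: "<data-lake-store-account-name>");

        var rows = new List<string>
        {
            "Id,FirstName,LastName",
            "00Q000000000001,Jane,Doe"
        };

        // false = create the file on the first call; pass true to append on later calls.
        helper.StoreData("/RawData/sample/leads.csv", rows, append: false);

        Console.WriteLine("Done");
    }
}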

Azure Data Factory

The final step is to automate this process with Azure Data Factory. All of the data factory configuration files are included in the sample on GitHub.

Here is what the pipeline configuration would look like:

{
  "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-08-01/Microsoft.DataFactory.Pipeline.json",
  "name": "Pipeline_CustomActivity_To_ADLS_Example",
  "properties": {
    "description": "",
    "activities": [
      {
        "linkedServiceName": "LinkedService_HDION_Sample",
        "name": "CustomActivity_Download_Sf_Leads_To_ADLS",
        "outputs": [
          { "name": "DataSet_ADLS_SfLeads" }
        ],
        "policy": {
          "timeout": "02:00:00",
          "concurrency": 1,
          "retry": 3
        },
        "scheduler": {
          "frequency": "Day",
          "interval": 1
        },
        "type": "DotNetActivity",
        "typeProperties": {
          "assemblyName": "CustomActivity.dll",
          "entryPoint": "CustomActivity.SampleActivity",
          "packageLinkedService": "LinkedService_AS_EPTStorage",
          "packageFile": "custom-activities/CustomActivity.zip",
          "extendedProperties": {
            "SliceStart": "$$Text.Format('{0:MM-dd-yyyy}', SliceStart)",
            "RootFolder": "RawData",
            "FileName": "SampleLeads"
          }
        }
      }
    ],
    "start": "2015-12-07T00:00:00Z",
    "end": "2015-12-07T23:59:59Z",
    "isPaused": false
  }
}

You can see that the naming convention I use for the configuration files tends to be verbose. Please have a look here for a better understanding of why I am doing that. This example pipeline uses an HDInsight on-demand cluster to execute the custom activity; the other option is to use Azure Batch. The article referenced above goes into detail about each option. Notice that you need a storage account as well; this is where the actual code is stored as a zipped archive. Finally, an important thing I wanted to highlight in this pipeline definition is the use of extendedProperties. The extended properties are any values that the activity might need injected, such as SliceStart, which in this case is used to create a year/month/day file path in the Azure Data Lake Store.
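
Inside the activity, those extended properties arrive as plain key/value strings on the activity's type properties. Here is a short sketch of how the SliceStart, RootFolder and FileName values from the pipeline definition above could be turned into a date-based ADLS path (the folder layout and file extension are illustrative):

// Inside IDotNetActivity.Execute (requires System.Globalization and
// Microsoft.Azure.Management.DataFactories.Models)
var dotNetActivity = (DotNetActivity)activity.TypeProperties;
IDictionary<string, string> props = dotNetActivity.ExtendedProperties;

// SliceStart arrives already formatted as MM-dd-yyyy by the pipeline definition.
DateTime sliceStart = DateTime.ParseExact(
    props["SliceStart"], "MM-dd-yyyy", CultureInfo.InvariantCulture);

// Illustrative year/month/day layout under the configured root folder.
string current_dir = string.Format("/{0}/{1:yyyy}/{1:MM}/{1:dd}/", props["RootFolder"], sliceStart);
string file_name = props["FileName"] + ".csv";

// current_dir + file_name is the path handed to DataLakeHelper.StoreData,
// as in the custom activity snippet earlier in this post.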

Conclusion

In this post I outlined a simple use case that takes full advantage of the automation power in ADF by running completely custom code in an activity. As you have seen, that simple use case involves many technologies that require different steps to integrate. There is already a wealth of information on how to accomplish each step separately; however, I hoped to bring those resources together in one cohesive example. The key integration piece that allows the communication is the creation of an Azure Active Directory Application and accompanying Service Principal. Hopefully this example shows you the power of the Custom Activity in ADF and what kinds of solutions you can build when bringing different Azure services together.