Upgrading to Microsoft.Azure.Management.DataLake.Store 0.10.1-preview to Access Azure Data Lake Store Using C#

Introduction

Microsoft recently released a new NuGet package for programmatic access to the Azure Data Lake Store.  In a previous post, Accessing Azure Data Lake Store from an Azure Data Factory Custom .Net Activity, I used Microsoft.Azure.Management.DataLake.StoreFileSystem 0.9.6-preview to access the data lake from C#.  In this post I will go through what needs to change in that code to upgrade to the new NuGet package.  I will also include a new version of the DataLakeHelper class which uses the updated SDK.

Upgrade Path

Since I already have a sample project utilizing the older SDK (Microsoft.Azure.Management.DataLake.StoreFileSystem 0.9.6-preview), I will use it as an example of what needs to be modified to use the updated NuGet package (Microsoft.Azure.Management.DataLake.Store 0.10.1-preview).

The first step is to remove all packages which supported the obsolete SDK.  Here is the list of all packages that can be removed:

  • Hyak.Common
  • Microsoft.Azure.Common
  • Microsoft.Azure.Common.Dependencies
  • Microsoft.Azure.Management.DataLake.StoreFileSystem
  • Microsoft.Bcl
  • Microsoft.Bcl.Async
  • Microsoft.Bcl.Build
  • Microsoft.Net.Http

All of these dependencies are needed when using the DataLake.StoreFileSystem package.  My previous sample also used Microsoft.Azure.Management.DataFactories to create a custom activity for Azure Data Factory.  Unfortunately, that package depends on all of the above packages as well, so please be careful when removing them: your own applications might have other dependencies on the packages listed above.  To show that these packages are no longer needed, my new sample project is just a simple console application using the modified DataLakeHelper class, which can be found on GitHub.

Now let’s go through the few changes that need to be made to the DataLakeHelper class in order to use the new NuGet package.  The following functions from the original DataLakeHelper class will need to be modified:

create_adls_client()
execute_create(string path, MemoryStream ms)
execute_append(string path, MemoryStream ms)

Here is the original code for create_adls_client():

        private void create_adls_client()
        {
            var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenant_id}");
            var credential = new ClientCredential(clientId: client_id, clientSecret: client_key);
            var result = authenticationContext.AcquireToken(resource: "https://management.core.windows.net/", clientCredential: credential);

            if (result == null)
            {
                throw new InvalidOperationException("Failed to obtain the JWT token");
            }

            string token = result.AccessToken;

            var _credentials = new TokenCloudCredentials(subscription_id, token);
            inner_client = new DataLakeStoreFileSystemManagementClient(_credentials);
        }

In order to upgrade to the new SDK, two changes need to be made.

  1. The DataLakeStoreFileSystemManagementClient now requires a ServiceClientCredentials object (here a TokenCredentials) instead of a TokenCloudCredentials object
  2. You must set the Azure subscription id on the newly created client

The last two lines of the method should now look like this:

var _credentials = new TokenCredentials(token);
inner_client = new DataLakeStoreFileSystemManagementClient(_credentials);
inner_client.SubscriptionId = subscription_id;

Now that we can successfully authenticate again with the Azure Data Lake Store, the next change is to the create and append methods.

Here is the original code for execute_create(string path, MemoryStream ms) and execute_append(string path, MemoryStream ms):

        private AzureOperationResponse execute_create(string path, MemoryStream ms)
        {
            var beginCreateResponse = inner_client.FileSystem.BeginCreate(path, adls_account_name, new FileCreateParameters());
            var createResponse = inner_client.FileSystem.Create(beginCreateResponse.Location, ms);
            Console.WriteLine("File Created");
            return createResponse;
        }

        private AzureOperationResponse execute_append(string path, MemoryStream ms)
        {
            var beginAppendResponse = inner_client.FileSystem.BeginAppend(path, adls_account_name, null);
            var appendResponse = inner_client.FileSystem.Append(beginAppendResponse.Location, ms);
            Console.WriteLine("Data Appended");
            return appendResponse;
        }

The change for both of these methods is pretty simple: the BeginCreate and BeginAppend methods are no longer available, and the new Create and Append methods now take the path and the Azure Data Lake Store account name directly.

With the changes applied the new methods are as follows:

        private void execute_create(string path, MemoryStream ms)
        {
            inner_client.FileSystem.Create(path, adls_account_name, ms, false);
            Console.WriteLine("File Created");
        }

        private void execute_append(string path, MemoryStream ms)
        {
            inner_client.FileSystem.Append(path, ms, adls_account_name);
            Console.WriteLine("Data Appended");
        }

Conclusion

As you can see, it was not difficult to upgrade to the new version of the SDK.  Unfortunately, since these are all preview bits, changes like this can happen.  Hopefully this SDK has found its new home and won’t go through too many more breaking changes for the end user.

Accessing Azure Data Lake Store from an Azure Data Factory Custom .Net Activity

04/05/2016 Update: If you are looking to use the latest version of the Azure Data Lake Store SDK (Microsoft.Azure.Management.DataLake.Store 0.10.1-preview) please see my post Upgrading to Microsoft.Azure.Management.DataLake.Store 0.10.1-preview to Access Azure Data Lake Store Using C# for what needs to be done to update the DataLakeHelper class.

Introduction

When working with Azure Data Factory (ADF), having the ability to take advantage of Custom .Net Activities greatly expands the ADF use case. One particular example where a Custom .Net Activity is necessary would be when you need to pull data from an API on a regular basis. For example, you may want to pull sales leads from the Salesforce API on a daily basis, or run a search query against the Twitter API every hour. Instead of having a console application scheduled on some VM or local machine, this can be accomplished with ADF and a Custom .Net Activity.

With the data extraction portion complete, the next question is where the raw data should land for continued processing. Azure Data Lake Store, of course! Utilizing the Azure Data Lake Store (ADLS) SDK, we can land the raw data in ADLS, allowing for continued processing down the pipeline. This post will focus on an end to end solution doing just that, using Azure Data Factory and a Custom .Net Activity to pull data from the Salesforce API and then land it in ADLS for further processing.  The end to end solution will run inside a Custom .Net Activity, but the steps here to connect to ADLS from .net are universal and can be used for any .net application.


The Lesser Known Resolution to the Unexpected Number of Columns Error Executing U-SQL on Azure Data Lake Analytics

Problem

Azure Data Lake Store (ADLS) gives you the ability to store all raw data in one location, readily accessible for analysis. In my particular case I am pulling data from Salesforce and using the ADLS .net SDK to store the results in the data lake. Anyone who has worked with Salesforce knows that an object can have hundreds of custom fields, which leads to a file being stored in ADLS with hundreds of columns. One of the first transformations I wanted to use U-SQL and Azure Data Lake Analytics (ADLA) for was to evaluate only a subset of the data by querying a few columns from the hundreds that might exist in a file.

An example script might look like this:

@tasks_raw =
    EXTRACT AccountId string,
            Id string,
            OwnerId string,
            Status string,
            Subject string,
            ....
            .... More Fields ....,
            ....
            WhatCount int?,
            WhatId string,
            WhoCount int?,
            WhoId string
    FROM "/RawData/Salesforce/Task/2016/01/25/Task.csv"
    USING Extractors.Csv();

@tasks_subset =
    SELECT AccountId,
           ActivityDate,
           CreatedById,
           CreatedDate,
           Id,
           OwnerId,
           Status
    FROM @tasks_raw;

OUTPUT @tasks_subset
TO "/Subset/Salesforce/Task/2016/01/25/Task.csv"
USING Outputters.Csv();

The first step is to impose a schema on top of the current file by using the “Extract” syntax, thereby storing the result in a variable. Then, from the newly created variable, I can select only the subset of columns which I need to store for later processing. The common problem I have run into is with the Extract portion of this script. I have frequently received the Unexpected Number of Columns error, as seen in the screenshot below.

[Screenshot: Unexpected Number of Columns error]

Solution

The most common cause of this error, especially when the input file is a CSV, is a “comma” in one of the data cells. Even after I had completed all possible sanitization of the data before writing it to ADLS using the .net SDK, I still received this error.

I began a dialog with the Azure Data Lake team, who informed me that using the ADLS .net SDK to create and append data to a file in ADLS could be causing a column alignment issue. If you are using the .net SDK to create a file and then append data to it, be aware there is a 4 MB limit on the request. This means that if you send a request with N rows and that batch is over 4 MB, the request will terminate in the middle of a record and not at a record boundary. The solution here is simple: I needed to add some logic to batch the rows into chunks of 4 MB or less, ensuring that each chunk ends on a record boundary.

Some sample code would look like this:

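// Assumed definition: the original snippet uses FOURMB without showing it.
private const int FOURMB = 4 * 1024 * 1024;

public void StoreData(string path, List<string> rows, bool append)
{
	var buffer = new MemoryStream();
	var sw = new StreamWriter(buffer);

	foreach (var row in rows)
	{
		// Flush before this row would push the request past 4 MB, so every
		// request ends on a record boundary. Never flush an empty buffer.
		if (buffer.Length > 0 && buffer.Length + Encoding.UTF8.GetByteCount(row) > FOURMB)
		{
			buffer.Position = 0;
			if (append)
			{
				execute_append(path, buffer);
			}
			else
			{
				// The first batch creates the file; later batches append to it.
				execute_create(path, buffer);
				append = true;
			}

			buffer = new MemoryStream();
			sw = new StreamWriter(buffer);
		}
		sw.Write(row);
		sw.Flush();
	}

	// Send whatever is left over after the last full batch.
	if (buffer.Length <= 0) return;

	buffer.Position = 0;
	if (append)
	{
		execute_append(path, buffer);
	}
	else
	{
		execute_create(path, buffer);
	}
}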


private AzureOperationResponse execute_create(string path, MemoryStream ms)
{
	var beginCreateResponse = inner_client.FileSystem.BeginCreate(path, DataLakeAppConfig.DataLakeAccountName, new FileCreateParameters());
	var createResponse = inner_client.FileSystem.Create(beginCreateResponse.Location, ms);
	return createResponse;
}

private AzureOperationResponse execute_append(string path, MemoryStream ms)
{

	var beginAppendResponse = inner_client.FileSystem.BeginAppend(path, DataLakeAppConfig.DataLakeAccountName, null);
	var appendResponse = inner_client.FileSystem.Append(beginAppendResponse.Location, ms);
	return appendResponse;
}
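
The record-boundary logic above can also be pulled out on its own, which makes it easy to test without touching ADLS. What follows is only a minimal sketch, not part of the original DataLakeHelper class: the RowBatcher class and its Batch method are hypothetical names, and it assumes every row already carries its own line terminator and that no single row is larger than the limit.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical helper (not part of the ADLS SDK or the original DataLakeHelper).
public static class RowBatcher
{
    // The 4 MB request limit described in this post.
    public const int FourMb = 4 * 1024 * 1024;

    // Groups rows into streams of at most maxBytes, never splitting a row,
    // so every stream ends on a record boundary.
    public static IEnumerable<MemoryStream> Batch(IEnumerable<string> rows, int maxBytes = FourMb)
    {
        var buffer = new MemoryStream();
        foreach (var row in rows)
        {
            byte[] bytes = Encoding.UTF8.GetBytes(row);

            // Assumption: a row larger than the limit is treated as an error.
            if (bytes.Length > maxBytes)
                throw new ArgumentException("A single row exceeds the batch limit.");

            // Close the current batch before this row would overflow it.
            if (buffer.Length > 0 && buffer.Length + bytes.Length > maxBytes)
            {
                buffer.Position = 0;
                yield return buffer;
                buffer = new MemoryStream();
            }
            buffer.Write(bytes, 0, bytes.Length);
        }

        // Emit the final, partially filled batch if there is one.
        if (buffer.Length > 0)
        {
            buffer.Position = 0;
            yield return buffer;
        }
    }
}
```

Each stream returned by Batch could then be handed to execute_create for the first batch and to execute_append for every batch after that, in the same way StoreData does.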


Conclusion

If you work with any type of delimited file, you have no doubt run into this unexpected number of columns error. The problem is exacerbated by the fact that you may have hundreds of columns in the file, which makes it very difficult to track down. If you are using the ADLS SDK to store files in the data lake and you feel you have thoroughly gone through all other possible solutions, give this a shot. Either way, it might be worth changing your storing pattern to avoid this problem on a future data file.