Iterate Over the Results of an Azure SQL Stored Procedure in an Azure Logic App

Introduction

If you need to quickly create a business workflow or other B2B integration, look no further than Azure Logic Apps. Azure Logic Apps is a cloud-based workflow and integration service. With the multitude of built-in connectors, you can easily automate many common business workflows. The Azure SQL Database connector and its execute stored procedure action were of interest to me when I needed to implement an automated email notification. The output of a stored procedure from a source system would provide me with a list of users who would receive reminder emails. Azure Logic Apps provides the capability of iterating over the results of an action; however, iterating over the results of a stored procedure is not as straightforward as some of the other examples. In this post I am going to use the automated email notification example to illustrate what I needed to do to iterate over the results of a stored procedure.

Running an Azure Data Factory Pipeline on a Weekday Schedule Using an Azure Function

Introduction

I have written a few posts about different aspects of Azure Data Factory. I use it as the main workhorse of my data integration and ETL projects. One major drawback I have found with Azure Data Factory is the scheduling system, which is not as flexible as I and many others would like it to be. That said, there are certainly ways to adapt and get more control over an Azure Data Factory pipeline execution. In my post Starting an Azure Data Factory Pipeline from C# .Net, I outline the need to kick off a pipeline after a local job has completed and how this can be achieved by using the SDK to programmatically set the pipeline's Start/End dates. You may not have that specific requirement, but let's say you want to run a pipeline only on weekdays or on some other specific schedule. This can be accomplished by reusing the code from my prior post and scheduling a local console app. However, I thought it would be more fun to use Azure Functions to kick off a pipeline on a weekday schedule and provide a fully cloud-based solution.

Azure Data Factory Copy Activity Storage Failure Error

I was recently tasked with migrating local integration jobs into the cloud. The first job I began to tackle was an integration job between Salesforce and a local financial system. The integration workflow first downloads the newest entries of a specific object from Salesforce, then pushes them into an on-premises SQL Server staging table. Once the data is copied locally, another job within the financial system processes the staged data.

In order to accomplish this task, I decided to use Azure Data Factory with the on-premises gateway to connect to the local SQL Server database. If you have seen some of my other posts, you know I have used Azure Data Lake Store to land my data. With this pipeline I decided to use Azure Blob Storage instead. Since the Azure Blob Storage API has the ability to store Append Blobs, I was able to follow a pattern similar to the one I used with Azure Data Lake Store and append the Salesforce data to a blob as I fetched it.

Once I had some sample Append Blobs in my container, my next step was to set up the Azure Data Factory copy activity to transfer that data to the on-premises SQL Server staging tables. This was where I began to run into issues. After a lot of verification and testing, it turned out that Append Blobs are not supported in Azure Data Factory.

Here are a few things to look out for to rule out this issue:

  • The blob container blade shows the BlobType of each blob. Check the type of the blobs you are trying to work with in Azure Data Factory.
  • I also ran into an issue where the dataset pointing to the Append Blob would not validate.
  • When running the Azure Data Factory copy activity against an Append Blob you will see the following error:
    Copy activity met storage operation failure at ‘Source’ side. Error message from storage execution: Requested value ‘AppendBlob’ was not found.
    This can be a bit misleading if you are not aware that Append Blobs are not supported.

Error 40197 Error Code 4815 Bulk Insert into Azure SQL Database

I have been working on a large-scale ETL project, and one of the data sources I regularly pull data from is Salesforce. Anyone who has worked with Salesforce knows it can be a free-for-all with object field changes. This alone makes pulling data regularly from the Salesforce REST API difficult, since there can be so much activity with the objects. On top of that, the number of custom fields that can be added to a single object ranges into the hundreds. With these factors in play, it can be a daunting task to debug errors when they inevitably crop up.

During a recent run of my process I started to receive the following error:

Error 40197 The service has encountered an error processing your request. Please try again. Error code 4815. A severe error occurred on the current command. The results, if any, should be discarded.

Here is a list of error codes, although it was not very helpful in my situation: SQL error codes

In my ETL process I am using Entity Framework and EntityFramework.BulkInsert-ef6, so I am doing bulk inserts into my Azure SQL Database. Since I knew there was a good chance that a change to the object definition in Salesforce was the cause of this error, that is where I started to investigate. As it turned out, the length of one of the fields had been changed from 40 to 60, which meant the table I had originally created with a varchar(40) column was going to have a problem. In my case this error happened when the data was larger than the column size defined in the table. Hopefully this post will give someone else another troubleshooting avenue for this error.
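One way to catch this kind of problem before the bulk insert runs is to compare each value's length against the column size you expect. The following is only a rough sketch of that idea, not code from my ETL process: the ColumnLengthCheck class, the dictionary-based row shape, and the column names are all hypothetical, and it compares character counts, which is only an approximation of how varchar sizes are measured.

```csharp
using System;
using System.Collections.Generic;

public static class ColumnLengthCheck
{
    // Returns a description of every value that is longer than the
    // maximum length configured for its column.
    // rows:       one dictionary per record, keyed by column name (hypothetical shape)
    // maxLengths: expected maximum length per column, e.g. 40 for varchar(40)
    public static List<string> FindOversizedValues(
        IEnumerable<Dictionary<string, string>> rows,
        Dictionary<string, int> maxLengths)
    {
        var problems = new List<string>();
        int rowIndex = 0;

        foreach (var row in rows)
        {
            foreach (var limit in maxLengths)
            {
                string value;
                if (row.TryGetValue(limit.Key, out value)
                    && value != null
                    && value.Length > limit.Value)
                {
                    problems.Add(
                        $"row {rowIndex}: {limit.Key} has length {value.Length}, limit is {limit.Value}");
                }
            }
            rowIndex++;
        }

        return problems;
    }
}
```

Running a check like this on a batch and logging the result before calling the bulk insert would point at the offending column directly, instead of leaving you with the generic 40197 / 4815 error.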

Upgrading to Microsoft.Azure.Management.DataLake.Store 0.10.1-preview to Access Azure Data Lake Store Using C#

Introduction

Microsoft recently released a new NuGet package to programmatically access the Azure Data Lake Store. In a previous post, Accessing Azure Data Lake Store from an Azure Data Factory Custom .Net Activity, I used Microsoft.Azure.Management.DataLake.StoreFileSystem 0.9.6-preview to programmatically access the data lake using C#. In this post I will go through what needs to be changed in my previous code to upgrade to the new NuGet package. I will also include a new version of the DataLakeHelper class which uses the updated SDK.

Upgrade Path

Since I already have a sample project using the older SDK (Microsoft.Azure.Management.DataLake.StoreFileSystem 0.9.6-preview), I will use it as an example of what needs to be modified to use the updated NuGet package (Microsoft.Azure.Management.DataLake.Store 0.10.1-preview).

The first step is to remove all packages which supported the obsolete SDK. Here is the list of all packages that can be removed:

  • Hyak.Common
  • Microsoft.Azure.Common
  • Microsoft.Azure.Common.Dependencies
  • Microsoft.Azure.Management.DataLake.StoreFileSystem
  • Microsoft.Bcl
  • Microsoft.Bcl.Async
  • Microsoft.Bcl.Build
  • Microsoft.Net.Http

All of these dependencies are needed when using the DataLake.StoreFileSystem package. In my previous sample I also used Microsoft.Azure.Management.DataFactories in order to create a custom activity for Azure Data Factory. Unfortunately, that package depends on all of the above packages as well. Please be careful when removing these packages, as your own applications might have other dependencies on them. In order to show that these packages are no longer needed, my new sample project is just a simple console application using the modified DataLakeHelper class, which can be found here on GitHub.

Now let’s go through the few changes that need to be made to the DataLakeHelper class in order to use the new NuGet package. The following functions from the original DataLakeHelper class will need to be modified:

create_adls_client()
execute_create(string path, MemoryStream ms)
execute_append(string path, MemoryStream ms)

Here is the original code for create_adls_client():

        private void create_adls_client()
        {
            var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenant_id}");
            var credential = new ClientCredential(clientId: client_id, clientSecret: client_key);
            var result = authenticationContext.AcquireToken(resource: "https://management.core.windows.net/", clientCredential: credential);

            if (result == null)
            {
                throw new InvalidOperationException("Failed to obtain the JWT token");
            }

            string token = result.AccessToken;

            var _credentials = new TokenCloudCredentials(subscription_id, token);
            inner_client = new DataLakeStoreFileSystemManagementClient(_credentials);
        }

In order to upgrade to the new SDK, there are 2 changes that need to be made.

  1. The DataLakeStoreFileSystemManagementClient now requires a ServiceClientCredentials object (TokenCredentials instead of TokenCloudCredentials)
  2. You must set the Azure subscription id on the newly created client

The last 2 lines of the method, which create the credentials and the client, should now look like this:

var _credentials = new TokenCredentials(token);
inner_client = new DataLakeStoreFileSystemManagementClient(_credentials);
inner_client.SubscriptionId = subscription_id;

Now that we can successfully authenticate again with the Azure Data Lake Store, the next change is to the create and append methods.

Here is the original code for execute_create(string path, MemoryStream ms) and execute_append(string path, MemoryStream ms):

        private AzureOperationResponse execute_create(string path, MemoryStream ms)
        {
            var beginCreateResponse = inner_client.FileSystem.BeginCreate(path, adls_account_name, new FileCreateParameters());
            var createResponse = inner_client.FileSystem.Create(beginCreateResponse.Location, ms);
            Console.WriteLine("File Created");
            return createResponse;
        }

        private AzureOperationResponse execute_append(string path, MemoryStream ms)
        {
            var beginAppendResponse = inner_client.FileSystem.BeginAppend(path, adls_account_name, null);
            var appendResponse = inner_client.FileSystem.Append(beginAppendResponse.Location, ms);
            Console.WriteLine("Data Appended");
            return appendResponse;
        }

The change for both of these methods is pretty simple. The BeginCreate and BeginAppend methods are no longer available, and the new Create and Append methods now take the path and the Azure Data Lake Store account name directly.

With the changes applied the new methods are as follows:

        private void execute_create(string path, MemoryStream ms)
        {
            inner_client.FileSystem.Create(path, adls_account_name, ms, false);
            Console.WriteLine("File Created");
        }

        private void execute_append(string path, MemoryStream ms)
        {
            inner_client.FileSystem.Append(path, ms, adls_account_name);
            Console.WriteLine("Data Appended");
        }

Conclusion

As you can see, it was not difficult to upgrade to the new version of the SDK. Unfortunately, since these are all preview bits, changes like this can happen. Hopefully this SDK has found its new home and won’t go through too many more breaking changes for the end user.

Azure Data Factory Pipeline Not Running (Pending Execution)

When starting out with Azure Data Factory, the first example usually completed is a basic copy activity. A great tutorial on moving data from an Azure blob to an Azure SQL table can be found here. I think one of the key pieces of the data movement tutorial that gets missed is setting the external property on the input blob JSON definition to true. Setting this to true lets Azure Data Factory know that the data is generated externally and not by another pipeline in the factory. When I began trying my hand at creating pipelines, I completely missed this note and was scratching my head wondering why the pipeline would never execute. With that in mind, I want to walk through a few screens from the Azure portal that are very helpful when trying to understand what is happening with your pipeline.

Suppose we have a simple copy example as follows:

image

Looking at the SampleCopyPipeline blade you will see both datasets:

image

When you select the InputBlob to open its blade, you will notice the first problem. In the monitoring section of the InputBlob, there are no slices ready to be worked on. The output of the copy activity is directly dependent on there being an input that needs to be processed.

image

Now, when you select the OutputTable to open its blade, you will see the common Pending Execution status for the output slice.

image

As I mentioned before, the output slice is directly dependent on the input slice being ready for processing. In order for this slice to begin processing, the corresponding input slice needs to be ready. We can see the culprit once we select the output slice to open the output slice blade, which shows the upstream slices it is waiting for.

image

The input slice is there; however, its status is None. As I stated above, the solution is to set the external property of the input blob to true. Once that is done, the input blob blade shows a slice that is Ready for processing.

image

We can also see the corresponding output slice blade no longer shows it is waiting on the input slice.

image
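For reference, an input dataset definition with the external property set might look roughly like the fragment below. This is only a sketch in the dataset JSON format the tutorial uses. The linked service name and folder path are placeholders I made up for illustration, and the only line that matters for this issue is "external": true.

```json
{
  "name": "InputBlob",
  "properties": {
    "type": "AzureBlob",
    "linkedServiceName": "StorageLinkedService",
    "typeProperties": {
      "folderPath": "samplecontainer/input/",
      "format": { "type": "TextFormat", "columnDelimiter": "," }
    },
    "external": true,
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}
```

Without that flag, Data Factory assumes some other pipeline in the factory will produce the slice, which is why the input slice sat at None and the output slice stayed in Pending Execution.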

Conclusion

When first learning about Azure Data Factory, it may be difficult to follow exactly what is happening with the pipeline. From the Azure portal and the new Azure Data Factory Monitoring App you can get a lot of information about the status of your pipeline. Hopefully this example gave you a simple way to start using the portal to better understand what is happening and troubleshoot any possible issues.

Starting an Azure Data Factory Pipeline from C# .Net

Introduction

Azure Data Factory (ADF) does an amazing job orchestrating data movement and transformation activities between cloud sources with ease. Sometimes you may also need to reach into your on-premises systems to gather data, which is also possible with ADF through data management gateways. However, you may run into a situation where you already have local processes running, or you cannot run a specific process in the cloud, but you still want an ADF pipeline to depend on the data being processed locally. For example, you may have an ETL process that begins with a locally run process that stores data in Azure Data Lake. Once that process is completed, you want the ADF pipeline to begin processing that data, followed by any other activities or pipelines. The key is starting the ADF pipeline only after the local process has completed. This post will highlight how to accomplish this through the use of the Data Factory Management API.


Resolving InvalidTemplate Error Running Azure Logic App Manually (Run Now)

Problem

Recently I have been testing out Azure Logic Apps as a possible tool for enterprise application data synchronization. The first example I spiked out was to pull newly entered data from a table in an Azure SQL Database and push it into a Salesforce object. This is a pretty common example and extremely easy to implement with an Azure Logic App. My scenario included a Microsoft SQL Connector to my Azure SQL Database with a polling query set to execute every hour. I added a Salesforce Connector mapping the database fields to the custom object defined in Salesforce. The result was a very basic logic app, with a SQL trigger and a create action on the Salesforce Connector. The next step was to test this out to see if it worked. I could have waited out the one-hour execution interval, but I noticed a Run Now button on the main logic app blade.

image

Every time I clicked the Run Now button for the logic app I received the following error:

{"code":"InvalidTemplate","message":"Unable to process template language expressions in action 'salesforceconnector' inputs at line '1' and column '11': 'Template language expression cannot be evaluated: the property 'outputs' cannot be selected.'."}

Solution

The solution here is pretty simple: when you have a logic app with a trigger defined, DO NOT click the Run Now button. If you would like to run the logic app manually, you will notice that the first tile that comes up has a check box to do just that.

image

I never even noticed this option until I ran into this issue.  Use this option if you need manual control over the logic app execution.

Note: As of this writing I do know the logic app designer is getting a major overhaul in the coming weeks.  I will do my best to update this post once those new bits become available.  Watch this to see a demo of what is coming.

Accessing Azure Data Lake Store from an Azure Data Factory Custom .Net Activity

04/05/2016 Update: If you are looking to use the latest version of the Azure Data Lake Store SDK (Microsoft.Azure.Management.DataLake.Store 0.10.1-preview) please see my post Upgrading to Microsoft.Azure.Management.DataLake.Store 0.10.1-preview to Access Azure Data Lake Store Using C# for what needs to be done to update the DataLakeHelper class.

Introduction

When working with Azure Data Factory (ADF), having the ability to take advantage of Custom .Net Activities greatly expands the ADF use case. One particular example where a Custom .Net Activity is necessary is when you need to pull data from an API on a regular basis. For example, you may want to pull sales leads from the Salesforce API on a daily basis, or run some search query against the Twitter API every hour. Instead of having a console application scheduled on some VM or local machine, this can be accomplished with ADF and a Custom .Net Activity.

With the data extraction portion complete, the next question is where the raw data should land for continued processing. Azure Data Lake Store, of course! Using the Azure Data Lake Store (ADLS) SDK, we can land the raw data in ADLS, allowing for continued processing down the pipeline. This post will focus on an end-to-end solution doing just that, using Azure Data Factory and a Custom .Net Activity to pull data from the Salesforce API and then land it in ADLS for further processing. The end-to-end solution will run inside a Custom .Net Activity, but the steps here to connect to ADLS from .NET are universal and can be used in any .NET application.


The Lesser Known Resolution to the Unexpected Number of Columns Error Executing U-SQL on Azure Data Lake Analytics

Problem

Azure Data Lake Store (ADLS) gives you the ability to store all raw data in one location, readily accessible for analysis. In my particular case I am pulling data from Salesforce and using the ADLS .NET SDK to store the results in the data lake. Anyone who has worked with Salesforce knows that it is possible to have an object with hundreds of custom fields, which leads to a file being stored in ADLS with hundreds of columns. One of the first transformations I wanted to use U-SQL and Azure Data Lake Analytics (ADLA) for was to evaluate only a subset of the data by querying a few columns from the hundreds that might exist in a file.

An example script might look like this:

@tasks_raw =
    EXTRACT AccountId string,
            Id string,            
            OwnerId string,            
            Status string,
            Subject string,
            // .... More Fields ....
            WhatCount int?,
            WhatId string,
            WhoCount int?,
            WhoId string
    FROM "/RawData/Salesforce/Task/2016/01/25/Task.csv"
    USING Extractors.Csv();



@tasks_subset =
    SELECT AccountId,
           ActivityDate,          
           CreatedById,
           CreatedDate,
           Id,           
           OwnerId,
           Status
    FROM @tasks_raw;



OUTPUT @tasks_subset
TO "/Subset/Salesforce/Task/2016/01/25/Task.csv"
USING Outputters.Csv();

The first step is to impose a schema on top of the current file by using the EXTRACT syntax, thereby storing the result in a variable. Then, from the newly created variable, I can select only the subset of columns I need to store for later processing. The common problem I have run into is with the EXTRACT portion of this script. I have frequently received the Unexpected Number of Columns error, as seen in the screenshot below.

image

Solution

The most common cause of this error, especially when the input file is a CSV, is a comma in one of the data cells. However, even after I had completed all possible sanitization of the data before writing it to ADLS using the .NET SDK, I still received this error.

I began a dialog with the Azure Data Lake team, who informed me that using the ADLS .NET SDK to create and append data to a file in ADLS could be causing a column alignment issue. If you are using the .NET SDK to create a file and then append data to it, be aware that there is a 4 MB limit on each request. This means that if you send a request with N rows and that batch is over 4 MB, the request will be cut off in the middle of a record rather than at a record boundary. The solution here is simple: I needed to add some logic to batch the rows into chunks of 4 MB or less, ensuring that the final row of each chunk ends on a record boundary.

Some sample code would look like this:

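// Maximum payload per create/append request (the 4 MB limit described above).
// The original definition of this constant was not shown; this value is an assumption.
private const int FOURMB = 4 * 1024 * 1024;

public void StoreData(string path, List<string> rows, bool append)
{
	var buffer = new MemoryStream();
	var sw = new StreamWriter(buffer);

	foreach (var row in rows)
	{
		// Send the current batch before this row would push it past the limit,
		// so that every request ends on a record boundary.
		if (buffer.Length > 0 && buffer.Length + Encoding.UTF8.GetByteCount(row) > FOURMB)
		{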
			buffer.Position = 0;
			if (append)
			{
				execute_append(path, buffer);
			}
			else
			{
				execute_create(path, buffer);
				append = true;
			}

			buffer = new MemoryStream();
			sw = new StreamWriter(buffer);
		}
		sw.Write(row);
		sw.Flush();
	}

	if (buffer.Length <= 0) return;

	buffer.Position = 0;
	if (append)
	{
		execute_append(path, buffer);
	}
	else
	{
		execute_create(path, buffer);
	}	

}


private AzureOperationResponse execute_create(string path, MemoryStream ms)
{
	var beginCreateResponse = inner_client.FileSystem.BeginCreate(path, DataLakeAppConfig.DataLakeAccountName, new FileCreateParameters());
	var createResponse = inner_client.FileSystem.Create(beginCreateResponse.Location, ms);
	return createResponse;
}

private AzureOperationResponse execute_append(string path, MemoryStream ms)
{

	var beginAppendResponse = inner_client.FileSystem.BeginAppend(path, DataLakeAppConfig.DataLakeAccountName, null);
	var appendResponse = inner_client.FileSystem.Append(beginAppendResponse.Location, ms);
	return appendResponse;
}
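If you want to test the batching logic on its own, without calling the data lake at all, it can be pulled out into a small helper. The following is a minimal sketch of that idea and not part of the DataLakeHelper class: the RowBatcher name and the maxBytes parameter are hypothetical, and it assumes each row already ends with its record delimiter.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

public static class RowBatcher
{
    // Groups rows into UTF-8 byte buffers that are never larger than maxBytes
    // and never split a row, so each buffer ends on a record boundary.
    public static IEnumerable<byte[]> Batch(IEnumerable<string> rows, int maxBytes)
    {
        var buffer = new MemoryStream();

        foreach (var row in rows)
        {
            byte[] bytes = Encoding.UTF8.GetBytes(row);

            // A single row that exceeds the limit can never be sent whole.
            if (bytes.Length > maxBytes)
            {
                throw new ArgumentException("A single row is larger than maxBytes.");
            }

            // Emit the current batch before this row would push it past the limit.
            if (buffer.Length + bytes.Length > maxBytes)
            {
                yield return buffer.ToArray();
                buffer = new MemoryStream();
            }

            buffer.Write(bytes, 0, bytes.Length);
        }

        // Emit whatever is left over after the last row.
        if (buffer.Length > 0)
        {
            yield return buffer.ToArray();
        }
    }
}
```

Each returned buffer could then be wrapped in a MemoryStream and passed to execute_create for the first batch and execute_append for the rest. Keeping the batching separate makes it easy to check with a small limit that no batch ever ends in the middle of a record.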


Conclusion

While working with any type of delimited file you no doubt have run into this unexpected number of columns error. This problem is exacerbated by the fact that you may have hundreds of columns in the file which makes it very difficult to track down. If you are using the ADLS SDK to store files in the data lake and you feel you have thoroughly gone through all other possible solutions, give this a shot. Either way it might be worth changing your storing pattern to avoid this problem on a future data file.