Ruben Slabbert

Creating Azure Data Factory pipelines using Typescript

As part of an internal codebase, we’ve experimented with creating and managing Data Factory pipelines using Typescript code. The experiment was successful, and we plan to move ahead with it. This blog post explores what we did and the benefits of our approach.

Firstly, let’s define our main components:

  • Azure Data Factory (ADF) is a managed ETL service that allows you to move and transform data between different systems. For example, we frequently use it to move data from existing source systems (such as SQL Server) to a cloud data warehouse (such as Synapse). ADF is primarily configured via a web-based GUI.
  • Typescript is a programming language that brings static typing to Javascript. We can use it as a general-purpose language to call APIs, transform data, etc.

The first key insight was that ADF, like most of Azure, exposes an HTTP API to configure, create, and delete resources. In fact, the ADF web GUI itself uses this API. Our first experiment was to see whether we could call this API directly from a simple Typescript program. We roughly did the following:

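// Start a pipeline run via the ADF "createRun" REST endpoint
const execPipeParams = {
  url: `https://management.azure.com/subscriptions/${subscriptionId}/resourceGroups/${resourceGroupName}/providers/Microsoft.DataFactory/factories/${dataFactoryName}/pipelines/${pipelineName}/createRun?api-version=2018-06-01`,
  method: "POST" as const,
  body: {}, // the request body holds the pipeline's parameters (none here)
};

// This request runs the pipeline; resp.parsedBody.runId identifies the new run
const resp = await serviceClient.sendRequest(execPipeParams);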

The above code confirmed that we could execute an ADF pipeline from our code using the API. We immediately put this to use on a project whose pipelines were still built in the web GUI, writing automated tests for those pipelines:

test("exec CsvToParquet", async () => {
  // This object corresponds to the pipeline's parameters 
  const pipelineArgs = { 
    SourceFileName: "test.csv", 
    FirstRowAsHeader: true, 
    ColumnDelimiter: ",", 
    EscapeChar: "\\", 
    QuoteChar: '"', 
    SourceRelativePath: "csv-to-parquet", 
    SourceStorageAccountContainerName: "test-data", 
    SourceStorageAccountEndpoint: "https://xxxx.blob.core.windows.net/",  
    SourceStorageType: "blob", 
    TargetRelativePath: "testing2", 
    TargetStorageAccountContainerName: "silver", 
    TargetStorageAccountEndpoint: "https://xxxx.dfs.core.windows.net/", 
    TargetStorageType: "adls", 
    PartitionStartDate: "12/7/2020 2:36:36 AM", 
    PartitionEndDate: "12/7/2020 2:36:36 AM", 
    SourceSystemName: "testcsv", 
  }; 

  // This function will execute the pipeline and wait for it to finish 
  const pipelineResp = await execADFPipelineForTesting("CsvToParquet", pipelineArgs);
  
  // Finally, we expect that the pipeline succeeded 
  expect(pipelineResp.parsedBody.status).toEqual("Succeeded"); 
});
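The test relies on execADFPipelineForTesting, whose implementation isn't shown here. A minimal sketch of the usual start-then-poll pattern follows, assuming a hypothetical PipelineRunApi interface that you would back with the ADF createRun call and the pipeline runs "get" call (for example through serviceClient.sendRequest). The name runPipelineAndWait, the poll interval, and the timeout are illustrative choices, not the authors' code.

```typescript
// Pipeline run statuses reported by ADF
type RunStatus =
  | "Queued"
  | "InProgress"
  | "Succeeded"
  | "Failed"
  | "Canceling"
  | "Cancelled";

// Hypothetical wrapper over two REST calls: start a run and read its status
interface PipelineRunApi {
  // Starts a run and resolves to its runId
  createRun(pipelineName: string, parameters: Record<string, unknown>): Promise<string>;
  getStatus(runId: string): Promise<RunStatus>;
}

// Statuses after which a run will not change any more
const TERMINAL_STATUSES = new Set<RunStatus>(["Succeeded", "Failed", "Cancelled"]);

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Start a run, then poll until it reaches a terminal status or the timeout expires
async function runPipelineAndWait(
  api: PipelineRunApi,
  pipelineName: string,
  parameters: Record<string, unknown>,
  pollIntervalMs = 15_000,
  timeoutMs = 30 * 60_000
): Promise<RunStatus> {
  const runId = await api.createRun(pipelineName, parameters);
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const status = await api.getStatus(runId);
    if (TERMINAL_STATUSES.has(status)) return status;
    if (Date.now() >= deadline) {
      throw new Error(`Run ${runId} of ${pipelineName} still ${status} after ${timeoutMs} ms`);
    }
    await sleep(pollIntervalMs);
  }
}
```

The explicit timeout is a deliberate choice: without it, a run stuck in Queued or InProgress would hang the suite until the test runner's own timeout fires.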

This worked well in limited testing and let us check for pipeline regressions sporadically, but it was still not integrated into our DevOps process. To get there, we took it a step further and checked whether we could also create ADF pipelines from our code:

import { DataFactoryManagementClient } from "@azure/arm-datafactory";

const client = new DataFactoryManagementClient(credentials, subscriptionId);
await client.pipelines.createOrUpdate(resourceGroup, factoryName, pipelineName, { /* ... pipeline config ... */ });

With both of these capabilities in place, we were able to add the following to our testing logic:

// All the resources we need to deploy as part of this test. Explained further below.
const resources = { 
  datasets: [
    BlobDelimitedDataset,
    BlobParquetDataset, 
    ADLSDelimitedDataset, 
    ADLSParquetDataset, 
    AzSQLDataset 
  ],
  linkedServices: [GenericBlob, GenericADLS, GenericAzSql], 
  pipelines: [CsvToParquet], 
}; 

// adfUUID is randomly generated once per test file (in beforeAll) so that we can
// run all our pipelines in a single test ADF instance without them conflicting.
// All resources deployed as part of this test are prefixed with this UUID.
let adfUUID: string; 
beforeAll(async () => { 
  adfUUID = await deployADFForTesting(resources); 
}); 

afterAll(async () => { 
  await destroyADFForTesting(adfUUID, resources); 
});
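deployADFForTesting and destroyADFForTesting aren't shown in the post. The sketch below is one possible approach, not the authors' implementation: each resource is renamed with the test prefix, every referenceName inside the definitions is rewritten to match (otherwise a prefixed pipeline would still point at the unprefixed dataset), and resources are deployed dependencies-first. NamedResource, planDeployment, and deployAll are hypothetical names, and the deploy callback stands in for the SDK's createOrUpdate calls. As far as we know, ADF does not allow dashes in dataset and linked service names, so a prefix derived from a UUID should have its dashes stripped.

```typescript
// Minimal stand-in for an ADF resource definition: a name plus arbitrary JSON
interface NamedResource {
  name: string;
  [key: string]: unknown;
}

interface TestResources {
  linkedServices: NamedResource[];
  datasets: NamedResource[];
  pipelines: NamedResource[];
}

type ResourceKind = keyof TestResources;

interface DeploymentStep {
  kind: ResourceKind;
  resource: NamedResource;
}

// Dependencies first: pipelines reference datasets, datasets reference linked services
const DEPLOY_ORDER: ResourceKind[] = ["linkedServices", "datasets", "pipelines"];

// Return a deep copy in which every string `referenceName` carries the prefix
function prefixReferences(value: unknown, prefix: string): unknown {
  if (Array.isArray(value)) return value.map((item) => prefixReferences(item, prefix));
  if (value !== null && typeof value === "object") {
    const copy: Record<string, unknown> = {};
    for (const [key, child] of Object.entries(value)) {
      copy[key] =
        key === "referenceName" && typeof child === "string"
          ? prefix + child
          : prefixReferences(child, prefix);
    }
    return copy;
  }
  return value;
}

// Ordered, prefixed copies of all resources; the inputs are left untouched
function planDeployment(resources: TestResources, prefix: string): DeploymentStep[] {
  const steps: DeploymentStep[] = [];
  for (const kind of DEPLOY_ORDER) {
    for (const resource of resources[kind]) {
      const rewritten = prefixReferences(resource, prefix) as NamedResource;
      steps.push({ kind, resource: { ...rewritten, name: prefix + resource.name } });
    }
  }
  return steps;
}

// Deploy sequentially so each resource's dependencies already exist
async function deployAll(
  steps: DeploymentStep[],
  deploy: (step: DeploymentStep) => Promise<void>
): Promise<void> {
  for (const step of steps) {
    await deploy(step);
  }
}
```

Destroying would walk the same plan in reverse (for example, `planDeployment(resources, prefix).reverse()`), so pipelines are deleted before the datasets and linked services they depend on.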

At this point, we could create our ADF resources in Typescript and test them in Typescript. This was a massive productivity benefit, but we took it one step further. Initially, we wrote the resource definitions as JSON files, which proved hard to maintain. To remedy this, we switched to the Typescript types that ship with the @azure/arm-datafactory package on npm, such as PipelineResource and CopyActivity. An example pipeline definition then looks like this:

import { CopyActivity, PipelineResource } from "@azure/arm-datafactory";

const copyActivity: CopyActivity = {
  name: "copy",
  type: "Copy",
  source: { type: "DelimitedTextSource" },
  sink: { type: "DelimitedTextSink" },
  inputs: [],
  outputs: [],
};

export const BlobToBlobCopy: PipelineResource = {
  name: "BlobToBlobCopy",
  activities: [copyActivity],
  parameters: { FileName: { type: "String" } },
};
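Because definitions are ordinary Typescript values, shapes that repeat can come from a function instead of copy-pasted JSON. A minimal sketch, assuming small stand-in types that mirror only the fields used above (the real CopyActivity and PipelineResource types come from @azure/arm-datafactory), a hypothetical makeCopyPipeline helper, and a hypothetical "BlobParquet" dataset name:

```typescript
// Stand-ins for the few SDK fields used here; the real types are in @azure/arm-datafactory
type SourceType = "DelimitedTextSource" | "ParquetSource";
type SinkType = "DelimitedTextSink" | "ParquetSink";

interface DatasetRef {
  referenceName: string;
  type: "DatasetReference";
}

interface CopyActivityLike {
  name: string;
  type: "Copy";
  source: { type: SourceType };
  sink: { type: SinkType };
  inputs: DatasetRef[];
  outputs: DatasetRef[];
}

interface PipelineLike {
  name: string;
  activities: CopyActivityLike[];
  parameters: Record<string, { type: "String" | "Bool" | "Int" }>;
}

// Hypothetical factory: a single copy activity from one dataset to another
function makeCopyPipeline(
  name: string,
  source: SourceType,
  sink: SinkType,
  inputDataset: string,
  outputDataset: string
): PipelineLike {
  return {
    name,
    activities: [
      {
        name: "copy",
        type: "Copy",
        source: { type: source },
        sink: { type: sink },
        inputs: [{ referenceName: inputDataset, type: "DatasetReference" }],
        outputs: [{ referenceName: outputDataset, type: "DatasetReference" }],
      },
    ],
    parameters: { FileName: { type: "String" } },
  };
}

// One call per pipeline instead of one hand-maintained JSON file per pipeline
const CsvToParquetCopy = makeCopyPipeline(
  "CsvToParquetCopy",
  "DelimitedTextSource",
  "ParquetSink",
  "BlobDelimited",
  "BlobParquet"
);
```

Because source and sink are unions of string literals, passing "ParquetSource" in the sink position is a compile-time error rather than a failed deployment.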

Before we go further, here are the benefits we have so far:

  • Our datasets, linked services, and pipelines are all specified as Typescript code, so we can break them up, reuse code, and type check our objects. This speeds up development and makes our definitions more robust and maintainable.
  • The pipeline code and tests are colocated, which helps keep them in sync. This also enables something we’ll touch on shortly: integration with our CI.
  • Since it’s all code, developers can work locally on their machines and push to a git repo when they’re ready, which is a better developer experience than the web GUI.
  • Finally, because the code is decoupled from any live ADF instance, we can easily deploy exactly the same code to different environments, ensuring our development, test, staging, and production instances are managed consistently.

The above is already a big improvement on how ADF is normally developed, but we can go further. ADF supports dynamic properties: expressions that let you pass parameters from a pipeline to an activity, concatenate parameters and strings to build paths, perform math on JSON fields, and so on. These expressions, however, are written in the web GUI (and in our old Typescript) as plain text. If we misspelled a function, forgot a comma or parenthesis, or renamed a parameter without updating the corresponding text, we would only find out through a runtime error. In other words, we had to deploy and run our ADF pipeline before we could see issues in these properties, which made iteration slow. With a bit of Typescript magic, we built a better solution:

// Instead of writing this plain-text expression (ADF string literals use single quotes):
`@concat(pipeline().parameters.Folder, '/', pipeline().parameters.File)`

// Our Typescript functions let us write:
interface MyPipelineParams {
  Folder: { type: "String" },
  File: { type: "String" },
}

expression(pipeline<MyPipelineParams>().parameters.Folder, rawString("/"), pipeline<MyPipelineParams>().parameters.File)

In the above code, our pipeline function is generic over an interface, so accessing parameters.Folder only compiles if Folder is a field declared on that interface.

// The implementation of this function makes use of Proxy to ensure that when you run
pipeline<Params>().parameters.MyParamField.toString()

// you get the string
"pipeline().parameters.MyParamField"
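The post doesn't include the implementation, but the Proxy idea can be sketched as follows. This is our own minimal reconstruction, not the authors' code: ExpressionPath and ParameterAccessors are hypothetical names, and the generic parameter only restricts which property names type-check; at runtime the Proxy answers any property read.

```typescript
// A fragment of ADF expression text, rendered through toString()
class ExpressionPath {
  private readonly path: string;
  constructor(path: string) {
    this.path = path;
  }
  toString(): string {
    return this.path;
  }
}

// One accessor per declared parameter name
type ParameterAccessors<P> = { readonly [K in keyof P]: ExpressionPath };

// Sketch of pipeline<P>(): `parameters` is a Proxy, so reading any property
// yields the text "pipeline().parameters.<name>"; the type restricts names to keyof P
function pipeline<P>(): { parameters: ParameterAccessors<P> } {
  const parameters = new Proxy(
    {},
    { get: (_target, prop) => new ExpressionPath(`pipeline().parameters.${String(prop)}`) }
  ) as unknown as ParameterAccessors<P>;
  return { parameters };
}

interface Params {
  MyParamField: { type: "String" };
}

const rendered = pipeline<Params>().parameters.MyParamField.toString();
// rendered === "pipeline().parameters.MyParamField"
// pipeline<Params>().parameters.Typo would not compile: 'Typo' is not a key of Params
```

With a Proxy, no per-parameter code is needed: the interface alone drives both autocompletion and the generated text.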

In essence, we emulate the dynamic property language in Typescript, which gives us code completion, clear compile-time errors, and smaller improvements such as syntax highlighting. For example, here is a test for one of our emulated functions (replace):

import { rawString, StringExpression } from "../../types";
import { replace } from "./replace";

test("replace('hello world', 'world', 'you')", () => {
    expect(
        replace(
            rawString("hello world"),
            rawString("world"),
            rawString("you")
        ).toString()
    ).toEqual("replace('hello world', 'world', 'you')");
});

test("replace return type", () => {
    expect(replace(rawString(""), rawString(""), rawString(""))).toBeInstanceOf(
        StringExpression
    );
});
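Those two tests imply a simple design: every expression renders to ADF expression text through toString(), rawString produces a quoted literal, and functions such as replace return a StringExpression so their results can feed other string functions. A minimal sketch of that design, with names matching the test but implementations written by us for illustration; the quote escaping follows ADF's convention of doubling a single quote inside a string literal:

```typescript
// An expression known to evaluate to a string in ADF
class StringExpression {
  private readonly text: string;
  constructor(text: string) {
    this.text = text;
  }
  // Render as ADF expression text
  toString(): string {
    return this.text;
  }
}

// A string literal: ADF expressions use single quotes, with '' for an embedded quote
function rawString(value: string): StringExpression {
  return new StringExpression(`'${value.replace(/'/g, "''")}'`);
}

// Emulates ADF's replace(text, oldText, newText); arguments must be string expressions
function replace(
  text: StringExpression,
  oldText: StringExpression,
  newText: StringExpression
): StringExpression {
  return new StringExpression(`replace(${text}, ${oldText}, ${newText})`);
}
```

Since replace returns a StringExpression, calls nest: `replace(replace(rawString("hello world"), rawString("world"), rawString("you")), rawString("you"), rawString("me"))` renders as nested ADF text. The private field also makes TypeScript compare the class nominally, so a different expression class with the same shape could not be passed to replace by accident.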

Because parameters are generic and our functions are type-checked, the interfaces between datasets and pipelines can be type-checked too:

// blob_delimited.ts
interface BlobDelimitedParameters {
    FileName: StringExpression;
}

const parameters: Record<
    keyof BlobDelimitedParameters,
    ParameterSpecification
> = {
    FileName: {
        type: "String",
    },
};

const properties: DelimitedTextDataset = {
    parameters,
    type: "DelimitedText",
    location: {
        type: "AzureBlobStorageLocation",
        fileName: expression(dataset<BlobDelimitedParameters>().FileName),
    },
};

const BlobDelimitedDataset: DatasetResource = {
    name: "BlobDelimited",
    properties,
};

// copy_activity.ts
const copyActivity: CopyActivity = {
    name: "copy",
    type: "Copy",
    source: {
        type: "DelimitedTextSource",
    },
    sink: {
        type: "DelimitedTextSink",
    },
    inputs: [
        {
            referenceName: BlobDelimitedDataset.name as string,
            parameters: {
                FileName: expression(
                    pipeline<BlobDelimitedParameters>().parameters.FileName
                ),
            }
        },
    ],
};

In the above example, our copy activity directly refers to the types on the blob delimited dataset, meaning that a) we can’t refer to parameters that don’t exist, and b) if we ever change the blob delimited dataset’s parameters, our types will stop us from introducing a regression.
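The same idea can be packaged into a small helper so that every dataset reference must supply exactly the parameters its dataset declares. A minimal sketch, where Expr is a stand-in for the value expression(...) returns and datasetReference is a hypothetical helper, not part of the SDK or the authors' library:

```typescript
// Stand-in for the value returned by expression(...)
type Expr = string;

interface DatasetReference {
  type: "DatasetReference";
  referenceName: string;
  parameters: Record<string, Expr>;
}

// Call with an explicit P (the dataset's parameter interface): the object
// literal must then supply every key of P, and unknown keys are rejected
function datasetReference<P>(
  referenceName: string,
  parameters: Record<keyof P, Expr>
): DatasetReference {
  return {
    type: "DatasetReference",
    referenceName,
    parameters: parameters as Record<string, Expr>,
  };
}

// Mirrors the blob delimited dataset's parameter interface, using the Expr stand-in
interface BlobDelimitedParameters {
  FileName: Expr;
}

const input = datasetReference<BlobDelimitedParameters>("BlobDelimited", {
  FileName: "@pipeline().parameters.FileName",
});

// Both of these fail to compile:
// datasetReference<BlobDelimitedParameters>("BlobDelimited", { FileNmae: "x" }); // unknown key
// datasetReference<BlobDelimitedParameters>("BlobDelimited", {});                 // FileName missing
```

Because P is passed explicitly, renaming FileName in the interface turns every stale reference into a compile error instead of a failed pipeline run.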

The final piece of the puzzle is running our tests in CI. On each commit to our repository, all of our Typescript code is built and our tests are run, ensuring that our objects are well constructed, that our resources deploy to ADF, and that they run successfully. For example, here is our CI output for a commit that broke some of our pipelines:

Test Suites: 2 failed, 1 passed, 3 total
Tests:       3 failed, 3 passed, 6 total
Snapshots:   0 total
Time:        1538.214 s
Ran all test suites matching /packages\/adf-core/i.

In conclusion, while this process still has some usability issues (specifically around test robustness), it already provides an enormous amount of value in our development process. For example, when our tests pass, we can be confident that our changes are unlikely to have introduced regressions. All of this is a result of our commitment to a mindset of continuous improvement and experimentation.

Get in touch with Ruben Slabbert

Ruben is a solution architect with data engineering, infrastructure modernisation, and application development experience across Microsoft Azure, Google Cloud Platform, and Amazon Web Services.
