Get control of data loads in Azure Synapse

Azure SQL Database

by Erwin | Jul 12, 2021

Load Source data to DataLake

There are several ways to extract data from a source in Azure Synapse Analytics or in Azure Data Factory. In this article I'm going to use a metadata-driven approach, with a control table in Azure SQL in which we configure the metadata (the data we need to extract). We then load data from an Azure SQL Database into an Azure Data Lake. We will use 2 pipelines for this:

  • The Command Pipeline in which we configure our metadata-driven approach
  • The Execute Pipeline in which we perform the actual data copy activity

Command Pipeline

As you can see in the figure above, we use the ForEach activity to call the execute pipeline. However, when we use the ForEach activity, the Execute Pipeline runs are assigned to its slots at the start of the ForEach activity and not at run time. We only see this behaviour when we extract a large number of tables: I have noticed that slots sit idle and do not start a new task until the other tasks have finished. Because of this behaviour we see gaps in the execution window, as the figure below shows.

For each activity dataload

In this article I will give you a solution to get control of your data loads, using Lookup activities and a control table in which we define the workloads.

Metadata

If you already have a database in which you store your metadata, use that database; otherwise create a new one. Then create the table below, in which we store the names of the tables we need to extract.
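The CREATE TABLE statement below assumes the schema configuration_demo already exists, and the view further down uses execution_demo. A small sketch to create both schemas first if they are missing (the schema names are simply the ones used in this article):

```sql
-- Create the two schemas used in this article if they don't exist yet.
-- CREATE SCHEMA cannot be used directly inside an IF block,
-- so it is executed as dynamic SQL.
IF SCHEMA_ID(N'configuration_demo') IS NULL
    EXEC (N'CREATE SCHEMA [configuration_demo]');

IF SCHEMA_ID(N'execution_demo') IS NULL
    EXEC (N'CREATE SCHEMA [execution_demo]');
```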

[sql]
CREATE TABLE [configuration_demo].[Source_Parameter](
	[Id] [int] IDENTITY(1,1) NOT NULL,
	[SourceName] [nvarchar](500) NULL,
	[SourceSchema] [nvarchar](500) NULL,
	[SourceQueryCustom] [nvarchar](max) NULL,
	[DataLakeCatalog] [nvarchar](500) NULL,
	[Worker] [int] NULL,
	[WorkerOrder] [int] NULL,
	[TableDestinationName] [nvarchar](500) NULL,
	[TableDestinationSchema] [nvarchar](500) NULL,
	[IsActive] [bit] NOT NULL,
	[IsIncremental] [bit] NULL,
	[IsIncrementalColumn] [nvarchar](50) NULL,
	[LastLoadtime] [datetime] NULL,
 CONSTRAINT [PK_Source_Parameter] PRIMARY KEY CLUSTERED
(
	[Id] ASC
)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY]
[/sql]

Metadata-Sourcetable

Column | Definition
SourceName | Name of the source table or view
SourceSchema | Schema name of the source table or view
SourceQueryCustom | Your own custom SELECT query to load data, do casting or get just a subset of columns
DataLakeCatalog | Name of the Data Lake folder in which the raw parquet files are stored
Worker | To divide and order the workload, a source can be assigned to one of 6 workers (1-6). Unassigned sources run in parallel in batches of 30
WorkerOrder | The loading order of the sources assigned to workers 1 to 6
TableDestinationName | Name of the datastore table in which the source data is stored
TableDestinationSchema | Name of the datastore schema in which the source data is stored
IsActive | Enable (1) or disable (0) loading of the source
IsIncremental | Set to 1 if the source needs to be delta loaded, otherwise 0
IsIncrementalColumn | If IsIncremental is set to 1, the datetime column on which the incremental load is based
LastLoadtime | Updated at runtime when the source is loaded

With the Worker column we define the workload: we can group a set of tables under one worker, for example a mix of small and large tables. The worker assignment can be changed at any time.
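To make the Worker idea concrete, a sketch of a few configuration rows. The tables come from the WideWorldImporters sample database and are only examples; the destination schema stg and the choice of LastEditedWhen as incremental column are assumptions for illustration, not part of the original setup:

```sql
-- Example configuration rows (illustrative values only).
INSERT INTO [configuration_demo].[Source_Parameter]
    ([SourceName], [SourceSchema], [SourceQueryCustom], [DataLakeCatalog],
     [Worker], [WorkerOrder], [TableDestinationName], [TableDestinationSchema],
     [IsActive], [IsIncremental], [IsIncrementalColumn], [LastLoadtime])
VALUES
    -- Worker 1: a large incremental table first, then a small full-load table
    (N'Orders',     N'Sales',       NULL, N'WideWorldImporters', 1, 1, N'Orders',     N'stg', 1, 1, N'LastEditedWhen', NULL),
    (N'Countries',  N'Application', NULL, N'WideWorldImporters', 1, 2, N'Countries',  N'stg', 1, 0, NULL,              NULL),
    -- Worker 2: another large table, loaded incrementally
    (N'OrderLines', N'Sales',       NULL, N'WideWorldImporters', 2, 1, N'OrderLines', N'stg', 1, 1, N'LastEditedWhen', NULL);
```

With these rows, worker 1 loads Sales.Orders first and Application.Countries second, while worker 2 handles Sales.OrderLines in its own branch.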

Once we have filled in the configuration, we can start creating the Linked Services for our connections.

Create Linked Services

For these 2 Pipelines we need to create 4 different Linked Services.

Azure Key Vault (LS_AKV_OXGN). If you haven't created a Linked Service for Azure Key Vault before, please read this page.

Azure Data Lake Storage Gen2 (LS_ADLS_DLS2), linked to your Azure Data Lake.

Linked Service DataLake

Grant the workspace managed identity access to your Azure Data Lake Storage Gen2 (for example with the Storage Blob Data Contributor role).

Azure SQL Source database (LS_ASQL_SOURCE)

Linked Service Sql Source

Create an entry in your Azure Key Vault secrets for the connection string:

integrated security=False;encrypt=True;connection timeout=30;data source=xxxxxxxxxx.database.windows.net;initial catalog=WideWorldImporters;user id=xxxxxx;password=xxxxxx

Grant the workspace managed identity access to your Azure SQL Database, or create a read-only account in your source database.
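For either option, a minimal T-SQL sketch to run inside the source database. Pick one of the two; [my-synapse-workspace] and synapse_reader are placeholder names, and the managed-identity variant assumes you are connected as the server's Azure AD admin:

```sql
-- Run inside the source database (for example WideWorldImporters).

-- Option 1: the workspace managed identity (placeholder workspace name).
CREATE USER [my-synapse-workspace] FROM EXTERNAL PROVIDER;
ALTER ROLE db_datareader ADD MEMBER [my-synapse-workspace];

-- Option 2: a read-only contained SQL user, matching the user id/password
-- stored in the Key Vault connection string (name and password are placeholders).
CREATE USER [synapse_reader] WITH PASSWORD = '<replace with a strong password>';
ALTER ROLE db_datareader ADD MEMBER [synapse_reader];
```

db_datareader is enough here because the pipeline only reads from the source.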

Azure SQL Configuration database (LS_ASQL_CONFIG)

Create an entry in your Azure Key Vault secrets for the connection string:

Integrated Security=False;Encrypt=True;Connection Timeout=30;Data Source=demoekeuwdvlmmssqloxgn01.database.windows.net;Initial Catalog=your configurationdatabase
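This connection string contains no user id or password, so this sketch assumes the linked service authenticates with the workspace managed identity. That identity then needs to read the view used by the Lookup activities and execute the stored procedure that writes back the load date ([my-synapse-workspace] is a placeholder):

```sql
-- Run inside the configuration database as an Azure AD admin.
CREATE USER [my-synapse-workspace] FROM EXTERNAL PROVIDER;

-- Read access to the view [execution_demo].[Load_DataLake_Files].
GRANT SELECT ON SCHEMA::[execution_demo] TO [my-synapse-workspace];

-- Execute permission for the stored procedure that updates the control table;
-- narrow this to the schema or procedure you actually use. The UPDATE inside
-- the procedure works through ownership chaining when both schemas are owned by dbo.
GRANT EXECUTE TO [my-synapse-workspace];
```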

Integration Datasets

Before we can start building our Pipeline we need to create the Integration Datasets.

Source database (DS_ASQL_SOURCE_DEMO)

Dataset SQL Source

Leave the table schema and name empty; the pipeline supplies a query at runtime.

Sink Datalake (DS_ADLS_RAW_PARQUET)

Parameters

  • FilePath
  • Filename

Metadata DB (DS_ASQL_CONFIG)

Dataset Config

Leave the table schema and name empty; the pipeline supplies a query at runtime.

Pipeline

The last step is to create 2 Pipelines, which you can both download over here.

Command Pipeline Worker

Command Pipeline

Name: (PL_COMMAND_COPY_ASQL_TO_ADLS_WORKER_DEMO)

Lookup Activity

The Lookup activity can retrieve a dataset from any of the data sources supported by Azure Data Factory / Azure Synapse. In this case we use a query to load the metadata from the control table, so that we know which tables we need to extract.

Get Files Worker

For the query above I've created a view. In the view I define the folder name and the file name in the Data Lake, based on the metadata from the control table.

[sql]
CREATE view [execution_demo].[Load_DataLake_Files]
AS
/**********************************************************************************************************
* View Name:  [execution_demo].[Load_DataLake_Files]
*
* Purpose:    View to show the records which should be processed
*
*
* Revision Date/Time: 

**********************************************************************************************************/
SELECT	 SP.Id as [PipelineParameterId]
		,SP.[SourceName]
		,isnull(CASE WHEN SP.[SourceSchema] != '' THEN SP.[SourceSchema] END, 'Unknown')  as SourceSchema
		,case when Worker not in (1,2,3,4,5,6)  then 1 else Worker end Worker
		,WorkerOrder
		,case	when	SourceQueryCustom is null
			then	'SELECT * FROM [' +isnull(CASE WHEN SP.[SourceSchema] != '' THEN SP.[SourceSchema] END, 'Unknown') + '].[' + SP.[SourceName] + '] where 0 = ' + convert(nvarchar(1),[IsIncremental]) + ' OR 1 = ' + convert(nvarchar(1),[IsIncremental]) + ' AND ' + isnull(SP.[IsIncrementalColumn],'1') +' >='''+convert(varchar(20),ISNULL([LastLoadtime], '1900.01.01'))+''''
			else	[SourceQueryCustom]
		 end As SelectQuery
		,'SELECT CASE WHEN ' + convert(nvarchar(1),[IsIncremental]) + ' = 1 THEN CONVERT(VARCHAR, MAX(' + isnull(SP.[IsIncrementalColumn],'1') +'), 120) ELSE CONVERT(VARCHAR, GETDATE(), 120) END AS [LastLoadDate] FROM [' +isnull(CASE WHEN SP.[SourceSchema] != '' THEN SP.[SourceSchema] END, 'Unknown') + '].[' + SP.[SourceName] + ']' AS [SelectLastLoaddate]
		,isnull(CASE WHEN SP.DataLakeCatalog != '' THEN SP.DataLakeCatalog END, 'Unknown') + '/' +
					isnull(CASE WHEN SP.[SourceSchema] != '' THEN SP.[SourceSchema] END, 'Unknown') + '_' +
					SP.TableDestinationName + '/' +
					FORMAT(GETUTCDATE(), 'yyyy') +'/'+
					FORMAT(GETUTCDATE(), 'MM') +'/'+
					FORMAT(GETUTCDATE(), 'dd')
           as  FilePath
		,isnull(CASE WHEN SP.DataLakeCatalog != '' THEN SP.DataLakeCatalog END, 'Unknown')  + '_' +
					isnull(CASE WHEN SP.[SourceSchema] != '' THEN SP.[SourceSchema] END, 'Unknown') + '_' +
					SP.TableDestinationName + '_' +
					FORMAT(GETUTCDATE(), 'yyyy') +
					FORMAT(GETUTCDATE(), 'MM') +
					FORMAT(GETUTCDATE(), 'dd') +
					FORMAT(GETUTCDATE(), 'HH') +
					FORMAT(GETUTCDATE(), 'mm') +'.parquet'
                --Equal to Filename
			 as [FileName]
		,SP.[TableDestinationName]
		,cast(SP.[IsActive] as BIT) AS [IsActive]
		,cast(SP.[IsIncremental] as BIT) AS [IsIncremental]
		,isnull(SP.[IsIncrementalColumn],1) as [IsIncrementalColumn]
		,case when [LastLoadtime] is null then '1900.01.01' else LastLoadtime end  as LastLoadtime
FROM [configuration_demo].[Source_Parameter]  as SP
[/sql]
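The Lookup query itself is only visible in the screenshot. A sketch of what the query for Get Files Worker 1 can look like against this view, returning only active sources for one worker. The ORDER BY on WorkerOrder only determines the load order if the ForEach that consumes the result runs sequentially, which is an assumption here:

```sql
-- Query for the Lookup activity "Get Files Worker 1" (with "First row only" unchecked).
-- The ForEach then iterates over @activity('Get Files Worker 1').output.value;
-- adjust the activity name to your own.
-- For each copied Lookup activity, change the Worker value (2, 3, 4, ...).
SELECT  [PipelineParameterId],
        [SourceSchema],
        [SourceName],
        [SelectQuery],
        [SelectLastLoaddate],
        [FilePath],
        [FileName],
        [IsIncremental]
FROM    [execution_demo].[Load_DataLake_Files]
WHERE   [Worker] = 1
  AND   [IsActive] = 1
ORDER BY [WorkerOrder];
```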

For each Activity

The ForEach activity defines a repeating control flow in your pipeline. In this case it calls the Execute Pipeline activity for each row returned by the Lookup activity Get Files Worker XX.

For each Activity

Inside the ForEach activity we add an Execute Pipeline activity. To finish this pipeline, we first need to create the execute pipeline itself.

Execute Pipeline

Name: (PL_EXECUTE_COPY_ASQL_TO_ADLS_DEMO)

Add the following Parameters to the Pipeline:

Execute parameters pipeline

Switch Activity

The Switch activity provides the same functionality that a switch statement provides in programming languages. It evaluates a set of activities corresponding to a case that matches the condition evaluation.

In this situation we create one case for incremental loads and one for full loads. Why do we do this? We pay for every activity, and every activity takes time. For a full load we don't need the last load date from the source, so this saves you 1 activity and about 15 seconds for every full load.

Switch Activity

Lookup Activity

Use the Lookup activity to get the last load date/time from the source table. We store this value at the end of the process for incremental loading purposes, so that we know exactly the correct value the next time we run the process. This activity is only used in the Switch activity when the case is Incremental = True.

Execute lastloaddate

Query = expression @{pipeline().parameters.SelectLastLoadDate}
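To see what this parameter resolves to, take as an example an incremental source Sales.Orders from WideWorldImporters with IsIncrementalColumn = LastEditedWhen (hypothetical configuration values). The view then builds the statement below, shown over several lines for readability; it returns the latest change date as a string:

```sql
-- Statement generated by [SelectLastLoaddate] for Sales.Orders (IsIncremental = 1).
-- Style 120 formats the value as yyyy-mm-dd hh:mi:ss.
SELECT CASE WHEN 1 = 1
            THEN CONVERT(VARCHAR, MAX(LastEditedWhen), 120)
            ELSE CONVERT(VARCHAR, GETDATE(), 120)
       END AS [LastLoadDate]
FROM [Sales].[Orders]
```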

Copy Activity

We can use the Copy activity to copy data among data stores located on-premises and in the cloud.

Execute copy source

Query = expression @{pipeline().parameters.SelectQuery}
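For the same hypothetical Sales.Orders source on its first run, the view generates roughly the statement below. LastLoadtime is still NULL, so the view falls back to 1 January 1900; the exact date literal depends on CONVERT's default style and is shown here as produced with English language settings:

```sql
-- Statement generated by [SelectQuery] for Sales.Orders (IsIncremental = 1).
-- AND binds tighter than OR, so the filter reads as
--   0 = 1 OR (1 = 1 AND LastEditedWhen >= <last load time>)
-- which means "all rows" when IsIncremental = 0 and
-- "rows changed since the last load" when IsIncremental = 1.
SELECT * FROM [Sales].[Orders] where 0 = 1 OR 1 = 1 AND LastEditedWhen >='Jan  1 1900 12:00AM'
```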

Execute source ADLS

Stored procedure Activity

Use the Stored Procedure activity to run data transformation activities in a Data Factory / Synapse pipeline to transform and process raw data into predictions and insights. We use it to update our control table with the incremental value from the source that we looked up at the beginning of our pipeline.

Execute set last loaddate

For the full-load case of the Switch we replace '@{activity('Get LastLoadDate').output.firstrow.LastLoadDate}' with getdate(), because there is no Get LastLoadDate lookup in that branch.
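The stored procedure itself is only visible in the screenshot. A minimal sketch of what such a procedure could look like, using a hypothetical name ([execution_demo].[Update_LastLoadtime]) and hypothetical parameters. In this variant the full-load case is handled inside the procedure: when no date is passed, it stores GETDATE():

```sql
-- Hypothetical procedure; not the article's original code.
CREATE PROCEDURE [execution_demo].[Update_LastLoadtime]
    @PipelineParameterId INT,              -- Id from [configuration_demo].[Source_Parameter]
    @LastLoadtime        DATETIME = NULL   -- incremental: value from the Get LastLoadDate lookup
                                           -- full load: leave NULL, GETDATE() is stored instead
AS
BEGIN
    SET NOCOUNT ON;

    UPDATE [configuration_demo].[Source_Parameter]
    SET    [LastLoadtime] = COALESCE(@LastLoadtime, GETDATE())
    WHERE  [Id] = @PipelineParameterId;
END
```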

Now that we have finished the execute pipeline, we need to finish the command pipeline.

FEL Execute Pipeline

Fill in the parameters based on the output of Get Files Worker1 and you're ready to rock.

Copy and paste Get Files Worker 1 and ForEach_Worker1 for the desired number of workers, and rename them to 2, 3, 4, 5 and so on. Then change the worker number in the Lookup query, or, even easier, just download both templates from my GitHub.

Workers

The great advantage of the solution above is that you are now in control of how your loads run, instead of depending on the random slots of a single ForEach activity.

Hopefully this article has helped you a step further. As always, if you have any questions, leave them in the comments.

Restore an accidentally deleted Azure SQL Database

Help, I deleted my Logical Server

OOPS

Have you ever accidentally deleted your logical server in Azure, for example because you made a mistake in your pipeline? Surely. And of course you didn't have a backup in your storage either.
Well, I must confess that it happened to me recently. It was in my test environment, but deleting a configuration database with your metadata is quite annoying.

How to Resolve?

But there is a solution to get the deleted Azure SQL databases back.

You need to take the following steps for this:

  • Recreate the logical server with the same name and in the same region.
  • Do not redeploy the deleted databases from your pipeline.
  • After the logical server has been created, click Create Database in the Azure Portal.
  • Go directly to the Additional settings tab.
  • Select Backup.
  • Search for your deleted logical server; you will see that you can restore your removed databases.
  • Select the correct database and click Review + create. Your database will now be restored.
  • The last thing you need to do is remove the datetime extension from the name of your restored database.
  • You can now start using your database again, and all security roles that were there before are also present.
  • Repeat the steps above if you had more databases attached to the same server.
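Removing the datetime extension means renaming the restored database. One way to do that is with T-SQL; a sketch in which both database names are hypothetical placeholders:

```sql
-- Run in the master database of the recreated logical server.
-- Replace the first name with the name the restore generated
-- (original name plus a date/time suffix) and the second with your original name.
ALTER DATABASE [ConfigDB_2021-07-12T10-15Z] MODIFY NAME = [ConfigDB];
```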

It's a fairly simple process, but you just need to know it. And it will ultimately save you a lot of time and frustration.
If you have any questions regarding the above, please let me know.

Watch the MS Ignite sessions on-demand

MS Ignite Sessions

This year, MS Ignite 2020 was a virtual event.

Most of the sessions were live in the evenings, and the other sessions were available at different times for different time zones. Compliments to the MS Ignite team for organizing such a great event.

Most of the sessions are now available on demand. If you have not watched them yet, here is the link.

Create an Azure Synapse Analytics SQL Pool

Adding a new SQL Pool

There are 2 options to create a SQL Pool. Go to your Synapse workspace in the Azure Portal and add a new SQL Pool.

Create SQL Pool

Or go to the Management Tab in your Azure Synapse Workspace and add a new Pool.

SQL Pool Azure Synapse Studio

Creating a new SQL Pool

Create Azure Synapse SQL Pool

Enter a SQL pool name (it should not be longer than 15 characters) and select the correct performance level. Be careful when selecting the performance level: DW1000c will cost you €12.73 per hour, excluding storage. My advice is to start small and scale up when necessary. More about Data Warehouse Units (DWUs) can be found here.
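If you start small, you can check and change the performance level later. A sketch, assuming your dedicated SQL pool supports scaling with T-SQL from the master database (Microsoft documents this for dedicated SQL pools); MySqlPool and DW200c are placeholder values:

```sql
-- Run in the master database of the SQL endpoint.
-- Show the current performance level (service objective) of each pool.
SELECT db.name AS pool_name,
       ds.service_objective
FROM   sys.database_service_objectives AS ds
JOIN   sys.databases AS db ON ds.database_id = db.database_id;

-- Scale the pool up one step when you need more compute (runs asynchronously).
ALTER DATABASE [MySqlPool] MODIFY (SERVICE_OBJECTIVE = 'DW200c');
```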

Additional settings

Azure Synapse SQL Pool

If you want to use data from an existing pool, select Backup or Restore Point. Otherwise, select None.

Use existing data-Backup

SQL Pool Backup

Use existing data-Restore Point

SQL Pool Restore Point 

Use existing data-None

Review Azure Synapse SQL Pool

Review your settings and your SQL Pool will be created.

Remarks

Be aware that if you already have another Azure SQL DW running, you cannot select it or add it to your Azure Synapse workspace.
Adding a SQL pool is only possible via the Azure Synapse workspace (Azure Portal or Studio).

If you create it as a standalone component through the Azure portal, the SQL pool cannot be added to the Azure Synapse workspace.

Pause SQL Pool

And if you don’t use your SQL pool, please PAUSE it. It will save you a lot of money.
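A rough calculation to show how much this matters, assuming the €12.73 per hour for DW1000c mentioned above, about 730 hours in a month, and (as an example) a pool that only runs 10 hours a day on 22 working days. Actual prices vary by region and over time:

```latex
\begin{aligned}
\text{always on:}\quad & 12.73\ \text{€/h} \times 730\ \text{h} \approx 9{,}293\ \text{€/month}\\
\text{10 h/day, 22 days:}\quad & 12.73\ \text{€/h} \times 220\ \text{h} \approx 2{,}801\ \text{€/month}\\
\text{saved by pausing:}\quad & 9{,}293 - 2{,}801 \approx 6{,}492\ \text{€/month}
\end{aligned}
```

Storage is billed in both cases and is left out here.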

Using Azure Automation to generate data in your WideWorldImporters database

CASE:

For my test environment I want to load new increments into the WideWorldImporters Azure SQL Database every day with Azure Automation. The following stored procedure is available to achieve this:

 EXECUTE DataLoadSimulation.PopulateDataToCurrentDate
     @AverageNumberOfCustomerOrdersPerDay = 60,
     @SaturdayPercentageOfNormalWorkDay = 50,
     @SundayPercentageOfNormalWorkDay = 0,
     @IsSilentMode = 1,
     @AreDatesPrinted = 1;

For this case I’m going to use Azure Automation to execute this stored procedure on a daily basis.

More details on above Stored Procedure can be found here.

In case you don’t have a WideWorldImporters database in your Azure environment you can download it from GitHub.

  1. Create an Azure Automation Account
  2. Add module to Azure Automation Account
  3. Adding a SQL Server Credential 
  4. Create a Runbook
  5. Create a Schedule

Create an Azure Automation Account

First of all we need to create an Azure Automation Account in our Azure Environment.

Go to the portal, search for Automation and click on Create.

Create Azure Automation

Fill in the details and select the correct subscription and Resource Group.

Create Azure Automation

Click on Create and wait for the account to be created. The configuration blade of the new Automation Account opens once provisioning has completed.

Add module to Azure Automation Account

The next step is to add the “SqlServer” module to our Automation Account.

Select Modules and then Browse Gallery.

Create Azure Automation

Search for SqlServer, created by matteot_msft.

Create Azure Automation

Then click on Import and OK and wait for the import to complete.

Add SQL Server Credentials

Click on Add a credential. A credential is a secure way to hold the login name and password that will be used to access the Azure SQL Database.

Create Azure Automation

Add a credential named AzureSQLDBAutomationAccount and define a password.

Create Azure Automation

If this account does not exist yet in your Azure SQL Database, you need to create it as well:

[sql]

-- Execute in the master database
create login AzureSQLDBAutomationAccount with password = 'Your defined Password';

-- Execute in the WideWorldImporters database
create user AzureSQLDBAutomationAccount
    for login AzureSQLDBAutomationAccount
    with default_schema = dbo;

-- Add the user to the database role
exec sp_addrolemember N'db_owner', N'AzureSQLDBAutomationAccount';
[/sql]
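To check that the user and its role membership are in place before you build the runbook, a small query using the standard catalog views (the user name is the one used above):

```sql
-- Run in the WideWorldImporters database: lists the database roles
-- the automation user belongs to (db_owner is expected after the script above).
SELECT m.name AS member_name,
       r.name AS role_name
FROM   sys.database_role_members AS rm
JOIN   sys.database_principals   AS r ON rm.role_principal_id   = r.principal_id
JOIN   sys.database_principals   AS m ON rm.member_principal_id = m.principal_id
WHERE  m.name = N'AzureSQLDBAutomationAccount';
```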

Create a Runbook

The next step is to create a Runbook.

You can Import an existing Runbook or Create a new Runbook.

For now we will Create a new RunBook.

Create Azure Automation

Name = “AzureSQLDB_Run_StoredProcedure”

Runbook type = “PowerShell”

Create Azure Automation

[ps]
param(
    # Name of the Azure SQL server without the .database.windows.net suffix
    # (the script appends the suffix below)
    [parameter(Mandatory=$true)]
    [string] $AzureSQLServerName,

    # Name of the database
    [parameter(Mandatory=$true)]
    [string] $AzureSQLDatabaseName,

    # Statement to run, for example 'exec dbo.xxxxxxxx'
    [parameter(Mandatory=$true)]
    [string] $ProcedureName,

    # Name of the Azure Automation credential asset that holds the SQL login
    [parameter(Mandatory=$true)]
    [string] $SqlCredential
)

# Read the SQL login from the Automation credential asset
$Credential = Get-AutomationPSCredential -Name $SqlCredential
$AzureSQLServerName = $AzureSQLServerName + '.database.windows.net'

# Run the statement; 4>&1 merges the verbose stream (PRINT output) into the result
$SQLOutput = $(Invoke-Sqlcmd -ServerInstance $AzureSQLServerName -Username $Credential.UserName -Password $Credential.GetNetworkCredential().Password -Database $AzureSQLDatabaseName -Query $ProcedureName -QueryTimeout 65535 -ConnectionTimeout 60 -Verbose) 4>&1

Write-Output $SQLOutput
[/ps]

This is a simple PowerShell script that executes a stored procedure. More details about Invoke-Sqlcmd can be found here. You can download the PowerShell script from my GitHub.

Save the runbook and then test it. After you have tested your runbook, you can publish it.

Create Azure Automation

Fill in the requested parameters as shown in the picture above and click on Start. The requested procedure will be executed.

Create A Schedule

The last step is to create a schedule that runs every day to load your data incrementally.

Create Azure Automation

Create Azure Automation

Define a Name, Schedule Start Time and the Recurrence for the Schedule and click on Create

Create Azure Automation

Fill in the requested Parameters and click on Save.

I assume that you have now succeeded in creating an Azure Automation Account with these steps and that you could execute a Stored Procedure in an Azure SQL Database. If you have any questions, leave a message in the comments below. Thanks for reading.

TIP:

To bring your database up to the current date you have to run this procedure quite a lot of times, so create a query that executes the procedure 100 times in a row. The downloaded database contains data up to May 2016.
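A sketch of such a query, using a WHILE loop with the parameters shown earlier. In SSMS or sqlcmd you could also put the EXECUTE in its own batch and end it with GO 100; the loop below works in any client:

```sql
-- Executes the data simulation procedure 100 times in a row.
DECLARE @i INT = 1;

WHILE @i <= 100
BEGIN
    EXECUTE DataLoadSimulation.PopulateDataToCurrentDate
        @AverageNumberOfCustomerOrdersPerDay = 60,
        @SaturdayPercentageOfNormalWorkDay = 50,
        @SundayPercentageOfNormalWorkDay = 0,
        @IsSilentMode = 1,
        @AreDatesPrinted = 1;

    SET @i += 1;
END
```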

If you get errors regarding the temporal tables, check this blog post. It helped me a lot.


In my next blog I will describe how you can easily extract data (full and incremental) from your WideWorldImporters database to Azure Data Lake Gen2 with Azure Data Factory.