
mrpaulandrew / procfwk


A cross-tenant, metadata-driven processing framework for Azure Data Factory and Azure Synapse Analytics, achieved by coupling orchestration pipelines with a SQL database and a set of Azure Functions.

Home Page: https://mrpaulandrew.com/category/azure/data-factory/adf-procfwk/

License: Other

Languages: PowerShell 3.93%, C# 46.73%, TSQL 41.80%, PLpgSQL 1.26%, Jupyter Notebook 6.19%, Scala 0.09%
Topics: adf, adfprocfwk, azure, azure-functions, azure-sql-database, data-engineering, data-factory, framework, metadata, pipelines, processing, procfwk

procfwk's Introduction

Hi there 👋

procfwk's People

Contributors

mrpaulandrew, njlangley, nowinskik, pedrofiadeiropeak


procfwk's Issues

Capture the Last Pipeline Parameter Value Used in the Parameters Table

Capturing the last parameter value used alongside the existing metadata means the precursor stored procedure can inspect this value and then update the next value for a given run.

E.g. last value passed = 1.

Precursor update: new value = last + 1.

This helps when dynamically passing values to worker pipelines that use incremental date/time values.
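
A minimal sketch of how that precursor update might look, assuming a hypothetical procedure name and a hypothetical [LastValueUsed] column on [procfwk].[PipelineParameters] to hold the captured value:

CREATE PROCEDURE [procfwk].[SetIncrementalParameterValue] -- hypothetical precursor procedure name
    @PipelineId INT,
    @ParameterName NVARCHAR(128)
AS
BEGIN
    -- Capture the value used on the last run in the assumed [LastValueUsed] column,
    -- then increment it ready for the next execution.
    UPDATE [procfwk].[PipelineParameters]
    SET
        [LastValueUsed]  = [ParameterValue],
        [ParameterValue] = CAST(CAST([ParameterValue] AS INT) + 1 AS NVARCHAR(500))
    WHERE
        [PipelineId] = @PipelineId
        AND [ParameterName] = @ParameterName;
END;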

Use Secure Activity Inputs/Outputs

For parts of the Infant pipeline where SPN details are passed around to interact with Workers, use the Secure Input/Output options on the in-scope activities so that SPN values are not exposed in plain text in the standard activity logs.

Add Support for Key Vault URLs to be Stored Instead of SPN Values

Update the [dbo].[ServicePrincipals] table to support Key Vault URLs as an option to be used instead of storing SPN details in the metadata directly.

Drive different behaviour via a database property.

Extend affected Azure Functions to parse Key Vault URLs and return values from Key Vault using MSI authentication.
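
A minimal sketch of the metadata side of this change; the column names and the property row below are assumptions, not the shipped schema:

-- Assumed new columns: store Key Vault secret URLs as an alternative to the raw SPN values.
ALTER TABLE [dbo].[ServicePrincipals]
    ADD [PrincipalIdUrl]     NVARCHAR(500) NULL, -- Key Vault secret URL for the client/application id
        [PrincipalSecretUrl] NVARCHAR(500) NULL; -- Key Vault secret URL for the client secret

-- Assumed property name and properties table layout to drive the different behaviour.
INSERT INTO [procfwk].[Properties] ([PropertyName], [PropertyValue], [Description])
VALUES (N'SPNHandlingMethod', N'StoreInKeyVault', N'StoreInDatabase or StoreInKeyVault');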

Allow multiple CurrentExecutions to be managed by the framework?

Hi, currently the framework can only manage one collection of pipelines (procfwk.Pipelines) and one execution (CurrentExecution) at any single time. It would be ideal if a higher-level grouping were allowed, so that we could define multiple sets of pipelines and dependencies, submit them as separate jobs, and have the framework track the multiple jobs in procfwk.CurrentExecution. Thank you.

Separate Metadata Integrity and Clean Up

Separate the Metadata Integrity and Clean Up results into two stored procedures and pipeline activities:

  • One Lookup Activity
  • One Stored Proc Activity

These can run in parallel before the Clean Up ForEach activity within the parent pipeline.

Automatic cancellation of worker pipelines when a child or infant pipeline is cancelled

When cancelling the parent or child pipelines (recursive cancel), the framework pipelines complete with a cancelled status, but the worker pipelines continue running (because cancelling does not allow cleanup). In the case where 50 workers in different data factories need cancelling too, this can be time consuming.

It would be nice if the framework had the option to define a cancellation policy, much like the continuation on error options, to allow cancellation of worker pipelines in the case an infant is cancelled.

I have been looking into pushing Data Factory events into Event Grid to trigger a function for cancelled infants, which calls a new cleanup pipeline to do just this. It cancels the worker and waits for it to finish before updating the CurrentExecution table (only if required). I have PoC code that I can share.

Could not find object: linkedService.SupportDatabase on DeployProcFwkComponents.ps1

I'm trying to set up the procfwk. Until this point everything worked fine, but now I receive this message while executing DeployProcFwkComponents.ps1.

`Update-PropertiesFromFile : Could not find object: linkedService.SupportDatabase
At C:\Program Files\WindowsPowerShell\Modules\azure.datafactory.tools\0.17.0\public\Publish-AdfV2FromJson.ps1:160 char:9
Update-PropertiesFromFile -adf $adf -stage $Stage -option $op ...
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
CategoryInfo : NotSpecified: (:) [Write-Error], WriteErrorException
FullyQualifiedErrorId : Microsoft.PowerShell.Commands.WriteErrorException,Update-PropertiesFromFile

The property 'Body' cannot be found on this object. Verify that the property exists.
At C:\Program Files\WindowsPowerShell\Modules\azure.datafactory.tools\0.17.0\private\Update-PropertiesFromFile.ps1:68 char:9
$json = $o.Body
~~~~~~~~~~~~~~~
CategoryInfo : NotSpecified: (:) [], PropertyNotFoundException
FullyQualifiedErrorId : PropertyNotFoundStrict

The variable '$json' cannot be retrieved because it has not been set.
At C:\Program Files\WindowsPowerShell\Modules\azure.datafactory.tools\0.17.0\private\Update-PropertiesFromFile.ps1:69 char:23
if ($null -eq $json) {
~~~~~
CategoryInfo : InvalidOperation: (json:String) [], RuntimeException
FullyQualifiedErrorId : VariableIsUndefined

Update-PropertiesFromFile : Wrong path defined in config for object(path): linkedService.SupportDatabase(properties.typeProperties.connectionString.secretName)
At C:\Program Files\WindowsPowerShell\Modules\azure.datafactory.tools\0.17.0\public\Publish-AdfV2FromJson.ps1:160 char:9
Update-PropertiesFromFile -adf $adf -stage $Stage -option $op ...
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
CategoryInfo : NotSpecified: (:) [Write-Error], DataException
FullyQualifiedErrorId : System.Data.DataException,Update-PropertiesFromFile
The variable '$json' cannot be retrieved because it has not been set.
At C:\Program Files\WindowsPowerShell\Modules\azure.datafactory.tools\0.17.0\private\Update-PropertiesFromFile.ps1:88 char:44
Update-ObjectProperty -obj $json -path "properties.$p ...
~~~~~
CategoryInfo : InvalidOperation: (json:String) [], RuntimeException
FullyQualifiedErrorId : VariableIsUndefined

Save-AdfObjectAsFile : Cannot bind argument to parameter 'obj' because it is null.
At C:\Program Files\WindowsPowerShell\Modules\azure.datafactory.tools\0.17.0\private\Update-PropertiesFromFile.ps1:104 char:41
$f = (Save-AdfObjectAsFile -obj $o)
~~
CategoryInfo : InvalidData: (:) [Save-AdfObjectAsFile], ParameterBindingValidationException
FullyQualifiedErrorId : ParameterArgumentValidationErrorNullNotAllowed,Save-AdfObjectAsFile

The variable '$f' cannot be retrieved because it has not been set.
At C:\Program Files\WindowsPowerShell\Modules\azure.datafactory.tools\0.17.0\private\Update-PropertiesFromFile.ps1:105 char:23
$o.FileName = $f
~~
CategoryInfo : InvalidOperation: (f:String) [], RuntimeException
FullyQualifiedErrorId : VariableIsUndefined
`

I tried many things. What I've changed now in the PS1 script is to remove the first 14 lines and add this:

$tenantId = "mytenantid"
$subscriptionId = "myscubscriptionid"
$spId = "azureclientid"
$spKey = "azureclientsecret"
$resourceGroupName = "myresourcename"
$dataFactoryName = "myadfname"
$region = "West Europe" (but also tried "WestEurope" or "West-Europe")

And later I found:
$Env:SQLDatabase = ""
There I added this:
Server=tcp:adfframework.database.windows.net,1433;Initial Catalog=frameworkmetadatadb;Persist Security Info=False;User ID=MYUSERNAME;Password=MYPASSWORD;MultipleActiveResultSets=False;Encrypt=True;TrustServerCertificate=False;Connection Timeout=30;

I'm pretty sure that I am making a mistake, but I don't know which one or how to fix my problem. Can I get support here?

Add Worker Pipeline Auto Retry Options

In the event of a worker pipeline failure, develop the ability to allow the worker to retry automatically.

Configure via the properties table, as sketched after this list:

  • Enabled.
  • Retry attempts.
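
A minimal sketch of the metadata, assuming property names and a simple [procfwk].[Properties] column layout; the framework's real property handling may differ:

-- Assumed property names controlling the retry behaviour.
INSERT INTO [procfwk].[Properties] ([PropertyName], [PropertyValue], [Description])
VALUES
    (N'WorkerRetryEnabled',  N'1', N'Allow failed worker pipelines to be retried automatically.'),
    (N'WorkerRetryAttempts', N'3', N'Maximum number of automatic retry attempts per worker pipeline.');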

Email alerts sent to blank email addresses when using specific alert bit flags

Describe the bug
When alerting is set up using the bit mask to specify specific alerts only, e.g. errors, the stored procedure that determines whether alerts should be processed returns before the outcome is known. This means a successful worker configured to only alert on failure makes the framework child pipeline fail when processing the alerts, as it attempts to send an alert without having a valid address to send it to.

Affected services
Which resource within the processing framework does this affect?

  • All of them

To Reproduce
Steps to reproduce the behavior:

  1. Modify procfwk.PipelineAlertLink to set up a worker pipeline to alert an email address only when an error occurs (OutcomesBitValue = 4). Ensure no other alerts are configured for the pipeline.
  2. Run the framework to process the pipeline successfully
  3. In the ADF monitor view, the framework child pipeline will have a status of error from trying to send an email to a blank address

Expected behaviour
The framework should check the outcome of the worker pipeline to determine whether there are any alerts to be sent, based on a bitwise AND with procfwk.PipelineAlertLink.OutcomesBitValue.
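
A minimal sketch of the missing check, with assumed variable names and an assumed outcome-to-bit mapping (4 = failure); only recipients whose configured mask includes the actual outcome bit should receive an alert:

-- Sketch only: return alert recipients for this worker only when the worker's actual
-- outcome bit is present in the configured OutcomesBitValue mask.
DECLARE @PipelineId INT = 1;      -- the worker pipeline that just completed
DECLARE @OutcomeBitValue INT = 4; -- assumed mapping: 4 = failure outcome

SELECT
    [PipelineId],
    [RecipientId] -- assumed column linking to the alert recipients
FROM
    [procfwk].[PipelineAlertLink]
WHERE
    [PipelineId] = @PipelineId
    AND ([OutcomesBitValue] & @OutcomeBitValue) = @OutcomeBitValue; -- bitwise AND against the mask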

Additional context
I have a branch on my fork with a fix, will submit a PR with the details

Support large parameters for ADF pipelines

Generic worker pipelines, datasets and linked services can all be configured to use parameters for dynamic behavior. In some completely data-driven solutions, generic pipelines can have large string parameters that are not currently supported by the framework.

ADF.procfwk should support large pipeline parameters so these generic pipelines can be orchestrated via the framework.

If a parameter exists in the metadata but the procfwk.PipelineParameters.ParameterValue column has a value of NULL, then no parameters are sent to that pipeline.

This affects the execution of worker pipelines in Data Factory.

Steps to reproduce the behavior:

  1. Create a pipeline with more than one parameter.
  2. Change the value of one parameter in the procfwk.PipelineParameters table to NULL, but leave the pipeline in place.
  3. Execute the pipeline.

I would expect one of two outcomes:

  • NULL parameters are simply not passed to the pipeline.
  • A constraint on the procfwk.PipelineParameters.ParameterValue column would prevent NULLs.
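
A minimal sketch of those two options on the metadata side; the current column definition is assumed, and the Azure Functions that build the run request would also need to handle the larger values:

-- Option 1: widen the column so large string parameters are supported.
ALTER TABLE [procfwk].[PipelineParameters]
    ALTER COLUMN [ParameterValue] NVARCHAR(MAX) NULL;

-- Option 2: disallow the NULL edge case entirely (after back-filling or removing NULL rows).
-- ALTER TABLE [procfwk].[PipelineParameters]
--     ALTER COLUMN [ParameterValue] NVARCHAR(MAX) NOT NULL;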

Avoid pointless execution run after a CleanPreviousRun results in all pipelines having the status Success

Use Case:

If, for some reason, the metadata of some pipelines isn't updated, then when a new execution is triggered and the Clean Up Previous Run activity runs, the status of all pipelines that weren't updated will be changed to Success if they completed successfully.

Once that activity completes, the Execution Wrapper activity will start a new execution that is pointless, since all pipelines have already finished successfully and nothing will happen during this run; it will only serve to clean the CurrentExecution table at the end.

Suggestion:

Tweak the ExecutionWrapper stored procedure to consider this particular case.

One possible solution is, after the check for a running execution is done, to add the following piece of code:

--- check for all pipelines' status being success after a clean run
IF EXISTS
(
    SELECT 1 FROM [procfwk].[CurrentExecution]
)
AND NOT EXISTS
(
    SELECT 1 FROM [procfwk].[CurrentExecution] WHERE [PipelineStatus] <> 'Success'
)
BEGIN
    EXEC [procfwk].[UpdateExecutionLog]
        @PerformErrorCheck = 0;
END;

Allow pipeline relationships

Currently, if a worker pipeline fails, it blocks all other pipeline executions, failing the entire execution. A feature has been put on the backlog to allow this failure to be ignored and allow all other pipelines to execute. In my experience, what is needed is a bit more complex.

For a single data platform you will have several to many data products that are generated. These products each require certain pipelines to run, in order. Often, data products can depend on common pipelines, but this is not always the case.

For example, consider the following scenario:

id | pipeline name        | stage
----------------------------------
1  | data_source1_http    | 1
2  | trans_source1_stage  | 2
3  | trans_product1_load  | 3
4  | data_source2_odata   | 1
5  | trans_source2_stage  | 2
6  | trans_product2_load  | 3
7  | trans_product3_dwh   | 4

trans_product3_dwh pulls data from the staging areas for source1 and source2. trans_product1_load only uses data from source1. trans_product2_load only uses data from source2.

If pipeline data_source1_http fails for some reason, then the execution for pipelines 2, 3 and 7 should be held up, but not 4, 5 and 6. Those pipelines are independent of the failed pipeline and the subsequent products can be created without a problem.

This type of pipeline dependency/hierarchy would be very helpful as part of the orchestration framework.
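
To make that concrete, the scenario could be expressed against a dependency table such as the [procfwk].[PipelineDependencies] proposed in the error-handling issue below; this is a sketch using the pipeline ids from the table above, not existing framework metadata:

-- Each row says: if [PipelineId] fails, block [DependantPipelineId].
INSERT INTO [procfwk].[PipelineDependencies] ([PipelineId], [DependantPipelineId])
VALUES
    (1, 2), -- trans_source1_stage depends on data_source1_http
    (2, 3), -- trans_product1_load depends on trans_source1_stage
    (4, 5), -- trans_source2_stage depends on data_source2_odata
    (5, 6), -- trans_product2_load depends on trans_source2_stage
    (2, 7), -- trans_product3_dwh depends on trans_source1_stage
    (5, 7); -- trans_product3_dwh depends on trans_source2_stage

With these rows, a failure of pipeline 1 blocks pipelines 2, 3 and 7 (directly or transitively), while 4, 5 and 6 run as normal.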

Previously cancelled worker pipeline executions fails when the framework is restarted

Describe the bug
If you cancel a running worker pipeline, the framework stops in the same way as if an error happened (in simple failure handling mode). If you then restart the framework processing, instead of re-running the worker pipeline, it fails to reset the state and the parent framework pipeline fails without triggering any child pipelines.

Affected services
Which resource within the processing framework does this affect?

  • All of them

To Reproduce
Steps to reproduce the behavior:

  1. Trigger a new run in Data Factory
  2. Cancel one of the worker pipelines
  3. Wait for the current framework execution to complete
  4. Check the current executions table - the cancelled pipeline should have a status of Cancelled/Unknown
  5. Re-run the framework parent pipeline. The framework fails to restart the processing cleanly like it does for a failed execution.

Expected behaviour
A cancelled pipeline should be restartable in the same way as a failed pipeline.

Additional context
I have a branch on my fork with a fix, will submit a PR with the details

Refactor Child & Infant Pipelines to Make Worker Execute Simpler and Isolated

Move the worker pipeline execute activities into the Infant pipeline alongside the run handling, making the call to run a worker pipeline an isolated operation.

The child-level pipeline can then be made simpler and used purely for scaling out executions.

Once done, refactor the Infant to use local variables, replacing hard-coded activity output expressions where possible.

Update failed pipeline error handling and restarts across execution stages.

  1. Add a new table [PipelineDependencies] allowing 1:many relationships between Pipelines.

| PipelineId | DependantPipelineId |

  2. Add a new property called 'FailureHandling'. This property could have three possible values:

None = The failure status is updated in the Current Execution table and the framework carries on regardless.

Simple = The failure status is updated in the Current Execution table. Downstream execution stages are marked as blocked and processing is stopped after the stage where the failure occurred. This is basically what currently happens.

Informed = The failure status is updated in the Current Execution table. Downstream pipelines are marked as blocked (using the DependantPipelineId attribute in the PipelineDependencies table). However, processing is allowed to continue. When the next execution stage starts, only pipelines NOT blocked will be prepared and run.
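
A minimal sketch of the proposed dependency table; data types and constraints are assumptions:

CREATE TABLE [procfwk].[PipelineDependencies]
(
    [DependencyId]        INT IDENTITY(1, 1) NOT NULL PRIMARY KEY,
    [PipelineId]          INT NOT NULL, -- upstream pipeline
    [DependantPipelineId] INT NOT NULL, -- downstream pipeline to block if the upstream fails
    CONSTRAINT [fkPipelineDependenciesPipelineId]
        FOREIGN KEY ([PipelineId]) REFERENCES [procfwk].[Pipelines] ([PipelineId]),
    CONSTRAINT [fkPipelineDependenciesDependantPipelineId]
        FOREIGN KEY ([DependantPipelineId]) REFERENCES [procfwk].[Pipelines] ([PipelineId])
);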

Add support for a custom precursor stored procedure.

Control pipeline run frequency via the framework metadata, e.g. dynamic enabling/disabling of pipelines for a given pattern. Maybe use a precursor procedure with custom code to decide whether a pipeline/stage should be executed daily/weekly/monthly, etc.
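
A minimal sketch of what such a custom precursor could do, assuming a hypothetical procedure name and a hypothetical [RunFrequency] tag on the [procfwk].[Pipelines] metadata:

CREATE PROCEDURE [procfwk].[CustomPrecursor] -- hypothetical name, called wherever the framework invokes its precursor
AS
BEGIN
    -- Sketch only: enable weekly pipelines on Mondays and disable them on every other day,
    -- driven by the assumed [RunFrequency] column.
    UPDATE [procfwk].[Pipelines]
    SET [Enabled] = CASE WHEN DATENAME(WEEKDAY, GETUTCDATE()) = 'Monday' THEN 1 ELSE 0 END
    WHERE [RunFrequency] = N'Weekly';
END;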

Creation of a new execution in OverideRestart mode fails when a previous run didn't finish successfully

Describe the bug
When the OverideRestart property is set to 1 and there's a current execution where some pipelines have a status <> 'Success', the creation of a new execution will fail. This happens because the ExecutionWrapper stored procedure calls the UpdateExecutionLog stored procedure, and when the OverideRestart property is set to 1 that process only works if all pipelines in the Current Execution table have a status of 'Success' (in theory they wouldn't even be there, having been moved to the final log table once the framework completes); otherwise it will always throw an error.

Affected services
Which resource within the processing framework does this affect?

  • Data Factory
  • SQL Database

To Reproduce
Steps to reproduce the behavior:

  1. Change a pipeline parameter to force a specific pipeline to fail. For example, in a wait activity set the value 'abc'
  2. Trigger a new run in Data Factory and wait for it to finish with a failed pipeline
  3. Set the property OverideRestart to 1
  4. Trigger another new run in Data Factory
  5. Framework will fail when running the Execution Wrapper activity.

Expected behaviour
The creation of a new run should be possible (by setting OverideRestart = 1) even when the previous run didn't finish successfully.
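
A hedged sketch of one way the check could be relaxed, assuming the property value has been read into a local variable; the real ExecutionWrapper/UpdateExecutionLog logic will differ:

-- Sketch only: when a restart is being overridden, archive whatever is left in
-- CurrentExecution without insisting that every pipeline reached 'Success'.
IF @OverideRestart = 1
    AND EXISTS (SELECT 1 FROM [procfwk].[CurrentExecution])
BEGIN
    EXEC [procfwk].[UpdateExecutionLog]
        @PerformErrorCheck = 0; -- skip the all-Success check for an overridden restart
END;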

Get Property Value Scalar Function

Add a scalar function to return the latest property value. Use it internally to replace full SELECT queries against the properties view.
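
A minimal sketch of such a function, assuming the current properties view exposes [PropertyName] and [PropertyValue] columns:

CREATE FUNCTION [procfwk].[GetPropertyValue]
(
    @PropertyName NVARCHAR(128)
)
RETURNS NVARCHAR(MAX)
AS
BEGIN
    DECLARE @PropertyValue NVARCHAR(MAX);

    -- Assumes the view returns a single latest value per property name.
    SELECT
        @PropertyValue = [PropertyValue]
    FROM
        [procfwk].[CurrentProperties]
    WHERE
        [PropertyName] = @PropertyName;

    RETURN @PropertyValue;
END;

Internal callers could then replace a full SELECT against the view with an expression such as [procfwk].[GetPropertyValue](N'OverideRestart').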

Update Activity Timeout Values

Reduce all framework pipeline activity timeout values from the service default of 7 days.

In the event of a wider platform failure, the framework processes should not "get stuck" waiting 7 days before a failure is realised.

Extend Execution Stages with a new Level - Execution Batches

Building on community feedback in the following issues:

#61
#62

Allow the framework to support multiple executions of the parent pipeline using a new higher level concept called Batches.

  • A batch sits above execution stages.
  • Batches can be linked to 1/many execution stages.
  • Batches can be enabled/disabled via properties.
  • Batches can run concurrently. Batch 1 = Running, Batch 2 = Running.
  • A single batch ID cannot run concurrently.

Examples of Batch names:

  1. Hourly
  2. Daily
  3. Weekly
  4. Monthly

Finally, it is expected that a trigger hitting the parent pipeline will include the batch name.
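
A minimal sketch of the metadata that could underpin batches; all names, types and the link to execution stages are assumptions:

-- Assumed batch definition table.
CREATE TABLE [procfwk].[Batches]
(
    [BatchId]   UNIQUEIDENTIFIER NOT NULL DEFAULT NEWID() PRIMARY KEY,
    [BatchName] NVARCHAR(128) NOT NULL UNIQUE, -- e.g. Hourly, Daily, Weekly, Monthly
    [Enabled]   BIT NOT NULL DEFAULT 1
);

-- Assumed link table: a batch can be attached to one or many execution stages.
CREATE TABLE [procfwk].[BatchStageLink]
(
    [BatchId] UNIQUEIDENTIFIER NOT NULL REFERENCES [procfwk].[Batches] ([BatchId]),
    [StageId] INT NOT NULL REFERENCES [procfwk].[Stages] ([StageId]),
    PRIMARY KEY ([BatchId], [StageId])
);

The trigger calling the parent pipeline would then pass the batch name as a pipeline parameter, and the framework would refuse to start a batch that already has a run in progress.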

Triggering the Parent pipeline multiple times could overwrite preceding in-progress runs

Describe the bug
When the parent pipeline is triggered and the framework has not yet reached the stage of registering into the framework db, if the user triggers again there is a chance that the subsequent trigger(s) will overwrite the preceding runs.

Affected services
Which resource within the processing framework does this affect?

  • Data Factory
  • SQL Database (I believe the subsequent triggered pipeline will overwrite records of the preceding one)
  • Functions
  • All of them
  • Other

To Reproduce
Steps to reproduce the behavior:

  1. Trigger 02-Parent, then immediately trigger 02-Parent again. Technically this creates two trigger runs. The current framework is not designed to allow multiple triggering of the same "job", so this is technically not an allowed function. However, the current integrity check relies on the existence of the framework database records, and in this scenario these records have not been written/committed to the db yet, so the framework will not know that a job has already been triggered.

PS: this is one of the reasons for Enhancement Request #61

Expected behaviour
The 2nd triggered run should fail (as if the integrity check had detected the existing run via the db records).

Screenshots
Sorry, I no longer have screenshots as I have created my own workaround detection for this.

Additional context
My current workaround is an additional function that checks the ADF run log to see whether a triggered job has already occurred (in my version, I have implemented a version of enhancement request #61).
