# Pipelines
Pipelines are one of the fundamental building blocks of both ADF and adfPy. To create an AdfPipeline:
```python
from adfpy.pipeline import AdfPipeline

pipeline = AdfPipeline(name="copyPipeline", activities=[extract, ingest], schedule="* * 5 * *")
```
where `extract` and `ingest` are activities, for example:

```python
from azure.mgmt.datafactory.models import BlobSink, BlobSource

from adfpy.activities.execution import AdfCopyActivity, AdfDatabricksSparkPythonActivity

extract = AdfCopyActivity(
    name="copyBlobToBlob",
    input_dataset_name="staging",
    output_dataset_name="landing",
    source_type=BlobSource,
    sink_type=BlobSink,
    pipeline=pipeline,
)

ingest = AdfDatabricksSparkPythonActivity(name="ingest", python_file="foo.py", pipeline=pipeline)
```
Note also that instead of specifying a separate Trigger resource (which is the approach ADF takes), adfPy allows you to set the schedule directly on the `AdfPipeline` object. For more information, take a look at the Triggers page.
## Setting dependencies between activities
Setting dependencies between activities in adfPy is very similar to how Airflow does this, using the bitshift operators:

```python
activity1 >> activity2
```

will create a dependency on `activity1` for `activity2`. This can also be used with lists, as in Airflow:

```python
activity1 >> [activity2, activity3]
```

is equivalent to

```python
activity1 >> activity2
activity1 >> activity3
```
The implicit assumption here is that the dependency condition between the activities is `Succeeded`. This means that in the previous examples `activity2` will only execute if `activity1` completed successfully. If you want to deviate from this, for example to define failure handling, you can explicitly set dependencies for an activity. ADF supports four dependency conditions:
- Succeeded
- Failed
- Skipped
- Completed
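To make the mechanics concrete, here is a minimal, self-contained sketch of how a bitshift-based dependency API with conditions can be modeled. This is an illustrative assumption, not adfPy's actual implementation: the `Activity` class, the `depends_on` attribute, and the `set_dependency` helper are all hypothetical names invented for this example.

```python
# Hypothetical sketch -- NOT adfPy's actual API. Shows how `>>` chaining
# (defaulting to "Succeeded") and explicit dependency conditions can be modeled.
from __future__ import annotations

# The four dependency conditions ADF supports.
VALID_CONDITIONS = {"Succeeded", "Failed", "Skipped", "Completed"}


class Activity:
    def __init__(self, name: str) -> None:
        self.name = name
        # Each dependency is stored as (upstream activity, condition).
        self.depends_on: list[tuple[Activity, str]] = []

    def set_dependency(self, upstream: Activity, condition: str = "Succeeded") -> None:
        if condition not in VALID_CONDITIONS:
            raise ValueError(f"Unknown dependency condition: {condition}")
        self.depends_on.append((upstream, condition))

    def __rshift__(self, other):
        # Support both `a >> b` and `a >> [b, c]`, mirroring Airflow.
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            target.set_dependency(self)  # implicit "Succeeded" condition
        return other


extract = Activity("extract")
transform = Activity("transform")
load = Activity("load")
extract >> [transform, load]  # same as: extract >> transform; extract >> load

# Failure handling: run only when `extract` fails.
alert = Activity("notifyOnFailure")
alert.set_dependency(extract, condition="Failed")
```

After this runs, `transform` and `load` each depend on `extract` with the default `Succeeded` condition, while `alert` depends on it with `Failed`.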