Tuesday, 15 February 2022

ADF Pipeline Beginner Series: Recursive parsing of a folder

This is a series of short articles about features that Azure Data Factory lacks and how to work around them.


When you use only ADF pipeline activities, a few limitations make it hard to parse the files in a folder recursively.

This article uses previously documented techniques to overcome these limitations.

All the supporting files can be found in my GitHub repository ADF_showcase.


Top-level: Overview


On the first level, we will leverage the input parameters to set an initial path, then we will iterate on all the subfolders beneath it.


Deep dive into details

1- Set initial path 


The first activity sets the variable to iterate over. The code is the following: @json(concat('[{"name":"/',pipeline().parameters.INITIAL_PATH,'"}]'))

An explanation of the logic can be found in this post.
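
To make the shape of the initial queue concrete, here is a minimal Python sketch of what the expression builds. It is a simulation, not ADF code: the function name initial_queue and the sample path "landing/2022" are made up for illustration, and json.loads stands in for ADF's json().

```python
import json

def initial_queue(initial_path: str) -> list:
    # Mirrors concat('[{"name":"/', INITIAL_PATH, '"}]') followed by json(...).
    raw = '[{"name":"/' + initial_path + '"}]'
    return json.loads(raw)

# Hypothetical parameter value, passed without a leading slash.
queue = initial_queue("landing/2022")
print(queue)
```

The result is a one-element array of objects with a name attribute, the same shape that Get Metadata returns for child items, which is why the rest of the pipeline can treat both alike.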

2 - Until queue is empty

This part creates the recursive looping behaviour; the explanation is in my post here.




2_1 - Set Current Folder path


The first task is to find the item of the queue we want to iterate into.
We achieve this with the first activity, using the code
@first(variables('Queue')).name

2_2 - Set _tmpQUeue with Queue


In combination with 2_3 - Set Queue, this logic works around an ADF pipeline limitation (self-referencing variables are not allowed) and lets us add values to, or remove values from, the variable Queue.
The idea is to copy the variable into another one (prefixed with _) before using it.

2_3 - Set Queue 

Now that we have started iterating over the first element of the queue, we need to remove it so we don't iterate over this item twice. Therefore we set the Queue variable with this code.

@if(greater(length(variables('_tmpQueue')),1),
    json(
        replace(
            string(variables('_tmpQueue')),
            concat(string(first(variables('_tmpQueue'))),','),
            ''
        )
    ),
    json(
        replace(
            string(variables('_tmpQueue')),
            string(first(variables('_tmpQueue'))),
            ''
        )
    )
)
   
We can't directly remove a value from an array; my post here explains how and why.
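
The string trick in 2_3 can be easier to follow outside ADF. The Python sketch below imitates it under one assumption: that string() serialises an array as compact JSON without spaces, which json.dumps reproduces here. The helper names adf_string and remove_first are illustrative only.

```python
import json

def adf_string(value) -> str:
    # Assumption: ADF's string() yields compact JSON (no spaces after , or :).
    return json.dumps(value, separators=(",", ":"))

def remove_first(tmp_queue: list) -> list:
    serialized = adf_string(tmp_queue)
    head = adf_string(tmp_queue[0])
    if len(tmp_queue) > 1:
        # Several items: drop the head together with the comma that follows it.
        return json.loads(serialized.replace(head + ",", ""))
    # Single item: dropping the head leaves "[]", which parses to an empty array.
    return json.loads(serialized.replace(head, ""))

two_items = remove_first([{"name": "/a"}, {"name": "/a/b"}])
one_item = remove_first([{"name": "/a"}])
print(two_items, one_item)
```

The two branches exist only because of the comma: with a single element there is no trailing comma to remove, and removing the head without its comma in the other case would leave invalid JSON.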

2_4 - Get Metadata


This activity lists the subfolders and files found at a given path and exposes them through the activity's output value.

The critical bit is to select Child Items in the list of Arguments.

2_5 - Store output of the meta in _tmpQueue


We now store the list of subfolders and files from the previous activity into the _tmpQueue using this code:

@activity('2_4 - Get Metadata').output.childItems


2_6 - Filter only files using -tmpQUeue as source

We use this activity to separate the files from the folders; its outcome is a list of files.

It parses the array (_tmpQueue) and, for each item (keyword item()), uses the type to keep only the 'File' entries, using this code:

@equals(item().type,'File')
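
As an illustration, the same filter can be written in Python over a sample childItems array. The entries below are invented; only the name/type shape follows what Get Metadata returns.

```python
# Hypothetical childItems output for one folder.
child_items = [
    {"name": "2021", "type": "Folder"},
    {"name": "readme.txt", "type": "File"},
    {"name": "data.csv", "type": "File"},
]

# Equivalent of the Filter condition @equals(item().type,'File').
files = [item for item in child_items if item["type"] == "File"]
print(files)
```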


2_7 - Set _List files with value of ListFiles


This stores the content of the array ListFiles inside the variable _ListFiles; the reason is that self-referencing variables are not allowed.
An article about it is here.


2_8 - Union filter get meta plus _ListFile

This one follows 2_7 and joins the content of _ListFiles with the new files obtained through 2_4 and filtered by 2_6. It works jointly with 2_7 because self-referencing variables are not allowed. It also adds the path attribute to each item of the array.

The code used is the following:

@union(
json(replace(string(activity('2_6 - Filter only files using -tmpQUeue as source').output.Value),
        '"name":',
        concat('"path":"',variables('CurrentFolderPatch'),'","name":')
        )),
variables('_ListFiles'))
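
A rough Python equivalent of this step is sketched below. It assumes string() produces compact JSON and that union() returns the items of both arrays without duplicates; adf_string, add_path_attribute and adf_union are illustrative names, not ADF functions.

```python
import json

def adf_string(value) -> str:
    # Assumption: ADF's string() yields compact JSON (no spaces).
    return json.dumps(value, separators=(",", ":"))

def add_path_attribute(files: list, current_folder: str) -> list:
    # Insert a "path" attribute in front of every "name" attribute.
    patched = adf_string(files).replace(
        '"name":', '"path":"' + current_folder + '","name":'
    )
    return json.loads(patched)

def adf_union(first: list, second: list) -> list:
    # Keep every item from both arrays once, in order of appearance.
    result = []
    for item in first + second:
        if item not in result:
            result.append(item)
    return result

already_found = [{"path": "/landing", "name": "readme.txt", "type": "File"}]
new_files = add_path_attribute([{"name": "jan.csv", "type": "File"}], "/landing/2021")
list_files = adf_union(new_files, already_found)
print(list_files)
```

Because the replacement is purely textual, a file whose name itself contained the text "name": would be altered as well; that is a limitation of the approach rather than of this sketch.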

2_9 - Add path to file and load in _tmpList


This step adds the current folder path to the items of _tmpQueue so that a fully qualified list of folders can be added to the queue.
Unlike step 2_8, the path is written directly into the name attribute of each item.
The code is the following:

@json(
    replace(string(
            variables('_tmpQueue')
            ),
        '"name":"',
        concat('"name":"',variables('CurrentFolderPatch'),'/')
    )
)
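
In Python terms, and again assuming that string() serialises to compact JSON, the prefixing could be sketched like this (qualify_names and the sample values are hypothetical):

```python
import json

def adf_string(value) -> str:
    # Assumption: ADF's string() yields compact JSON (no spaces).
    return json.dumps(value, separators=(",", ":"))

def qualify_names(child_items: list, current_folder: str) -> list:
    # Rewrite every "name":"x" into "name":"<current folder>/x".
    patched = adf_string(child_items).replace(
        '"name":"', '"name":"' + current_folder + "/"
    )
    return json.loads(patched)

qualified = qualify_names(
    [{"name": "2021", "type": "Folder"}, {"name": "readme.txt", "type": "File"}],
    "/landing",
)
print(qualified)
```

Files are prefixed too at this point; the folder filter in the next step discards them before anything reaches the queue.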

2_10 - Filter folder


This filters the array so that it outputs only the list of folders; this is useful for adding the list of subfolders to the queue.


2_11 - Set _temp Queue with the value of Queue


Again, self-referencing variables are not permitted; therefore, we store the value of Queue inside _tmpQueue.

2_12 - Test if list subfolder is empty


This If condition checks whether the subfolder list is empty before attempting to add it to the Queue. Because we use text manipulation to insert one list into another, we need to know whether the sub-list is empty; otherwise we would insert a comma followed by null.

The code to perform this test is the following:
@empty(activity('2_10 - Filter folder').output.Value)
It leverages the 2_10 activity to check whether the filtered list of folders is empty.

If the list of subfolders is not empty, we go to activity 2_12_1 With subfolder.

2_12_1 With subfolder


This activity performs another check to verify whether parsing subfolders is authorized and, if so, adds the list of subfolders to the variable Queue using the following code:
@if(pipeline().parameters.SUBFOLDER_SEARCH,union(
    activity('2_10 - FIlter folder').output.Value,
    variables('_tmpQueue')
    ),json(''))
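
To see how the steps fit together, here is a compact Python simulation of the whole loop. It is only a sketch under stated assumptions: the TREE dictionary is a made-up stand-in for the storage account and for Get Metadata, plain list operations replace the string manipulation, and a false SUBFOLDER_SEARCH is treated as "enqueue nothing".

```python
# Hypothetical folder tree: folder path -> what childItems would return.
TREE = {
    "/landing": [
        {"name": "2021", "type": "Folder"},
        {"name": "readme.txt", "type": "File"},
    ],
    "/landing/2021": [
        {"name": "jan.csv", "type": "File"},
        {"name": "raw", "type": "Folder"},
    ],
    "/landing/2021/raw": [],
}

def list_files(initial_path: str, subfolder_search: bool = True) -> list:
    queue = [{"name": "/" + initial_path}]        # 1 - Set initial path
    files = []
    while queue:                                  # 2 - Until queue is empty
        current = queue[0]["name"]                # 2_1 - current folder path
        queue = queue[1:]                         # 2_2 + 2_3 - remove the head
        children = TREE.get(current, [])          # 2_4 + 2_5 - child items
        new_files = [                             # 2_6 + 2_8 - files with path
            {"path": current, "name": c["name"], "type": c["type"]}
            for c in children if c["type"] == "File"
        ]
        files = new_files + files                 # 2_7 + 2_8 - new files first
        subfolders = [                            # 2_9 + 2_10 - qualified folders
            {"name": current + "/" + c["name"], "type": c["type"]}
            for c in children if c["type"] == "Folder"
        ]
        if subfolders and subfolder_search:       # 2_12 + 2_12_1
            queue = subfolders + queue            # 2_11 - subfolders go in front
    return files

all_files = list_files("landing")
top_level_only = list_files("landing", subfolder_search=False)
print(all_files)
print(top_level_only)
```

Because the subfolders are placed in front of the remaining queue, the traversal goes deep before it goes wide, and with subfolder search disabled the loop ends after the initial folder.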

Conclusion


This is the end; please take the code from the repository and import it into your ADF; it will be faster than redoing all the steps.






