Tuesday, 20 February 2024

Localstack behind TLS interception

Introduction


Corporate proxies leave room for improvement in some organisations: they intercept all encrypted traffic and re-sign it with a company certificate. Any tool that does not trust that certificate rejects the connection, so most tools fail. This is the case for LocalStack (https://www.localstack.cloud/), a very useful tool for emulating cloud services such as object storage and Lambda functions on your machine.


Steps


Get the root certificate.

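First, extract your company's root certificate. On Windows, search for "Certificate" in the Start menu, or use Run and type certlm.msc (local computer certificates) or certmgr.msc (current user certificates). You will find your company's certificate under Trusted Root Certification Authorities. Export it as a Base-64 encoded X.509 (.CER) file, since the Linux trust store used in the next step expects PEM text.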
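If the export ended up as a binary (DER) file instead, it can be converted with Python's standard library. This is only a sketch: the file names in the commented example are hypothetical, and it assumes the file holds a single certificate.

```python
import ssl
from pathlib import Path


def ensure_pem(cert_bytes: bytes) -> str:
    """Return the certificate as PEM text, converting from DER if needed."""
    if b"-----BEGIN CERTIFICATE-----" in cert_bytes:
        # Already Base-64 (PEM) encoded: just decode the bytes to text.
        return cert_bytes.decode("ascii")
    # Otherwise assume binary DER and wrap it in the PEM header and footer.
    return ssl.DER_cert_to_PEM_cert(cert_bytes)


def convert_file(src: str, dst: str) -> None:
    """Read a certificate exported from Windows and write it back as PEM."""
    Path(dst).write_text(ensure_pem(Path(src).read_bytes()), encoding="ascii")


# Example with hypothetical file names:
# convert_file("company_root_der.cer", "FHVI_root.cer")
```

The conversion does not validate the certificate; it only changes the encoding.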





Create a new docker image

The first thing we need to do is build a new Docker image based on LocalStack that contains our certificate. Copy the certificate into the same directory as your Dockerfile, then create a Dockerfile with the following content (in my case, the certificate is FHVI_root.cer):


FROM localstack/localstack:latest
COPY ./FHVI_root.cer /usr/local/share/ca-certificates/cert-bundle.crt
RUN update-ca-certificates
ENV CURL_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt
ENV REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt
ENV NODE_EXTRA_CA_CERTS=/etc/ssl/certs/ca-certificates.crt


Build the image under a new name (in my case, localstackfhvi) with the following command:

 docker build -t localstackfhvi .


Compose and run the new container


Then, create a docker-compose.yml file with the following content:


version: "3.8"

services:
  localstack:
    container_name: "${LOCALSTACK_DOCKER_NAME:-localstack-main}"
    image: localstackfhvi
    ports:
      - "127.0.0.1:4566:4566"            # LocalStack Gateway
      - "127.0.0.1:4510-4559:4510-4559"  # external services port range
    environment:
      # LocalStack configuration: https://docs.localstack.cloud/references/configuration/
      - DEBUG=${DEBUG:-0}
      - SKIP_SSL_CERT_DOWNLOAD=1
      - ACTIVATE_PRO=0
    volumes:
      - "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack"
      - "/var/run/docker.sock:/var/run/docker.sock"



Now start the container from the directory that holds the docker-compose.yml file:

docker compose up


Test it with aws cli


The AWS CLI lets you connect to your new LocalStack S3 object storage. On Windows, you can install it with the installer at https://awscli.amazonaws.com/AWSCLIV2.msi or with pip: pip install awscli


Now type :

aws --endpoint-url=http://localhost:4566 kinesis list-streams

You should get an empty list as a result.

Alternatively, you can store the configuration in a profile so you don't have to type it every time.

The credentials file is stored at C:\Users\USERNAME\.aws\credentials

For LocalStack, the file will be:

[localstack]
aws_access_key_id = test
aws_secret_access_key = test


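Then there is the config file C:\Users\USERNAME\.aws\config with the following content (the endpoint_url line needs a recent AWS CLI release; with an older one, keep passing --endpoint-url=http://localhost:4566 on each command):

[profile localstack]
region = us-east-1
output = json
endpoint_url = http://localhost:4566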
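
If you would rather generate the two files than edit them by hand, a minimal sketch with Python's configparser could look like this. The function name and the temporary directory are made up for the example, and the endpoint_url key is an assumption that only holds for recent AWS CLI releases.

```python
import configparser
import tempfile
from pathlib import Path


def write_localstack_profile(aws_dir: Path, endpoint: str = "http://localhost:4566") -> None:
    """Write a 'localstack' profile into aws_dir/credentials and aws_dir/config."""
    aws_dir.mkdir(parents=True, exist_ok=True)

    credentials = configparser.ConfigParser()
    # Dummy credentials, as in the credentials file shown above.
    credentials["localstack"] = {
        "aws_access_key_id": "test",
        "aws_secret_access_key": "test",
    }
    with open(aws_dir / "credentials", "w", encoding="utf-8") as fh:
        credentials.write(fh)

    config = configparser.ConfigParser()
    config["profile localstack"] = {
        "region": "us-east-1",
        "output": "json",
        # Assumption: only recent AWS CLI releases read endpoint_url from a profile.
        "endpoint_url": endpoint,
    }
    with open(aws_dir / "config", "w", encoding="utf-8") as fh:
        config.write(fh)


# Demo in a throwaway directory, so a real .aws folder is never touched.
demo_dir = Path(tempfile.mkdtemp()) / ".aws"
write_localstack_profile(demo_dir)
print((demo_dir / "config").read_text())
```

The function overwrites both files, so do not point it at an existing .aws folder that already contains other profiles.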


Now let's create a bucket named "test" with the AWS CLI:

aws s3 mb s3://test --profile localstack

Use the following command to list your buckets:

aws s3 ls --profile localstack


Conclusion



A modern data stack has dependencies and auto-updates at its heart. Consequently, some deeply nested sub-processes will try to download new libraries and fail because of a certificate error.

This is where the cloud shines: it offers an industrialised platform with a standardised configuration, so time can be spent on valuable tasks.

Working on any modern data stack behind a corporate proxy with TLS interception is a long, drawn-out process. The extra time required is very difficult to justify to non-technical or management staff, so be wary of its impact on how your performance is perceived.

Wednesday, 7 February 2024

Operational analytics data platform: what's right for your organisation?

Introduction 


Business intelligence platforms traditionally serve strategic decisions, but today's requirements lean towards operational analytics. For example, bed planning in a hospital is mostly based on surgery planning, so a dashboard with fresh updates highlighting the number of extra external nurses required is valuable.


These operational business intelligence platforms have one or more of the following in common:


  • More frequent data refreshes than the traditional nightly batch

  • Data exposed to a wider array of tools than the traditional corporate business intelligence tools (Tableau, Power BI, ...)

  • A need to take action based on the analysis (if sensor data shows something critical, do something)

  • Horizontal scaling: the ability to add compute and storage resources over time to handle additional workload


These requirements drive different types of architecture. The following sections cover the key factors that most strongly influence the architecture of an operational analytics platform; there are, of course, many more factors than these. A lambda architecture can also be added to a lakehouse approach to satisfy both operational analytics and operational scenarios, but that will be for another post.


These are only key points; hire a data architect before proceeding with an architecture. 


Let’s recap how business intelligence architecture has evolved over time.


Early adopters: storage and compute mixed


Business intelligence traditionally relies on relational databases to store, transform, and present data. Relational databases have evolved and can now handle both structured and unstructured data very well (just look at the latest PostgreSQL version: its JSON capabilities are impressive). They also deal very well with binary data and files, and their engines are very efficient at computing row-oriented datasets. The term relational database should not be confused with SQL engine: SQL is a language, whereas relational databases (Microsoft SQL Server, PostgreSQL, Oracle, MySQL) include both the SQL engine and the storage. They are optimised for OLTP workloads and not yet for analytical workloads (I separate them from MPP engines, which are indeed good at OLAP). Also, every time you need to export a large analytical dataset, an RDBMS uses an inefficient ODBC/JDBC interface that serialises and deserialises row-oriented datasets (for now; the adoption of Apache Arrow is changing that).


The early stage of a new platform


Two decades ago came the big data “revolution”. It was the T-14 Armata tank of the data world: completely inefficient for actual corporations. Why? It solved a problem that didn’t exist in most companies. Yes, it’s amazing for dealing with a supercollider-sized dataset, but otherwise it’s expensive overkill.


The idea was to separate the compute from the storage of the datastore. Compute started its journey with MapReduce, and storage with an early distributed filesystem, the Hadoop Distributed File System (HDFS), sacrificing latency for raw throughput, redundancy, and scalability. The concept was interesting. Adoption of anchor-style modelling approaches (e.g. Data Vault) also improved. This kind of modelling emphasises a modular approach and the use of business keys instead of surrogate keys, which avoids the large lookups and heavy use of joins that only relational databases were capable of at the time.



The young adult stage


Then some common sense arose, and instead of an overly complicated computing engine, the likes of Spark/Databricks appeared. Let’s use the cheap, abundant RAM of modern hardware instead of a complex integrated engine.


The concept is simple: spend CPU and RAM to get fast and simple computation. Next came the wide adoption and integration of the Pandas framework. It allows impressive code simplification, which reduces labour costs and eases ongoing maintenance. SQL is very, very bad at metadata-oriented transformations. Let’s say you have a dataset coming from CDC (change data capture): for each row, you have the previous and the new values, and you need to compare the first and second rows of each group for every non-PK column. In SQL this takes a whopping amount of code (more than 5000 lines if you don’t want to crash your tempdb) because, short of tricks, SQL forces you to name the columns. With Pandas, it takes 9 lines of code, and maintenance cost scales with the amount of code.
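
To make the column-agnostic idea concrete, here is a dependency-free sketch in plain Python. It is not the Pandas code mentioned above, only an illustration of the same principle; the sample rows and column names are invented, and it assumes exactly two rows per key, with the previous values first.

```python
from collections import defaultdict

# Hypothetical CDC extract: for each primary key, the previous row then the new row.
cdc_rows = [
    {"id": 1, "name": "Ann", "city": "Bern"},
    {"id": 1, "name": "Ann", "city": "Basel"},
    {"id": 2, "name": "Bob", "city": "Geneva"},
    {"id": 2, "name": "Bobby", "city": "Geneva"},
]


def changed_columns(rows, pk="id"):
    """Map each key to the non-PK columns whose value differs between its two rows."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[pk]].append(row)
    result = {}
    for key, (before, after) in groups.items():
        # Columns are discovered from the data, never named in the code.
        result[key] = [col for col in before if col != pk and before[col] != after[col]]
    return result


print(changed_columns(cdc_rows))
```

Adding a column to the source requires no change to the function, which is the point made above about metadata-oriented transformations.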


In Switzerland, a day of consultant labour costs above 1000 CHF/EUR/USD, and 32 GB of RAM costs about 100 CHF/EUR/USD. Trading hardware for labour became a no-brainer for small and medium-sized datasets (any dataset whose biggest table is under 1 TB).


Storage started to mature with online object storage: AWS with its S3 protocol, and Azure with ADLS Gen2. Immutable binary data files (Parquet) became a standard.


Tools that generate modular models gained popularity (WhereScape, BIML, etc.) and improved compute usage without long, repetitive code.



Finally, a mature stage? 


Data lakes keep maturing. Databricks open-sourced its Delta Lake storage format, which adds a transaction log to “update” data in the data lake. S3 fixed its issues with concurrent writes. So we are now in a situation where we can scale storage and compute and manage a "real-time" data stream within a new concept named the lakehouse. A lakehouse is simply a data lake that can update data instead of only appending to it.


Distributed compute is maturing and efficient memory management is coming into play, hence the introduction of Pandas 2.x with Apache Arrow, although it has yet to be battle-tested. Single-node compute solutions such as Polars and DuckDB simplify the architecture for medium-sized datasets and offer very cost-effective options. However, they are still very new to the landscape.



What does it look like? A compute engine pulls and transforms data from either a batch or a stream into object storage. Tables are stored in the Delta Lake format (Parquet files + transaction files + metadata files). Instead of a monolithic platform such as a relational database, we have a three-tier approach:


  • A compute engine

  • An object storage

  • A query engine (optional )



How to choose if a Lakehouse approach is right for you?


First and foremost, assess the business needs and weigh each requirement's actual value. Second, do you have the skills in your team to modernise your data platform? If the cost of acquiring them (training, consulting, hiring) is higher than the expected return on investment, don’t do it.


Company culture is also very important; all companies advertise their ability to change, but only a few are good at it. Modern data platforms involve a lot of changes in how we think about the data, especially in how we model them. Modelling for flexibility is very different. It requires a lot of training. If your data team is not ready for it, don’t do it.


Below is a decision tree that serves as a guideline for an operational analytics architecture. Blue boxes are business requirements you would like to fulfil, purple boxes are required skills, and green boxes are decisions.








Conclusion


The data landscape is an exciting field to work in these days, with a lot of possibilities. As new technologies arise, we need to adapt how we model, how we code, and how we design our platforms, so it's a never-ending cycle of change. Enjoy!

Tuesday, 15 February 2022

ADF Pipeline Beginner series : Recursive parsing of folder

This is a series of tiny articles about features that are lacking in Azure Data Factory (ADF) and how to overcome them.


A few limitations of ADF pipelines make it hard to recursively parse the files in a folder.

This article uses previously documented techniques to overcome these limitations.

All the support files can be found in my GitHub repository ADF_showcase


Top-level: Overview


On the first level, we will leverage the input parameters to set an initial path, then we will iterate on all the subfolders beneath it.
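
Before going into the details, the overall logic can be sketched in Python. This is only an illustration, not the pipeline itself: the FAKE_STORAGE dictionary is a made-up stand-in for the Get Metadata activity, and the step numbers in the comments refer to the activities described in the rest of this article.

```python
# Hypothetical folder tree: each folder path maps to its child items.
FAKE_STORAGE = {
    "/data": [{"name": "2022", "type": "Folder"}, {"name": "readme.txt", "type": "File"}],
    "/data/2022": [{"name": "jan.csv", "type": "File"}, {"name": "raw", "type": "Folder"}],
    "/data/2022/raw": [{"name": "dump.bin", "type": "File"}],
}


def list_files(initial_path, subfolder_search=True):
    """Walk the tree with a queue instead of recursion."""
    queue = [{"name": "/" + initial_path}]            # 1 - Set initial path
    files = []
    while True:                                        # 2 - Until (runs at least once)
        current = queue[0]["name"]                     # 2_1 - current folder path
        queue = queue[1:]                              # 2_3 - drop the first item
        children = FAKE_STORAGE.get(current, [])       # 2_4 - Get Metadata
        files += [{"path": current, "name": c["name"]}
                  for c in children if c["type"] == "File"]       # 2_6 to 2_8
        folders = [{"name": current + "/" + c["name"]}
                   for c in children if c["type"] == "Folder"]    # 2_9 and 2_10
        if folders and subfolder_search:               # 2_12 and 2_12_1
            queue = folders + queue
        if not queue:                                  # @empty(variables('Queue'))
            break
    return files


for entry in list_files("data"):
    print(entry["path"], entry["name"])
```

Each pass handles one folder and may push its subfolders onto the queue, which is how a single loop replaces recursion.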


Deep dive into details

1- Set initial path 


The first activity sets the variable to iterate over. The code is the following:
@json(concat('[{"name":"/',pipeline().parameters.INITIAL_PATH,'"}]'))

An explanation of the logic can be found in this post.

2 - Until queue is empty

This part creates the recursive looping behaviour; the explanation is in my post here.




2_1 - Set Current Folder path


The first task is to find the item of the queue we want to iterate into.
We achieve this with the first activity, using the code
@first(variables('Queue')).name

2_2 - Set _tmpQUeue with Queue


Combined with 2_3 - Set Queue, this logic lets us work around a limitation of ADF pipelines (self-referencing variables are not allowed) and add values to or remove values from the Queue variable.
The idea is to store the variable in another one (prefixed with _) before using it.

2_3 - Set Queue 

Now that we have started iterating over the first element of the queue, we need to remove it so we don't iterate over it twice. Therefore, we set the Queue variable with this code.

@if(greater(length(variables('_tmpQueue')),1),
    json(
        replace(
            string(variables('_tmpQueue')),
            concat(string(first(variables('_tmpQueue'))),','),
            ''
        )
    ),
    json(
        replace(
            string(variables('_tmpQueue')),
            string(first(variables('_tmpQueue'))),
            ''
        )
    )
)
   
We can't directly remove a value from an array; my post here explains how and why.

2_4 - Get Metadata


This activity looks into the hierarchy of subfolders and files for a given path and exports it through the activity's output value.

The critical bit is to select Child Items in the list of Arguments.

2_5 - Store output of the meta in _tmpQueue


We now store the list of subfolders and files from the previous activity into the _tmpQueue using this code :

@activity('2_4 - Get Metadata').output.childItems


2_6 - Filter only files using -tmpQUeue as source

We use this activity to separate the files from the folders; its output is the list of files.

It parses the array (_tmpQueue) and, for each item (keyword item()), uses the type attribute to keep only the entries of type 'File':

@equals(item().type,'File')


2_7 - Set _List files with value of ListFiles


This stores the content of the array ListFiles inside the variable _ListFiles, because self-referencing variables are not allowed.
There is an article about it here.


2_8 - Union filter get meta plus _ListFile

This one follows 2_7: it joins the content of _ListFiles with the new files obtained through 2_4 and filtered by 2_6. It works jointly with 2_7 because self-referencing variables are not allowed. It also adds a path attribute to each item of the array.

The code used is the following :

@union(
json(replace(string(activity('2_6 - Filter only files using -tmpQUeue as source').output.Value),
        '"name":',
        concat('"path":"',variables('CurrentFolderPatch'),'","name":')
        )),
variables('_ListFiles'))
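
For readers who prefer to see the trick outside ADF, here is a rough Python equivalent of the expression above. It assumes that string() produces compact JSON and that union() keeps a single copy of each distinct item; the helper names and sample data are made up.

```python
import json


def add_path(items, folder):
    """Inject a "path" attribute in front of every "name" attribute via text replacement."""
    text = json.dumps(items, separators=(",", ":"))        # stands in for string(...)
    text = text.replace('"name":', '"path":' + json.dumps(folder) + ',"name":')
    return json.loads(text)                                # stands in for json(...)


def union(first, second):
    """Combine two lists, keeping one copy of each distinct item."""
    result = []
    for item in first + second:
        if item not in result:
            result.append(item)
    return result


new_files = [{"name": "jan.csv", "type": "File"}]
known_files = [{"path": "/data", "name": "readme.txt", "type": "File"}]
print(union(add_path(new_files, "/data/2022"), known_files))
```

The replacement works on the key text "name": rather than on parsed objects, which is why the array has to be converted to a string and back.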

2_9 - Add path to file and load in _tmpList


This step adds the path to _tmpList so that a fully qualified list of folders can be added to the queue.
As opposed to step 2_8, here the path is added directly into the name attribute of each item.
The code is the following:

@json(
    replace(string(
            variables('_tmpQueue')
            ),
        '"name":"',
        concat('"name":"',variables('CurrentFolderPatch'),'/')
    )
)

2_10 - Filter folder


This filters the array so that it outputs only the list of folders, which is useful for adding the list of subfolders to the queue.


2_11 - Set _temp Queue with the value of Queue


Again, self-referencing variables are not permitted; therefore, we store the value of Queue inside _tmpQueue.

2_12 - Test if list subfolder is empty


This If condition verifies whether the subfolder list is empty before attempting to add it to the Queue. Because we use text manipulation to insert one list into another, we need to know whether the sub-list is empty; otherwise we would insert a comma followed by null.

The code to perform this test is the following:
 @empty(activity('2_10 - Filter folder').output.Value)
It leverages the 2_10 activity to check whether the filtered list of folders is empty.

If the list of subfolders is not empty, we go to activity 2_12_1 With subfolder.

2_12_1 With subfolder


This activity performs another check to verify whether parsing subfolders is authorised and, if so, adds the list of subfolders to the Queue variable using the following code:
@if(pipeline().parameters.SUBFOLDER_SEARCH,union(
    activity('2_10 - Filter folder').output.Value,
    variables('_tmpQueue')
    ),json(''))

Conclusion


This is the end. Please use the code in the repository and import it into your ADF; it will be faster than redoing all the steps.







Sunday, 6 February 2022

ADF Pipeline Beginner series : How to create nested loop

This is a series of tiny articles about features that are lacking in Azure Data Factory (ADF) and how to overcome them.


As of today (January 2022), ADF pipelines don't allow a loop inside a loop. This limitation can be troublesome if, for example, you want to act on files in a nested hierarchy of subfolders.


The idea is to use an array variable, because a variable can be refreshed and a parameter cannot. Also, a "ForEach" loop takes its input collection and stores it without refreshing it during the iteration, whereas Until reloads the variable before evaluating its expression.

Warning: ADF pipelines call it an Until loop, but it's actually a do-until loop: it iterates at least once regardless of the condition. The condition only decides whether the second to nth iterations start.


In this example, a preliminary activity sets the array variable, then the "Until" activity loops through it until there are no more items to go through.

See my article "ADF Pipeline beginner series: Add a list of items to an array variable" to learn how to add items to an array; my article "ADF Pipeline Beginner series: Remove first item of an Array" explains how to remove an item from the list.



With the first activity, we set an array; for example, this one is a list of paths to explore:

@json(concat('[{"name":"/',pipeline().parameters.INITIAL_PATH,'"}]'))

The Until activity keeps iterating until its Expression/Condition evaluates to true (the first iteration always runs, whatever the condition).
The code is the following:
@empty(variables('Queue'))
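
The do-until behaviour can be illustrated with a small Python sketch. The run_until and visit functions are hypothetical stand-ins for the Until activity and its inner activities, not ADF code.

```python
def run_until(queue, process):
    """Do-until loop: the body runs first, then the exit condition is checked."""
    iterations = 0
    while True:
        iterations += 1
        if queue:
            # The body may append new items; they are seen on the next pass.
            process(queue.pop(0), queue)
        if not queue:      # same role as @empty(variables('Queue'))
            break
    return iterations


def visit(item, queue):
    """Hypothetical body: visiting "/a" discovers the subfolder "/a/b"."""
    if item == "/a":
        queue.append("/a/b")


print(run_until(["/a"], visit))   # two passes: "/a", then "/a/b"
print(run_until([], visit))       # one pass even though the queue starts empty
```

Because the condition is re-evaluated after every pass, items added inside the body are picked up, which a loop over a fixed snapshot of the collection would miss.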



Inside the loop, the first activity removes the current item (see my post ADF Pipeline Beginner series: Remove first item of an Array), and the last one adds the items to be looped through (see my post ADF Pipeline beginner series : Add list of items to an array variable).



ADF Pipeline Beginner series : Remove first item of an Array

This is a series of tiny articles about features that are lacking in Azure Data Factory (ADF) and how to overcome them.



As of today (January 2022), ADF pipelines don't have a native function to remove one item from an array. This prevents us from using an array as a stack; however, it's possible to overcome this limitation with a bit of code. The key is to convert the array to a string and edit the text manually. Two cases occur. In the first, the array has only one item, so there is no separator between items to deal with. In the second, the array has more than one item, and we need to deal with the separator.

In this example, we use the _tmpQueue variable as a register variable (explained in my article "ADF Pipeline beginner series: Add a list of items to an array variable").



@if(greater(length(variables('_tmpQueue')),1),
    json(
        replace(
            string(variables('_tmpQueue')),
            concat(string(first(variables('_tmpQueue'))),','),
            ''
        )
    ),
    json(
        replace(
            string(variables('_tmpQueue')),
            string(first(variables('_tmpQueue'))),
            ''
        )
    )
)
    

First, we extract the JSON of the first item of the array using first(variables('_tmpQueue')),
then we convert it into a string so we can remove that part from the whole array (which is also converted to a string to perform the text replacement).

The first line of the code, @if(greater(length(variables('_tmpQueue')),1), tests whether the array has more than one item and uses this information to deal with the comma separator.
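
The same logic can be tried out in Python. This is a sketch that assumes string() produces compact JSON and that the array is not empty; to_text and remove_first are made-up helper names.

```python
import json


def to_text(value):
    """Serialise compactly, standing in for ADF's string() function."""
    return json.dumps(value, separators=(",", ":"))


def remove_first(queue):
    """Drop the first item of a list by editing its JSON text, as the expression does."""
    text = to_text(queue)
    first = to_text(queue[0])
    if len(queue) > 1:
        # More than one item: remove the item together with the comma that follows it.
        text = text.replace(first + ",", "")
    else:
        # Single item: there is no separator, so remove the item alone.
        text = text.replace(first, "")
    return json.loads(text)


print(remove_first([{"name": "/a"}, {"name": "/b"}]))
print(remove_first([{"name": "/a"}]))
```

One caveat of this approach: the text replacement removes every occurrence, so any later entry identical to the first item disappears as well.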


ADF Pipeline beginner series : Add list of items to an array variable

This is a series of tiny articles about features that are lacking in Azure Data Factory (ADF) and how to overcome them.

This follows this post.

As of today (28/01/2022), Azure Data Factory doesn't allow the Append variable activity to add a list of items to an array variable. The Append variable activity states that it can add an item to a variable of type array; however, the two obvious ways to use it for a list are dead ends.

First, if you try to pass a list of items, only the first one will be appended to the list.

Second, if you try to append an array directly, you will get an error.





Therefore, to achieve the desired outcome, we can leverage two variables. The first one can be Variable1 and the second one _Variable1, and we will use the underscore one like a register.

In this example, we are looking to add file names to an existing list of files.

So the first step is to store the current value of ListFiles into _ListFiles.


We use a Set variable activity to achieve this.

The second step is to join both lists into the array. Since the Append variable activity can't do this, we could edit the text and re-cast it as an object.

A simpler way is to use the union function, like this:

 @union(variables('_ListFiles'),activity('2_6 - Filter only files using -tmpQUeue as source').output.Value)

This way, we combine the existing data of ListFiles (which was stored temporarily in _ListFiles) and the output of one activity (2_6 in this case).