data_validation_framework.task

Specific tasks.

Functions

SkippableMixin([default_value])

Create a mixin class to add a skip parameter.

spec_report(task)

Hook to create a specification report.

success_summary(task)

Hook to log a summary report of the task.

Classes

BaseValidationTask(*args, **kwargs)

Base luigi task used for validation steps.

ElementValidationTask(*args, **kwargs)

A class to validate each element of a data set without considering the other elements.

InputParameters(col_mapping, **kwargs)

A helper to build task inputs.

SetValidationTask(*args, **kwargs)

A class to validate an entire data set (useful for global properties).

TagResultOutputMixin(*args, **kwargs)

Initialize target prefixes and optionally add a tag to the result directory.

ValidationWorkflow(*args, **kwargs)

Class to define and process a validation workflow.

Exceptions

ValidationError

Exception raised if the validation could not be performed properly.

class data_validation_framework.task.BaseValidationTask(*args, **kwargs)

Bases: LogTargetMixin, RerunMixin, TagResultOutputMixin, Task

Base luigi task used for validation steps.

Warning

Only use children of this class, such as ElementValidationTask or SetValidationTask.

To provide a validation function, use it as:

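import luigi

# ChildBaseValidationTask stands for ElementValidationTask or SetValidationTask;
# my_validation_function and PreviousTask are placeholders for your own objects.
class Task(ChildBaseValidationTask):
    data_dir = luigi.Parameter(default='my_data')
    validation_function = staticmethod(my_validation_function)
    output_columns = {'my_new_col': None}

    def inputs(self):
        # Map the "input_data" column of PreviousTask's report to "input_data" here.
        return {PreviousTask: {"input_data": "input_data"}}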

Note

The dataset_df and result_path attributes are propagated to the required tasks when those tasks have None as the value of these attributes.

args()

List of additional arguments to provide to validation_function().

This method can be overridden to pass these arguments to the validation_function() and to the pre_process() and post_process() methods.

static check_inputs(inputs)

Check a given dict of inputs.

extra_input()

Targets of the tasks given to extra_requires().

extra_requires()

Requirements that should not be considered as validation tasks.

static filter_columns(all_dfs, mappings)

Filter the columns of the given DataFrames according to a given mapping.
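A minimal pandas sketch of the idea, assuming each mapping has the {"<input_column_name>": "<current_column_name>"} shape described under inputs() and that both arguments are dicts keyed by task. The helper filter_columns_sketch is hypothetical and is not the library's implementation.

```python
import pandas as pd


def filter_columns_sketch(all_dfs, mappings):
    """Keep only the mapped columns of each DataFrame and rename them.

    Hypothetical helper: ``all_dfs`` and ``mappings`` are dicts keyed by task,
    and each mapping is assumed to be {input_column_name: current_column_name}.
    """
    filtered = {}
    for task_name, df in all_dfs.items():
        mapping = mappings[task_name]
        # Select the requested columns, then rename them for the current task.
        filtered[task_name] = df[list(mapping)].rename(columns=mapping)
    return filtered


reports = {"PreviousTask": pd.DataFrame({"input_data": [1, 2], "other": [3, 4]})}
result = filter_columns_sketch(reports, {"PreviousTask": {"input_data": "my_data"}})
print(result["PreviousTask"].columns.tolist())  # ['my_data']
```

Columns that are not listed in a mapping are dropped, so the current task only sees what it explicitly asked for.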

gather_inputs = False

If True, gather the inputs into the pandas.DataFrame given to validation_function().

Type:

bool
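One plausible reading of "gathering" is a left join of the (already column-filtered) input reports onto the dataset by index. The sketch below assumes that every report shares the dataset index and that column names do not clash; gather_inputs_sketch is a hypothetical helper, not the framework's code.

```python
import pandas as pd


def gather_inputs_sketch(dataset_df, input_reports):
    """Join the (already filtered) input reports to the dataset on its index.

    Hypothetical helper: assumes every report is indexed like the dataset and
    that column names do not clash once the column mappings have been applied.
    """
    gathered = dataset_df.copy()
    for report in input_reports:
        # A left join keeps every dataset row, even if a report lacks it.
        gathered = gathered.join(report, how="left")
    return gathered


dataset = pd.DataFrame({"path": ["a.h5", "b.h5"]}, index=["a", "b"])
report = pd.DataFrame({"n_points": [10, 20]}, index=["a", "b"])
print(gather_inputs_sketch(dataset, [report]).columns.tolist())  # ['path', 'n_points']
```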

inputs()

Information about required input data.

This method can be overridden and should return a dict of one of the following forms:

{<task_name>: {"<input_column_name>": "<current_column_name>"}}
{
    <task_name>: (
        {"<input_column_name>": "<current_column_name>"},
        {
            "<kwarg_1>": "<value_1>",
            "<kwarg_2>": "<value_2>",
        }
    )
}
{
    <task_name>: InputParameters(
        {"<input_column_name>": "<current_column_name>"},
        kwarg_1="<value_1>",
        kwarg_2="<value_2>",
    )
}
where:
  • <task_name> is the required task class;

  • <input_column_name> is the name of the column needed from the report of <task_name>;

  • <current_column_name> is the name given to that same column in the current task;

  • <kwarg_*> is the name of a keyword argument passed to the constructor of the required task.
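The three forms carry the same information: a column mapping plus optional constructor keyword arguments. As a sketch of that equivalence, the hypothetical normalize_input function below reduces any form to a (col_mapping, kwargs) pair. InputParametersStandIn is a hypothetical stand-in that only mirrors the documented InputParameters(col_mapping, **kwargs) signature, so the example runs without the library.

```python
class InputParametersStandIn:
    """Hypothetical stand-in mirroring InputParameters(col_mapping, **kwargs)."""

    def __init__(self, col_mapping, **kwargs):
        self.col_mapping = col_mapping
        self.kwargs = kwargs


def normalize_input(value):
    """Return (col_mapping, kwargs) for any of the three documented forms."""
    if isinstance(value, InputParametersStandIn):
        return value.col_mapping, value.kwargs
    if isinstance(value, tuple):
        col_mapping, kwargs = value
        return col_mapping, dict(kwargs)
    if isinstance(value, dict):
        # Plain mapping: no extra constructor arguments.
        return value, {}
    raise TypeError(f"Unsupported input specification: {value!r}")


forms = [
    {"input_data": "input_data"},
    ({"input_data": "input_data"}, {"kwarg_1": "value_1"}),
    InputParametersStandIn({"input_data": "input_data"}, kwarg_1="value_1"),
]
for form in forms:
    print(normalize_input(form))
```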

kwargs()

Dict of additional keyword arguments to provide to validation_function().

This method can be overridden to pass these keyword arguments to the validation_function() and to the pre_process() and post_process() methods.
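The following plain-Python sketch shows the calling convention that args() and kwargs() imply, assuming the extra values are appended after the two leading arguments (the DataFrame or row, and output_path) that the validation function always receives. ForwardingSketch and call_validation are hypothetical names used only for illustration.

```python
class ForwardingSketch:
    """Hypothetical stand-in showing how args() and kwargs() could be forwarded."""

    @staticmethod
    def validation_function(df, output_path, *args, **kwargs):
        # Echo what was received so the call convention is visible.
        return {"args": args, "kwargs": kwargs}

    def args(self):
        return [0.5]

    def kwargs(self):
        return {"mode": "strict"}

    def call_validation(self, df, output_path):
        # The data and output path come first, then the extra arguments.
        return self.validation_function(df, output_path, *self.args(), **self.kwargs())


print(ForwardingSketch().call_validation(df=None, output_path="out"))
# {'args': (0.5,), 'kwargs': {'mode': 'strict'}}
```

Overriding only args() and kwargs() keeps the validation function itself free of any luigi-specific code.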

nb_total = None

Total number of processed elements.

Type:

int

nb_valid = None

Total number of valid elements.

Type:

int
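Assuming the two counters are derived from the is_valid column of the results DataFrame (an assumption; this page only states what they count), they can be computed as:

```python
import pandas as pd

# Hypothetical results of a validation step.
results = pd.DataFrame({"is_valid": [True, False, True]})
nb_total = len(results)  # every processed element
nb_valid = int(results["is_valid"].sum())  # True counts as 1
print(f"{nb_valid} / {nb_total} elements are valid")  # 2 / 3 elements are valid
```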

output()

The targets of the current task.

output_columns = None

A dict whose keys are the names of the new columns created by the current task and whose values are the (usually empty) values used to initialize these columns.

Type:

dict
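A minimal pandas sketch of what such a dict describes, assuming each new column is simply created and filled with its initial value before the validation function runs:

```python
import pandas as pd

output_columns = {"my_new_col": None}  # same shape as in the class-level example
df = pd.DataFrame({"is_valid": [True, True]})
for col_name, initial_value in output_columns.items():
    # Create each new column, filled with its (empty) initial value.
    df[col_name] = initial_value
print(df.columns.tolist())  # ['is_valid', 'my_new_col']
```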

post_process(df, args, kwargs)

Method executed after applying validation_function().

pre_process(df, args, kwargs)

Method executed before applying validation_function().

processed_inputs()

Process the inputs to automatically propagate the values from the workflow.

read_dataset()

Import the dataset to a pandas.DataFrame.

This method can be overridden to load custom data (e.g. a geopandas.GeoDataFrame). The dataset should always be loaded from the path given by self.dataset_df.

requires()

Process the inputs to generate the requirements.

results = None

The results of the validation_function().

Type:

pandas.DataFrame

run()

The main process of the current task.

property task_name

The name of the task.

transform_index(df)

Method executed after loading the dataset to transform its index.

Note

This transformation is applied to both the dataset and the input reports.
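As an illustration of why the same transformation must apply to both sides, the hypothetical override below normalizes string labels so that dataset rows and report rows line up. It is written as a free function for brevity, and returning a new DataFrame is an assumption about the expected return convention.

```python
import pandas as pd


def transform_index(df):
    """Hypothetical override: normalise string labels so dataset and reports match."""
    df = df.copy()
    df.index = df.index.astype(str).str.strip().str.lower()
    return df


dataset = pd.DataFrame({"x": [1, 2]}, index=[" Cell_A", "cell_b "])
report = pd.DataFrame({"y": [3, 4]}, index=["CELL_A", "Cell_B"])
# Applying the same transform to both lets their rows be aligned by index.
joined = transform_index(dataset).join(transform_index(report))
print(joined.to_dict("index"))
```

Had only the dataset been transformed, the join would find no matching labels and the report columns would be filled with NaN.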

static validation_function(*args, **kwargs)

The validation function to apply to the current data set.

class data_validation_framework.task.ElementValidationTask(*args, **kwargs)

Bases: BaseValidationTask

A class to validate each element of a data set without considering the other elements.

The validation_function will receive the row and the output_path as first arguments.
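A sketch of an element-level check, showing that each row is evaluated on its own. What the framework expects the function to return is not documented on this page, so this hypothetical check_positive_radius simply returns a bool for illustration, and the "radius" column is an assumed example.

```python
from pathlib import Path

import pandas as pd


def check_positive_radius(row, output_path):
    """Hypothetical element check: only looks at the current row.

    ``output_path`` is accepted to match the documented call convention but unused here.
    """
    return bool(row["radius"] > 0)


df = pd.DataFrame({"radius": [1.5, -2.0, 0.3]}, index=["a", "b", "c"])
# Each row is checked on its own; no other row influences the result.
is_valid = {idx: check_positive_radius(row, Path("out")) for idx, row in df.iterrows()}
print(is_valid)  # {'a': True, 'b': False, 'c': True}
```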

class data_validation_framework.task.InputParameters(col_mapping, **kwargs)

Bases: object

A helper to build task inputs.

This class should only be used to build the input dictionaries:

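import luigi

from data_validation_framework.task import InputParameters

# ChildBaseValidationTask stands for ElementValidationTask or SetValidationTask;
# my_validation_function and PreviousTask are placeholders for your own objects.
class Task(ChildBaseValidationTask):
    data_dir = luigi.Parameter(default='my_data')
    validation_function = staticmethod(my_validation_function)
    output_columns = {'my_new_col': None}

    def inputs(self):
        return {
            PreviousTask: InputParameters(
                {"input_data": "input_data"},  # column mapping
                kwarg_1="value_1",  # passed to PreviousTask's constructor
                kwarg_2="value_2",
            )
        }
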
Parameters:

col_mapping (dict) – The column mapping.

Keyword Arguments:

kwargs – All the keyword arguments passed to the constructor.

Warning

The keyword arguments will always override the arguments from the config file.
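The precedence rule can be pictured with a plain dict merge, where later keys win. This is only an analogy: the values below are hypothetical, and how the library actually resolves luigi parameters against the config file is not shown here.

```python
config_values = {"kwarg_1": "from_config", "kwarg_2": "from_config"}
explicit_kwargs = {"kwarg_1": "value_1"}  # as given to InputParameters(...)
# Later keys win, so the explicit keyword arguments take precedence.
effective = {**config_values, **explicit_kwargs}
print(effective)  # {'kwarg_1': 'value_1', 'kwarg_2': 'from_config'}
```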

class data_validation_framework.task.SetValidationTask(*args, **kwargs)

Bases: BaseValidationTask

A class to validate an entire data set (useful for global properties).

The validation_function will receive the DataFrame and the output_path as first arguments.

Note

The given pandas.DataFrame will always have the columns ["is_valid", "ret_code", "comment", "exception"]. The validation_function should at least update the values for the is_valid column.
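A sketch of a set-level check for a global property (name uniqueness), which cannot be decided row by row. It assumes the function may update the given DataFrame in place; it also returns it for convenience. The check_unique_names function and the "name" column are hypothetical.

```python
import pandas as pd


def check_unique_names(df, output_path):
    """Hypothetical set-level check: a row is invalid if its name is duplicated."""
    # keep=False flags every occurrence of a duplicated name, not just the repeats.
    df["is_valid"] = ~df["name"].duplicated(keep=False)
    df.loc[~df["is_valid"], "comment"] = "Duplicated name"
    return df


df = pd.DataFrame(
    {
        "name": ["a", "b", "a"],
        "is_valid": True,
        "ret_code": 0,
        "comment": None,
        "exception": None,
    }
)
print(check_unique_names(df, "out")["is_valid"].tolist())  # [False, True, False]
```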

data_validation_framework.task.SkippableMixin(default_value=False)

Create a mixin class to add a skip parameter.

This mixin must be applied to a data_validation_framework.ElementValidationTask. It creates a skip parameter and wraps the validation function so that the function is skipped when skip is set to True. When skipped, the is_valid values are kept as they are and a specific comment is added to inform the user.

Parameters:

default_value (bool) – The default value for the skip argument.
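The wrapping behavior described above can be sketched as a plain decorator. In this sketch skip is an ordinary keyword argument rather than a luigi parameter, the dict return value is hypothetical, and so is the comment wording; only the "keep is_valid, add a comment" behavior comes from the description.

```python
import functools

SKIP_COMMENT = "Skipped by user."  # hypothetical wording


def skippable(validation_function):
    """Wrap a row-level validation function so it can be bypassed."""

    @functools.wraps(validation_function)
    def wrapper(row, output_path, *args, skip=False, **kwargs):
        if skip:
            # Leave is_valid untouched and only record why nothing was checked.
            return {"is_valid": row["is_valid"], "comment": SKIP_COMMENT}
        return validation_function(row, output_path, *args, **kwargs)

    return wrapper


@skippable
def always_fail(row, output_path):
    return {"is_valid": False, "comment": "Failed"}


row = {"is_valid": True}
print(always_fail(row, "out", skip=True))  # {'is_valid': True, 'comment': 'Skipped by user.'}
print(always_fail(row, "out"))  # {'is_valid': False, 'comment': 'Failed'}
```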

class data_validation_framework.task.TagResultOutputMixin(*args, **kwargs)

Bases: object

Initialize target prefixes and optionally add a tag to the result directory.

Warning

If this mixin is used alongside luigi_tools.task.RerunMixin, its behavior depends on their order in the base classes:

  • if placed to the right of RerunMixin, a new tag is created, so the rerun has no effect;

  • if placed to the left of RerunMixin, the targets from the tagged directory without conflict are removed, then a new tagged directory with conflict is created.
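The order matters because Python resolves cooperating mixins from left to right along the method resolution order. The stand-in classes below are hypothetical and only record the order in which they run; the real mixins act on target creation and removal rather than on a run() method.

```python
class RerunLike:
    def run(self):
        return ["rerun"] + super().run()


class TagLike:
    def run(self):
        return ["tag"] + super().run()


class Base:
    def run(self):
        return ["base"]


class TagLeft(TagLike, RerunLike, Base):
    pass


class TagRight(RerunLike, TagLike, Base):
    pass


print(TagLeft().run())  # ['tag', 'rerun', 'base']
print(TagRight().run())  # ['rerun', 'tag', 'base']
```

BaseValidationTask lists RerunMixin before TagResultOutputMixin, i.e. the tag mixin sits on the right.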

target_cls

alias of TaggedOutputLocalTarget

exception data_validation_framework.task.ValidationError

Bases: Exception

Exception raised if the validation could not be performed properly.

class data_validation_framework.task.ValidationWorkflow(*args, **kwargs)

Bases: SetValidationTask, WrapperTask

Class to define and process a validation workflow.

gather_inputs = True

If True, gather the inputs into the pandas.DataFrame given to validation_function().

Type:

bool

static validation_function(*args, **kwargs)

The validation function to apply to the current data set.

This method should usually do nothing for ValidationWorkflow as this class is only supposed to gather validation steps.

data_validation_framework.task.spec_report(task)

Hook to create a specification report.

data_validation_framework.task.success_summary(task)

Hook to log a summary report of the task.