data_validation_framework.task¶
Specific tasks.
Functions
|
Create a mixin class to add a |
|
Hook to create a specification report. |
|
Hook to log a summary report of the task. |
Classes
|
Base luigi task used for validation steps. |
|
A class to validate each element of a data set without considering the other elements. |
|
A helper to build task inputs. |
|
A class to validate an entire data set (useful for global properties). |
|
Initialize target prefixes and optionally add a tag to the resut directory. |
|
Class to define and process a validation workflow. |
Exceptions
Exception raised if the validation could not be performed properly. |
- class data_validation_framework.task.BaseValidationTask(*args, **kwargs)¶
Bases:
LogTargetMixin
,RerunMixin
,TagResultOutputMixin
,Task
Base luigi task used for validation steps.
Warning
Only use child of this class such as
ElementValidationTask
orSetValidationTask
.To provide a validation function, use it as:
class Task(ChildBaseValidationTask): data_dir = luigi.Parameter(default='my_data') validation_function = staticmethod(my_validation_function) output_columns = {'my_new_col': None} def inputs(self): return {PreviousTask: {"input_data", "input_data"}
Note
The
dataset_df
andresult_path
attributes are passed to the requirements when they haveNone
values for these attributes.- args()¶
List of addititonal arguments to provide to
validation_function()
.This method can be overridden to pass these arguments to the
validation_function()
and to thepre_process()
andpost_process()
methods.
- static check_inputs(inputs)¶
Check a given dict of inputs.
- extra_input()¶
Targets of the tasks given to extra_requires().
- extra_requires()¶
Requirements that should not be considered as validation tasks.
- static filter_columns(all_dfs, mappings)¶
Filter the columns of the given DataFrames according to a given mapping.
- gather_inputs = False¶
Gather the inputs in the
pandas.DataFrame
given to thevalidation_function()
.- Type:
- inputs()¶
Information about required input data.
This method can be overridden and should return a dict of one of the following forms:
{<task_name>: {"<input_column_name>": "<current_column_name>"}}
{ <task_name>: ( {"<input_column_name>": "<current_column_name>"}, { "<kwarg_1>": "<value_1>", "<kwarg_2>": "<value_2>", } ) }
{ <task_name>: InputParameters( {"<input_column_name>": "<current_column_name>"}, kwarg_1="<value_1>", kwarg_2="<value_2>", ) }
- where:
<task_name>
is the name of the required task,<input_column_name>
is the name of the column we need from the report of task_name,<current_column_name>
is the name of the same column in the current task.<kwarg_*>
is the name of a keyword argument passed to the constructor of the required task.
- kwargs()¶
Dict of addititonal keyword arguments to provide to
validation_function()
.This method can be overridden to pass these keyword arguments to the
validation_function()
and to thepre_process()
andpost_process()
methods.
- output()¶
The targets of the current task.
- output_columns = None¶
A dict with names as keys and empty values as values for new columns created by the current task.
- Type:
- post_process(df, args, kwargs)¶
Method executed after applying the external function.
- pre_process(df, args, kwargs)¶
Method executed before applying the external function.
- processed_inputs()¶
Process the inputs to automatically propagate the values from the workflow.
- read_dataset()¶
Import the dataset to a
pandas.DataFrame
.This method can be overridden to load custom data (e.g. GeoDataFrame, etc.). The dataset should always be loaded from the path given by self.dataset_df.
- requires()¶
Process the inputs to generate the requirements.
- results = None¶
The results of the
validation_function()
.- Type:
- run()¶
The main process of the current task.
- property task_name¶
The name of the task.
- transform_index(df)¶
Method executed after loading the dataset to transform its index.
Note
This transformation is applied to both the dataset and the input reports.
- static validation_function(*args, **kwargs)¶
The validation function to apply to the current data set.
- class data_validation_framework.task.ElementValidationTask(*args, **kwargs)¶
Bases:
BaseValidationTask
A class to validate each element of a data set without considering the other elements.
The
validation_function
will receive the row and the output_path as first arguments.
- class data_validation_framework.task.InputParameters(col_mapping, **kwargs)¶
Bases:
object
A helper to build task inputs.
This class should only be used to build the input dictionaries:
class Task(ChildBaseValidationTask): data_dir = luigi.Parameter(default='my_data') validation_function = staticmethod(my_validation_function) output_columns = {'my_new_col': None} def inputs(self): return { PreviousTask: InputParameters( {"input_data", "input_data"}, kwarg_1="value_1", kwarg_2="value_2" ) }
- Parameters:
col_mapping (dict) – The column mapping.
- Keyword Arguments:
kwargs – All the keyword arguments passed to the constructor.
Warning
The keyword arguments will always override the arguments from the config file.
- class data_validation_framework.task.SetValidationTask(*args, **kwargs)¶
Bases:
BaseValidationTask
A class to validate an entire data set (useful for global properties).
The
validation_function
will receive the DataFrame and the output_path as first arguments.Note
The given
pandas.DataFrame
will always have the columns["is_valid", "ret_code", "comment", "exception"]
. Thevalidation_function
should at least update the values for theis_valid
column.
- data_validation_framework.task.SkippableMixin(default_value=False)¶
Create a mixin class to add a
skip
parameter.This mixin must be applied to a
data_validation_framework.ElementValidationTask
. It will create askip
parameter and wrap the validation function to just skip it if theskip
argument is set toTrue
. If skipped, it will keep theis_valid
values as is and add a specific comment to inform the user.- Parameters:
default_value (bool) – The default value for the
skip
argument.
- class data_validation_framework.task.TagResultOutputMixin(*args, **kwargs)¶
Bases:
object
Initialize target prefixes and optionally add a tag to the resut directory.
Warning
If this mixin is used alongside the
luigi_tools.task.RerunMixin
, then it can have two different behaviors:if placed on the right side of the RerunMixin, a new tag is created and thus rerun does not do anything.
if placed on the left side of the RerunMixin, the targets from the tagged directory without conflict are removed then a new tagged directory with conflict is created.
- target_cls¶
alias of
TaggedOutputLocalTarget
- exception data_validation_framework.task.ValidationError¶
Bases:
Exception
Exception raised if the validation could not be performed properly.
- class data_validation_framework.task.ValidationWorkflow(*args, **kwargs)¶
Bases:
SetValidationTask
,WrapperTask
Class to define and process a validation workflow.
- gather_inputs = True¶
Gather the inputs in the
pandas.DataFrame
given to thevalidation_function()
.- Type:
- static validation_function(*args, **kwargs)¶
The validation function to apply to the current data set .
This method should usually do nothing for
ValidationWorkflow
as this class is only supposed to gather validation steps.
- data_validation_framework.task.spec_report(task)¶
Hook to create a specification report.
- data_validation_framework.task.success_summary(task)¶
Hook to log a summary report of the task.