PostImputation Module

Module to run the post-imputation processing tasks on VCF files.

This module provides classes for running various post-imputation tasks in parallel, including unzipping VCF files, filtering variants based on imputation quality, normalizing VCF files, and indexing VCF files. It uses the ThreadPoolExecutor for parallel execution and tqdm for progress tracking. The tasks are designed to handle large genomic datasets efficiently by leveraging multi-threading.

It also includes functionality to download and use reference genomes for normalization, and to convert VCF files into PLINK binary files, a format suitable for further analysis.

class ideal_genom.preprocessing.PostImputation.AnnotateVCF

Bases: ParallelTaskRunner

A parallel task runner for annotating normalized VCF files using reference annotation.

This class provides functionality to annotate normalized VCF files with identifiers from a reference annotation file using bcftools. It processes multiple VCF files in parallel, making it efficient for large genomic datasets.

The class identifies all normalized VCF files matching a specified pattern and annotates them using the provided reference annotation file. It adds identifiers from the reference file to the VCF entries.

output_prefix

Prefix to add to the output files. Defaults to ‘annotated-‘.

Type:

str, optional

ref_annotation

Path to the reference annotation file used for annotating VCF files.

Type:

Path

(See `ParallelTaskRunner` for inherited attributes.)
Raises:
  • TypeError – If ref_annotation is not a Path object or output_prefix is not a string.

  • FileNotFoundError – If the reference annotation file does not exist.

  • IsADirectoryError – If the reference annotation file is not a file.

Note

This class requires bcftools to be installed and available in the system path.

__init__(input_path: Path, output_path: Path, ref_annotation: Path, max_workers: int | None = None, output_prefix: str = 'annotated-') None
annotate_vcf(input_file: Path, ref_annotation: Path, output_prefix: str = 'annotated-') None

Annotates a VCF file with identifiers from a reference annotation file using bcftools.

This method takes an input VCF file and annotates it with IDs from a reference annotation file. The annotated VCF is saved to a new file with the specified prefix.

Parameters:
  • input_file (Path) – Path to the input VCF file to be annotated.

  • ref_annotation (Path) – Path to the reference annotation file used for annotation.

  • output_prefix (str (optional)) – Prefix to add to the output filename. Defaults to ‘annotated-‘.

Return type:

None

Raises:
  • FileExistsError – If the input file does not exist.

  • IsADirectoryError – If the input file is a directory, not a file.

  • TypeError – If ref_annotation is not a Path object or output_prefix is not a string.

  • subprocess.CalledProcessError – If the bcftools command fails.

  • FileNotFoundError – If the input file is not found during execution.
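As a rough illustration of what this annotation step amounts to, the sketch below assembles a bcftools annotate command that copies the ID column from a reference file. The helper name build_annotate_cmd, the choice of flags, and the output naming (prefix + input file name) are assumptions made for this example; the module's actual invocation may differ.

```python
from pathlib import Path
from typing import List


def build_annotate_cmd(input_file: Path, ref_annotation: Path,
                       output_path: Path,
                       output_prefix: str = "annotated-") -> List[str]:
    """Hypothetical helper: assemble a bcftools annotate call (not executed here)."""
    if not isinstance(ref_annotation, Path):
        raise TypeError("ref_annotation must be a Path")
    if not isinstance(output_prefix, str):
        raise TypeError("output_prefix must be a string")
    # Assumed naming: prefix + original file name, inside output_path.
    output_file = output_path / (output_prefix + input_file.name)
    return [
        "bcftools", "annotate",
        "-a", str(ref_annotation),  # file that provides the identifiers
        "-c", "ID",                 # copy only the ID column
        "-Oz",                      # write a compressed VCF
        "-o", str(output_file),
        str(input_file),
    ]


annotate_cmd = build_annotate_cmd(
    Path("normalized-chr1.dose.vcf.gz"), Path("ref.vcf.gz"), Path("out")
)
```

The resulting list could be passed to subprocess.run(..., check=True), which raises subprocess.CalledProcessError on a non-zero exit status, matching the exception listed above.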

execute_task() None

Annotates normalized VCF files using a reference annotation file.

This method collects all normalized VCF files matching the pattern normalized-*dose.vcf.gz and annotates them using the provided reference annotation file. The annotated files will be saved with the specified output prefix.

Return type:

None

class ideal_genom.preprocessing.PostImputation.FilterVariants

Bases: ParallelTaskRunner

A class for filtering genetic variants in VCF/BCF files based on imputation quality (R² statistic).

This class extends ParallelTaskRunner to provide parallel processing capabilities for filtering variants across multiple VCF files. It identifies variants with imputation quality below a specified R² threshold and removes them from the output files.

r2_threshold

The threshold value for the R² statistic. Variants with an R² value below this threshold will be filtered out.

Type:

float

output_prefix

The prefix to be added to output filenames. Default is ‘filtered-‘.

Type:

str, optional

(See `ParallelTaskRunner` for inherited attributes.)

Notes

The class searches for files matching the pattern *dose.vcf.gz in the input directory and processes them in parallel. The filtered output files will be saved in the output directory with the specified prefix added to their original filenames.

Note

bcftools must be installed and available in the system path

__init__(input_path: Path, output_path: Path, max_workers: int | None = None, r2_threshold: float = 0.3, output_prefix: str = 'filtered-') None
execute_task() None

Execute the task of filtering variants based on an R² threshold.

This method collects the necessary files with the pattern *dose.vcf.gz and runs the filtering task with the specified parameters.

Return type:

None

Raises:

TypeError – If r2_threshold is not a float or output_prefix is not a string.

Notes

The method uses internal methods _file_collector and _run_task to perform the filtering operation.

filter_variants(input_file: Path, r2_threshold: float, output_prefix: str = 'filtered-') None

Filter variants from a VCF/BCF file based on R2 imputation quality threshold.

This method takes an imputed VCF/BCF file and filters out variants with imputation quality (R2) below the specified threshold. The filtered output is saved as a compressed VCF.

Parameters:
  • input_file (Path) – Path to the input VCF/BCF file to be filtered

  • r2_threshold (float) – Minimum R2 imputation quality threshold (variants with R2 <= threshold will be removed)

  • output_prefix (str, optional) – Prefix to add to the output filename. Defaults to ‘filtered-‘.

Returns:

The method outputs a filtered VCF file but doesn’t return a value.

Return type:

None

Raises:
  • FileExistsError – If the input file does not exist

  • IsADirectoryError – If the input path is a directory, not a file

  • TypeError – If r2_threshold is not a float or output_prefix is not a string

Notes

The output file is saved in the instance’s output_path directory, with its name constructed as output_prefix + input_file.name.

Note

This method requires bcftools to be installed and available in the system path.
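The following sketch shows one way the filtering step could be expressed as a bcftools view call. It is illustrative only: the helper name build_filter_cmd and the INFO tag name R2 are assumptions (the tag name depends on the imputation software), and the expression keeps variants with R2 strictly above the threshold, following the parameter description above.

```python
from pathlib import Path
from typing import List


def build_filter_cmd(input_file: Path, output_path: Path,
                     r2_threshold: float,
                     output_prefix: str = "filtered-") -> List[str]:
    """Hypothetical helper: assemble a bcftools view call (not executed here)."""
    if not isinstance(r2_threshold, float):
        raise TypeError("r2_threshold must be a float")
    if not isinstance(output_prefix, str):
        raise TypeError("output_prefix must be a string")
    # Naming taken from the Notes: output_prefix + input_file.name.
    output_file = output_path / (output_prefix + input_file.name)
    return [
        "bcftools", "view",
        "-Oz",                        # compressed VCF output
        "-i", f"R2>{r2_threshold}",   # keep variants above the threshold (tag name assumed)
        "-o", str(output_file),
        str(input_file),
    ]


filter_cmd = build_filter_cmd(Path("chr1.dose.vcf.gz"), Path("out"), 0.3)
```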

Bases: object

A class for converting concatenated VCF files to PLINK binary format.

This class handles the conversion of a concatenated VCF file to PLINK binary format, ready for further analysis.

input_path

Directory path where the input VCF file is located.

Type:

Path

output_path

Directory path where the output files will be saved.

Type:

Path

input_name

Name of the input VCF file (must end with .vcf or .vcf.gz).

Type:

str

output_name

Name for the output file. If not provided, it will be derived from input_name.

Type:

str, optional

Raises:
  • TypeError – If input_path or output_path is not a Path object, or if input_name or output_name is not a string.

  • FileNotFoundError – If input_path or output_path does not exist.

  • NotADirectoryError – If input_path or output_path is not a directory.

  • ValueError – If input_name is not provided or if it doesn’t end with .vcf or .vcf.gz.

__init__(input_path: Path, input_name: str, output_path: Path, output_name: str) None

Convert a VCF file to PLINK binary format (.bed, .bim, .fam).

This method runs the plink2 command-line tool to convert the input VCF file to PLINK binary format, filtering for SNPs with standard ACGT alleles only.

Parameters:
  • double_id (bool, optional) – Whether to use the --double-id flag in the plink2 command, which sets both FID and IID to the sample ID. Defaults to True.

  • threads (int, optional) – Number of CPU threads to use. If None, defaults to (available CPU cores - 2) or 10 if CPU count can’t be determined.

  • memory (int, optional) – Memory allocation in MB for plink2. If None, defaults to approximately 2/3 of available system memory.

Return type:

None

Notes

Side effects: creates PLINK binary files (.bed, .bim, .fam) in the self.analysis_ready directory with the prefix self.output_name + “-nosex”.

Raises:

subprocess.CalledProcessError – If the plink2 command execution fails.
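To make the documented defaults concrete, here is a minimal sketch of how the thread and memory values could be derived and how a matching plink2 command might look. The helper names are hypothetical, the available memory is passed in as a plain number rather than queried from the system, and the lower bound of one thread is an added safeguard that the documentation does not mention.

```python
import os
from pathlib import Path
from typing import List, Optional


def default_threads() -> int:
    """(CPU cores - 2), or 10 if the CPU count cannot be determined."""
    cpu = os.cpu_count()
    if cpu is None:
        return 10
    return max(1, cpu - 2)  # assumption: never go below one thread


def default_memory_mb(available_bytes: int) -> int:
    """Roughly two thirds of the available memory, expressed in MB."""
    return (2 * available_bytes) // (3 * 1024 ** 2)


def build_plink2_cmd(vcf_file: Path, out_prefix: Path, memory_mb: int,
                     double_id: bool = True,
                     threads: Optional[int] = None) -> List[str]:
    """Hypothetical helper: assemble a plink2 conversion call (not executed here)."""
    if threads is None:
        threads = default_threads()
    cmd = [
        "plink2",
        "--vcf", str(vcf_file),
        "--snps-only", "just-acgt",   # SNPs with standard ACGT alleles only
        "--make-bed",                 # write .bed/.bim/.fam
        "--out", str(out_prefix),
        "--threads", str(threads),
        "--memory", str(memory_mb),
    ]
    if double_id:
        cmd.append("--double-id")     # FID and IID both set to the sample ID
    return cmd


plink_cmd = build_plink2_cmd(
    Path("cohort.vcf.gz"), Path("cohort-nosex"),
    memory_mb=default_memory_mb(3 * 1024 ** 2 * 1000), threads=4,
)
```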

update_fam(for_fam_update_file: Path, threads: int | None = None) None

Add family information to the PLINK .fam file.

This method reads a family information file and updates the PLINK .fam file using the provided family information, via PLINK2.

Parameters:
  • for_fam_update_file (Path) – Path to the family information file (.fam or without suffix).

  • threads (int, optional) – Number of threads to use for PLINK2 (defaults to available CPUs - 2).

Return type:

None

class ideal_genom.preprocessing.PostImputation.IndexVCF

Bases: ParallelTaskRunner

A class for indexing VCF (Variant Call Format) files using bcftools in parallel.

This class extends ParallelTaskRunner to enable parallel processing of multiple VCF files. It creates index files that facilitate quick random access to compressed VCF files.

pattern

The glob pattern to match VCF files for indexing. Defaults to normalized-*dose.vcf.gz.

Type:

str, optional

(See `ParallelTaskRunner` for inherited attributes.)
Raises:

TypeError – If pattern is not a string.

Note

bcftools must be installed and available in the system path

__init__(input_path: Path, output_path: Path, max_workers: int | None = None, pattern: str = 'normalized-*dose.vcf.gz') None
execute_task() None

Execute the task of indexing VCF files.

This method collects files based on the provided pattern and indexes the VCF files.

Return type:

None

index_vcf(input_file: Path) None

Index a VCF file using bcftools.

This method creates an index for the specified VCF file using bcftools, which is required for efficient querying and processing of VCF files.

Parameters:

input_file (Path) – Path to the VCF file to be indexed. Must be an existing file.

Return type:

None

Raises:

FileExistsError – If the input file does not exist.

class ideal_genom.preprocessing.PostImputation.NormalizeVCF

Bases: ParallelTaskRunner

A class for normalizing VCF files post-imputation in parallel.

This class provides functionality to process VCF files by normalizing them using bcftools. It’s specifically designed to handle post-imputation VCF files and split multiallelic variants into separate entries. The class inherits from ParallelTaskRunner to enable parallel processing of multiple VCF files, which improves performance for large-scale genomic datasets.

output_prefix

Prefix to add to the output files. Defaults to ‘uncompressed-’.

Type:

str, optional

(See `ParallelTaskRunner` for inherited attributes.)

Note

bcftools must be installed and available in the system path

__init__(input_path: Path, output_path: Path, max_workers: int | None = None, output_prefix: str = 'uncompressed-') None
execute_task() None

Execute the post-imputation normalization task on VCF files.

This method collects filtered dose VCF files matching the pattern filtered-*dose.vcf.gz and runs the normalization process on them. The normalized files will be prefixed with the provided output_prefix.

Parameters:

output_prefix (str (optional)) – Prefix to add to the output files. Defaults to ‘uncompressed-‘.

Raises:

TypeError – If output_prefix is not a string.

Return type:

None

normalize_vcf(input_file: Path, output_prefix: str = 'uncompressed-') None

Normalizes a VCF file using bcftools norm with the -m -any option.

This method takes a VCF file, performs normalization using bcftools to split multiallelic variants into separate entries, and outputs the normalized file with the specified prefix.

Parameters:
  • input_file (Path) – Path to the input VCF file to be normalized

  • output_prefix (str, optional) – Prefix for the output file name. Defaults to ‘uncompressed-’

Return type:

None

Raises:
  • FileExistsError – If the input file does not exist

  • IsADirectoryError – If the input file path points to a directory

  • TypeError – If output_prefix is not a string

Notes

The output file will be saved in the output_path directory with the naming convention: output_prefix + base_name, where base_name is derived from the input file.
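For orientation, the sketch below assembles a bcftools norm call with the -m -any option named above. In that option, the leading "-" requests splitting (rather than joining) of multiallelic sites and "any" applies it to both SNPs and indels. The helper name and the file names are illustrative assumptions, and the output path is passed in explicitly because the exact base-name rule is not spelled out here.

```python
from pathlib import Path
from typing import List


def build_norm_cmd(input_file: Path, output_file: Path) -> List[str]:
    """Hypothetical helper: assemble a bcftools norm call (not executed here)."""
    return [
        "bcftools", "norm",
        "-m", "-any",             # split multiallelic sites of any type
        "-o", str(output_file),
        str(input_file),
    ]


# Illustrative file names only.
norm_cmd = build_norm_cmd(
    Path("filtered-chr1.dose.vcf.gz"), Path("uncompressed-chr1.dose.vcf.gz")
)
```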

class ideal_genom.preprocessing.PostImputation.ParallelTaskRunner

Bases: object

A base class for running parallel tasks on files.

This class provides the basic infrastructure for parallel processing of files using ThreadPoolExecutor. It handles file collection and parallel task execution while providing progress monitoring and logging.

input_path

Directory path where input files are located.

Type:

Path

output_path

Directory path where output files will be saved.

Type:

Path

max_workers

Maximum number of worker threads to use. Defaults to min(8, CPU count).

Type:

int

files

List of files to be processed.

Type:

List[Path]

Raises:
  • TypeError – If input_path or output_path are not Path objects.

  • FileNotFoundError – If input_path or output_path don’t exist.

  • NotADirectoryError – If input_path or output_path are not directories.

__init__(input_path: Path, output_path: Path, max_workers: int | None = None) None
_file_collector(filename_pattern: str) List[Path]

Collect files matching a given pattern from the input directory.

This method finds all files matching the specified glob pattern within the input directory, sorts them, and stores the resulting list as an instance attribute.

Parameters:

filename_pattern (str) – A glob pattern string to match files (e.g., *.vcf.gz).

Returns:

A sorted list of Path objects for the files matching the pattern.

Return type:

List[Path]

Raises:
  • TypeError – If filename_pattern is not a string.

  • FileNotFoundError – If no files match the given pattern in the input directory.

Notes

The matched files are also stored in the instance attribute files.
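The behaviour described for this method can be approximated with pathlib alone. The sketch below is a standalone function rather than the actual method; the name collect_files is hypothetical, and it returns the list instead of storing it on an instance.

```python
import tempfile
from pathlib import Path
from typing import List


def collect_files(input_path: Path, filename_pattern: str) -> List[Path]:
    """Return the sorted paths in input_path that match a glob pattern."""
    if not isinstance(filename_pattern, str):
        raise TypeError("filename_pattern must be a string")
    files = sorted(input_path.glob(filename_pattern))
    if not files:
        raise FileNotFoundError(
            f"No files matching {filename_pattern!r} in {input_path}"
        )
    return files


# Small demonstration in a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for name in ("chr2.dose.vcf.gz", "chr1.dose.vcf.gz", "notes.txt"):
        (root / name).touch()
    found = [p.name for p in collect_files(root, "*dose.vcf.gz")]
```

Here found holds the two VCF names in sorted order, and notes.txt is left out because it does not match the pattern.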

_run_task(task_fn: Callable, task_args: Dict[str, Any], desc: str = 'Running tasks') None

Execute a task function across all files using parallel processing with ThreadPoolExecutor.

This method applies the given task function to each file in self.files concurrently, managing thread allocation, progress tracking, and error handling.

Parameters:
  • task_fn (Callable) – The function to execute for each file. First argument should accept a file, and it should accept **kwargs for additional arguments.

  • task_args (Dict[str, Any]) – Dictionary of keyword arguments to pass to the task function.

  • desc (str, optional) – Description for the progress bar and logging, by default “Running tasks”.

Return type:

None

Notes

  • Uses ThreadPoolExecutor with max_workers defined in class initialization

  • Provides progress tracking via tqdm

  • Logs timing information and any exceptions that occur

  • Does not raise exceptions from individual tasks but logs them instead
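The pattern listed in these notes can be sketched with the standard library as follows. This is not the module's implementation: the function name run_task is hypothetical, the progress bar is omitted to keep the example dependency-free, and the failure count is returned only so the behaviour is easy to check (the documented method returns None).

```python
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Callable, Dict, List

logger = logging.getLogger(__name__)


def run_task(files: List[Path], task_fn: Callable, task_args: Dict[str, Any],
             max_workers: int = 4, desc: str = "Running tasks") -> int:
    """Apply task_fn(file, **task_args) to every file using a thread pool."""
    start = time.time()
    failures = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its file so failures can be reported by name.
        futures = {executor.submit(task_fn, f, **task_args): f for f in files}
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as exc:  # log the error and keep going
                failures += 1
                logger.error("%s failed for %s: %s", desc, futures[future], exc)
    logger.info("%s finished in %.2f s", desc, time.time() - start)
    return failures


def demo_task(input_file: Path, suffix: str) -> None:
    """Toy task that fails for file names starting with 'bad'."""
    if input_file.name.startswith("bad"):
        raise ValueError("simulated failure")


failed = run_task(
    [Path("a.vcf.gz"), Path("bad.vcf.gz")], demo_task, {"suffix": "x"}
)
```

To add progress tracking, the as_completed(futures) iterator could be wrapped as tqdm(as_completed(futures), total=len(futures), desc=desc).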

execute_task() None

Execute the specific post-imputation processing task.

This abstract method should be implemented by all subclasses to perform their specific post-imputation processing operations. Implementations should handle the execution logic for the particular task the subclass is designed to perform.

Return type:

None

Raises:

NotImplementedError – If the subclass does not implement this method.

class ideal_genom.preprocessing.PostImputation.ProcessVCF

Bases: object

ProcessVCF class for post-imputation processing of Variant Call Format (VCF) files.

This class provides a pipeline for processing VCF files through multiple sequential steps:

  1. Unzipping VCF files (if compressed)

  2. Filtering variants based on imputation quality (R²)

  3. Normalizing variant representation

  4. Normalizing against a reference genome

  5. Indexing the normalized VCF files

  6. Annotating variants with additional information

  7. Concatenating multiple VCF files into a single output file

input_path

Path to the directory containing input VCF files.

Type:

Path

output_path

Path to the directory where processed files will be saved.

Type:

Path

Raises:
  • TypeError – If input_path or output_path is not of type Path.

  • FileNotFoundError – If input_path or output_path does not exist.

  • NotADirectoryError – If input_path or output_path is not a directory.

Notes

  • A subdirectory named process_vcf is created inside the input_path directory for storing intermediate files during processing.

  • This class is designed to handle multiple sequential steps in VCF file processing, such as unzipping, filtering, normalizing, and annotating.

Note

This class requires bcftools to be installed and available in the system path.
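The sequential steps listed for this class map onto its execute_* methods. The sketch below shows that order using a stand-in object, so it runs without the package or bcftools installed. The function run_post_imputation and the _Recorder class are hypothetical and exist only for this illustration.

```python
from pathlib import Path
from typing import Any, List


def run_post_imputation(processor: Any, ref_annotation: Path,
                        output_name: str, r2_threshold: float = 0.3) -> None:
    """Call the documented pipeline steps in their listed order."""
    processor.execute_unzip()
    processor.execute_filter(r2_threshold=r2_threshold)
    processor.execute_normalize()
    processor.execute_reference_normalize(build="38")
    processor.execute_index()
    processor.execute_annotate(ref_annotation=ref_annotation)
    processor.execute_concatenate(output_name=output_name)


class _Recorder:
    """Stand-in for ProcessVCF that only records which methods were called."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def __getattr__(self, name: str):
        def _record(*args: Any, **kwargs: Any) -> None:
            self.calls.append(name)
        return _record


recorder = _Recorder()
run_post_imputation(recorder, Path("ref.vcf.gz"), "cohort.vcf.gz")
```

In real use, the processor argument would presumably be a ProcessVCF(input_path=..., output_path=...) instance.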

__init__(input_path: Path, output_path: Path) None
execute_annotate(ref_annotation: Path, output_prefix: str = 'annotated-') None

Annotates a VCF file using a reference annotation file.

This method initializes an AnnotateVCF object and executes the annotation process on the current VCF file.

Parameters:
  • ref_annotation (Path) – Path to the reference annotation file.

  • output_prefix (str, optional) – Prefix to be added to the output file name. Default is ‘annotated-‘.

Return type:

None

execute_concatenate(output_name: str, max_threads: int | None = None) None

Concatenates annotated VCF files using bcftools concat.

This method finds all annotated VCF files in the process_vcf directory, sorts them, and concatenates them into a single compressed VCF file.

Parameters:
  • output_name (str) – Name of the output file.

  • max_threads (int (optional)) – Maximum number of threads to use for concatenation. If None, uses min(8, os.cpu_count()). Defaults to None.

Return type:

None

Raises:
  • TypeError – If output_name is not a string.

  • FileNotFoundError – If no annotated VCF files are found in the process_vcf directory.

  • ValueError – If max_threads is less than 1.

Notes

The output file will be saved in the output_path directory. The method uses the ‘bcftools concat’ command with the -Oz option, which writes a compressed VCF.
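A possible shape for the concatenation command is sketched below, including the documented thread default and the error conditions. The helper name build_concat_cmd is hypothetical, and the fallback to one thread when the CPU count is unavailable is an assumption.

```python
import os
from pathlib import Path
from typing import List, Optional


def build_concat_cmd(input_files: List[Path], output_file: Path,
                     max_threads: Optional[int] = None) -> List[str]:
    """Hypothetical helper: assemble a bcftools concat call (not executed here)."""
    if not input_files:
        raise FileNotFoundError("no annotated VCF files to concatenate")
    if max_threads is None:
        # Documented default: min(8, os.cpu_count()); "or 1" is an added guard.
        max_threads = min(8, os.cpu_count() or 1)
    if max_threads < 1:
        raise ValueError("max_threads must be at least 1")
    return [
        "bcftools", "concat",
        *[str(f) for f in sorted(input_files)],
        "--threads", str(max_threads),
        "-Oz",                      # compressed VCF output
        "-o", str(output_file),
    ]


concat_cmd = build_concat_cmd(
    [Path("annotated-chr2.vcf.gz"), Path("annotated-chr1.vcf.gz")],
    Path("cohort.vcf.gz"),
    max_threads=4,
)
```

Note that a plain lexicographic sort places chr10 before chr2, so file names may need a sort key that respects chromosome order.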

execute_filter(r2_threshold: float = 0.3) None

Execute a filtering operation on VCF data based on R² threshold.

This method filters variants by creating and executing a FilterVariants object with the specified R² threshold. Both the input and output paths are set to the process_vcf directory.

Parameters:

r2_threshold (float, optional) – The R² threshold value for filtering variants. Variants with R² value below this threshold will be filtered out. Default is 0.3.

Return type:

None

execute_index(pattern: str = 'normalized-*dose.vcf.gz') None

Index VCF files matching a specific pattern.

This method creates an indexer for VCF files and executes the indexing task on files that match the given pattern in the process_vcf directory.

Parameters:

pattern (str, optional) – The glob pattern to match VCF files for indexing. Defaults to normalized-*dose.vcf.gz.

Return type:

None

execute_normalize() None

Normalizes the VCF file using the NormalizeVCF class.

This method creates a NormalizeVCF object with the process_vcf directory as both input and output path, then executes the normalization task. The normalized files are written to that same directory.

Return type:

None

execute_reference_normalize(build: str = '38', ref_genome: Path | None = None) None

Normalize the VCF file against a reference genome.

This method creates a ReferenceNormalizeVCF object and executes the normalization task on the processed VCF file, using the specified genome build or reference file.

Parameters:
  • build (str, optional) – Genome build version to use. Defaults to ‘38’.

  • ref_genome (Path, optional) – Path to a custom reference file. If provided, this will be used instead of the default reference for the specified build. Defaults to None.

Return type:

None

execute_unzip(password: str | None = None) None

Unzips a VCF file using the UnzipVCF utility.

This method creates an instance of UnzipVCF with the input and process paths from the current object, then executes the unzipping task. If the VCF file is password-protected, a password can be provided.

Parameters:

password (str, optional) – Password for the protected zip file. Defaults to None.

Return type:

None

class ideal_genom.preprocessing.PostImputation.ReferenceNormalizeVCF

Bases: ParallelTaskRunner

A class for normalizing VCF files using a reference genome in parallel.

This class extends ParallelTaskRunner to process multiple VCF files concurrently, normalizing them against a reference genome using bcftools. If a reference file is not provided, it will automatically download the appropriate reference genome based on the specified build.

build

Genome build version, either ‘37’ or ‘38’. Defaults to ‘38’.

Type:

str

output_prefix

Prefix to add to the output files. Defaults to ‘normalized-‘.

Type:

str

reference_file

Path to the reference genome file used for normalization. Defaults to None. If None or the file does not exist, it will be downloaded automatically based on the build.

Type:

Path, optional

(See `ParallelTaskRunner` for inherited attributes.)

Note

bcftools must be installed and available in the system path

__init__(input_path: Path, output_path: Path, max_workers: int | None = None, build: str = '38', output_prefix: str = 'normalized-', reference_file: Path | None = None) None
execute_task() None

Execute the post-imputation normalization task with reference genome.

This method normalizes VCF files using a reference genome. If no reference file is provided, it automatically downloads the appropriate reference genome based on the build parameter.

Return type:

None

Raises:

TypeError – If output_prefix is not a string.

Notes

This method collects uncompressed dose VCF files using a pattern match and normalizes them against the reference genome. The downloaded reference genomes come from the 1000 Genomes Project.

normalize_with_reference(input_file: Path, output_prefix: str = 'normalized-') None

Normalize a VCF file with a reference genome using bcftools.

This method takes an input VCF file and normalizes it against a reference genome using bcftools norm. The normalized output is compressed with gzip (-Oz).

Parameters:
  • input_file (Path) – Path to the input VCF file to be normalized.

  • output_prefix (str, default='normalized-') – Prefix to add to the output filename.

Returns:

The method doesn’t return a value but creates a normalized VCF file at the output_path location.

Return type:

None

Raises:
  • TypeError – If output_prefix is not a string.

  • subprocess.CalledProcessError – If the bcftools command fails.

  • FileNotFoundError – If the input file cannot be found.

Notes

The output filename is constructed from the output_prefix and the base name extracted from the input filename (after the first hyphen).
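The naming rule in the note above can be illustrated as follows, together with a bcftools norm call that takes a reference FASTA. Both helper names are hypothetical, and the fallback for file names without a hyphen is an assumption added so the example is total.

```python
from pathlib import Path
from typing import List


def reference_normalized_name(input_file: Path,
                              output_prefix: str = "normalized-") -> str:
    """Build output_prefix + the part of the file name after the first hyphen."""
    if not isinstance(output_prefix, str):
        raise TypeError("output_prefix must be a string")
    _, sep, rest = input_file.name.partition("-")
    base_name = rest if sep else input_file.name  # assumption: keep name if no hyphen
    return output_prefix + base_name


def build_ref_norm_cmd(input_file: Path, reference_file: Path,
                       output_path: Path) -> List[str]:
    """Hypothetical helper: assemble a bcftools norm call (not executed here)."""
    output_file = output_path / reference_normalized_name(input_file)
    return [
        "bcftools", "norm",
        "-f", str(reference_file),  # reference genome in FASTA format
        "-Oz",                      # compressed output, as noted above
        "-o", str(output_file),
        str(input_file),
    ]


new_name = reference_normalized_name(Path("uncompressed-chr1.dose.vcf.gz"))
ref_norm_cmd = build_ref_norm_cmd(
    Path("uncompressed-chr1.dose.vcf.gz"), Path("ref.fa.gz"), Path("out")
)
```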

class ideal_genom.preprocessing.PostImputation.UnzipVCF

Bases: ParallelTaskRunner

A class for unzipping VCF (Variant Call Format) files after imputation, with support for parallel processing.

This class extends ParallelTaskRunner to efficiently extract VCF files from zip archives, including password-protected ones. It collects all zip files in the working directory and extracts their contents to the output directory.

(See `ParallelTaskRunner` for inherited attributes.)

Notes

  • VCF files are commonly used in genomics for storing gene sequence variations

  • The class only extracts files (not directories) from the zip archives

  • All extracted files are placed directly in the output directory without preserving paths

  • This class is designed for post-imputation processing in genetic data pipelines

__init__(input_path: Path, output_path: Path, max_workers: int | None = None, password: str | None = None) None
execute_task() None

Execute the post-imputation unzipping task on VCF files.

This method performs the following steps:

  1. Collects all zip files in the working directory

  2. Unzips the VCF files, using the provided password if necessary

Parameters:

password (Optional[str]) – Password to decrypt zip files if they are password-protected. Default is None.

Returns:

This method doesn’t return any value.

Return type:

None

unzip_files(zip_path: Path, password: str | None = None, output_prefix: str = 'unzipped-') None

Extract files from a zip archive, which may be password-protected.

This method extracts all non-directory files from the specified zip archive to the class’s output_path directory. If the zip file is password-protected, provide the password as a parameter.

Parameters:
  • zip_path (Path) – Path to the zip file to be extracted

  • password (Optional[str], optional) – Password for the zip file, None if the file is not password-protected. Defaults to None.

  • output_prefix (str, optional) – Prefix to add to extracted filenames. Defaults to ‘unzipped-‘.

Return type:

None

Raises:
  • zipfile.BadZipFile – If the zip file is corrupted or password is incorrect

  • FileNotFoundError – If the zip file does not exist

  • PermissionError – If there are insufficient permissions to read the zip file or write to output directory

Notes

Files are extracted to the output_path directory of the class instance. Only files (not directories) are extracted from the archive. File paths are not preserved - all files are placed directly in output_path. The output_prefix is added to the beginning of each extracted filename.
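The extraction rules in these notes (files only, flattened paths, prefixed names) can be sketched with the standard-library zipfile module. The function name unzip_flat is hypothetical and this is not necessarily how the class does it. In particular, the standard-library module only decrypts the legacy ZIP encryption scheme, so archives encrypted with AES would need a different tool.

```python
import shutil
import tempfile
import zipfile
from pathlib import Path
from typing import List, Optional


def unzip_flat(zip_path: Path, output_path: Path,
               password: Optional[str] = None,
               output_prefix: str = "unzipped-") -> List[Path]:
    """Extract every non-directory member directly into output_path."""
    pwd = password.encode() if password is not None else None
    extracted: List[Path] = []
    with zipfile.ZipFile(zip_path) as archive:
        for member in archive.infolist():
            if member.is_dir():
                continue  # only files are extracted
            # Drop any directory part and add the prefix to the file name.
            target = output_path / (output_prefix + Path(member.filename).name)
            with archive.open(member, pwd=pwd) as src, open(target, "wb") as dst:
                shutil.copyfileobj(src, dst)
            extracted.append(target)
    return extracted


# Small demonstration with an unencrypted archive in a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    archive_path = root / "chr1.zip"
    with zipfile.ZipFile(archive_path, "w") as archive:
        archive.writestr("results/chr1.dose.vcf.gz", b"placeholder")
    out_dir = root / "out"
    out_dir.mkdir()
    names = [p.name for p in unzip_flat(archive_path, out_dir)]
```

After the demonstration, names contains a single entry whose directory part (results/) has been dropped and whose name carries the prefix.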