PostImputation Module
Module to run the post-imputation processing tasks on VCF files.
This module provides classes for running various post-imputation tasks in parallel, including unzipping VCF files, filtering variants based on imputation quality, normalizing VCF files, and indexing VCF files. It uses the ThreadPoolExecutor for parallel execution and tqdm for progress tracking. The tasks are designed to handle large genomic datasets efficiently by leveraging multi-threading.
It also includes functionality to download and use reference genomes for normalization, and to convert VCF files into PLINK binary files for further analysis.
- class ideal_genom.preprocessing.PostImputation.AnnotateVCF
Bases:
ParallelTaskRunner
A parallel task runner for annotating normalized VCF files using reference annotation.
This class provides functionality to annotate normalized VCF files with identifiers from a reference annotation file using bcftools. It processes multiple VCF files in parallel, making it efficient for large genomic datasets.
The class identifies all normalized VCF files matching a specified pattern and annotates them using the provided reference annotation file. It adds identifiers from the reference file to the VCF entries.
- output_prefix
Prefix to add to the output files. Defaults to 'annotated-'.
- Type:
str, optional
- ref_annotation
Path to the reference annotation file used for annotating VCF files.
- Type:
Path
- (See `ParallelTaskRunner` for inherited attributes.)
- Raises:
TypeError – If ref_annotation is not a Path object or output_prefix is not a string.
FileNotFoundError – If the reference annotation file does not exist.
IsADirectoryError – If the reference annotation file is not a file.
Note
This class requires bcftools to be installed and available in the system path.
- __init__(input_path: Path, output_path: Path, ref_annotation: Path, max_workers: int | None = None, output_prefix: str = 'annotated-') None
- annotate_vcf(input_file: Path, ref_annotation: Path, output_prefix: str = 'annotated-') None
Annotates a VCF file with identifiers from a reference annotation file using bcftools. This method takes an input VCF file and annotates it with IDs from a reference annotation file. The annotated VCF is saved to a new file with the specified prefix.
- Parameters:
input_file (Path) – Path to the input VCF file to be annotated.
ref_annotation (Path) – Path to the reference annotation file used for annotation.
output_prefix (str (optional)) – Prefix to add to the output filename. Defaults to 'annotated-'.
- Return type:
None
- Raises:
FileExistsError – If the input file does not exist.
IsADirectoryError – If the input file is a directory, not a file.
TypeError – If ref_annotation is not a Path object or output_prefix is not a string.
subprocess.CalledProcessError – If the bcftools command fails.
FileNotFoundError – If the input file is not found during execution.
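As an illustration of the kind of command this method wraps, the following is a minimal sketch that only builds a bcftools annotate invocation and does not run it. The helper name build_annotate_command, the output naming (prefix plus input filename), and the exact flags are assumptions made for this example; the flags the library actually passes are not shown in this reference.

```python
from pathlib import Path
from typing import List


def build_annotate_command(
    input_file: Path,
    ref_annotation: Path,
    output_dir: Path,
    output_prefix: str = "annotated-",
) -> List[str]:
    """Build (but do not run) a bcftools command that copies IDs from a reference file.

    Hypothetical helper: the flag selection below is an assumption, not the
    library's confirmed command line.
    """
    # Assumed naming: prefix + original filename, saved in output_dir.
    output_file = output_dir / f"{output_prefix}{input_file.name}"
    return [
        "bcftools", "annotate",
        "-a", str(ref_annotation),  # bgzipped, indexed reference annotation file
        "-c", "ID",                 # transfer only the ID column
        "-Oz",                      # write compressed VCF
        "-o", str(output_file),
        str(input_file),
    ]


annotate_cmd = build_annotate_command(
    Path("normalized-chr1.dose.vcf.gz"), Path("ref.vcf.gz"), Path("out")
)
print(" ".join(annotate_cmd))
```

The resulting list could be passed to subprocess.run(..., check=True), which raises subprocess.CalledProcessError when bcftools exits with a non-zero status.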
- execute_task() None
Annotates normalized VCF files using a reference annotation file.
This method collects all normalized VCF files matching the pattern normalized-*dose.vcf.gz and annotates them using the provided reference annotation file. The annotated files will be saved with the specified output prefix.
- Return type:
None
- class ideal_genom.preprocessing.PostImputation.FilterVariants
Bases:
ParallelTaskRunner
A class for filtering genetic variants in VCF/BCF files based on imputation quality (R² statistic).
This class extends ParallelTaskRunner to provide parallel processing capabilities for filtering variants across multiple VCF files. It identifies variants with imputation quality below a specified R² threshold and removes them from the output files.
- r2_threshold
The threshold value for the R² statistic. Variants with an R² value below this threshold will be filtered out.
- Type:
float
- output_prefix
The prefix to be added to output filenames. Default is 'filtered-'.
- Type:
str, optional
- (See `ParallelTaskRunner` for inherited attributes.)
Notes
The class searches for files matching the pattern *dose.vcf.gz in the input directory and processes them in parallel. The filtered output files will be saved in the output directory with the specified prefix added to their original filenames.
Note
bcftools must be installed and available in the system path
- __init__(input_path: Path, output_path: Path, max_workers: int | None = None, r2_threshold: float = 0.3, output_prefix: str = 'filtered-') None
- execute_task() None
Execute the task of filtering variants based on an R² threshold.
This method collects the necessary files with the pattern *dose.vcf.gz and runs the filtering task with the specified parameters.
- Return type:
None
- Raises:
TypeError – If r2_threshold is not a float or output_prefix is not a string.
Notes
The method uses internal methods _file_collector and _run_task to perform the filtering operation.
- filter_variants(input_file: Path, r2_threshold: float, output_prefix: str = 'filtered-') None
Filter variants from a VCF/BCF file based on R2 imputation quality threshold.
This method takes an imputed VCF/BCF file and filters out variants with imputation quality (R2) below the specified threshold. The filtered output is saved as a compressed VCF.
- Parameters:
input_file (Path) – Path to the input VCF/BCF file to be filtered
r2_threshold (float) – Minimum R2 imputation quality threshold (variants with R2 <= threshold will be removed)
output_prefix (str, optional) – Prefix to add to the output filename. Defaults to 'filtered-'.
- Returns:
The method outputs a filtered VCF file but doesn’t return a value.
- Return type:
None
- Raises:
FileExistsError – If the input file does not exist
IsADirectoryError – If the input path is a directory, not a file
TypeError – If r2_threshold is not a float or output_prefix is not a string
Notes
The output file will be saved in the instance’s output_path directory with
the name constructed as: output_prefix + input_file.name
Note
This method requires bcftools to be installed and available in the system path.
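A minimal sketch of how such a filter could be expressed, assuming the R² value is stored in the INFO/R2 tag and that bcftools view with an include expression is used. The helper name build_filter_command and the choice of subcommand and expression are assumptions for illustration only; the output name follows the output_prefix + input_file.name convention stated above.

```python
from pathlib import Path
from typing import List


def build_filter_command(
    input_file: Path,
    output_dir: Path,
    r2_threshold: float,
    output_prefix: str = "filtered-",
) -> List[str]:
    """Build (but do not run) a bcftools command keeping variants with R2 above a threshold."""
    if not isinstance(r2_threshold, float):
        raise TypeError("r2_threshold must be a float")
    if not isinstance(output_prefix, str):
        raise TypeError("output_prefix must be a string")

    # Documented naming convention: output_prefix + input_file.name
    output_file = output_dir / (output_prefix + input_file.name)
    return [
        "bcftools", "view",
        "-Oz",                             # write compressed VCF
        "-i", f"INFO/R2>{r2_threshold}",   # assumed tag name; keeps R2 strictly above threshold
        "-o", str(output_file),
        str(input_file),
    ]


filter_cmd = build_filter_command(Path("chr1.dose.vcf.gz"), Path("out"), 0.3)
print(" ".join(filter_cmd))
```

With an include expression of this form, variants whose R2 equals the threshold are dropped, which matches the "R2 <= threshold will be removed" wording in the parameter description.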
- class ideal_genom.preprocessing.PostImputation.GetPLINK
Bases:
object
A class for converting concatenated VCF files to PLINK binary format.
This class handles conversion of a concatenated VCF file to a PLINK binary ready for further analysis.
- input_path
Directory path where the input VCF file is located.
- Type:
Path
- output_path
Directory path where the output files will be saved.
- Type:
Path
- input_name
Name of the input VCF file (must end with .vcf or .vcf.gz).
- Type:
str
- output_name
Name for the output file. If not provided, it will be derived from input_name.
- Type:
str, optional
- Raises:
TypeError – If input_path or output_path is not a Path object, or if input_name or output_name is not a string.
FileNotFoundError – If input_path or output_path does not exist.
NotADirectoryError – If input_path or output_path is not a directory.
ValueError – If input_name is not provided or if it doesn’t end with .vcf or .vcf.gz.
- __init__(input_path: Path, input_name: str, output_path: Path, output_name: str) None
- convert_vcf_to_plink(double_id: bool = True, threads: int | None = None, memory: int | None = None) None
Convert a VCF file to PLINK binary format (.bed, .bim, .fam).
This method runs the plink2 command-line tool to convert the input VCF file to PLINK binary format, filtering for SNPs with standard ACGT alleles only.
- Parameters:
double_id (bool, optional) – Whether to use the --double-id flag in the plink2 command, which sets both FID and IID to the sample ID. Defaults to True.
threads (int, optional) – Number of CPU threads to use. If None, defaults to (available CPU cores - 2) or 10 if CPU count can’t be determined.
memory (int, optional) – Memory allocation in MB for plink2. If None, defaults to approximately 2/3 of available system memory.
- Returns:
None
Side Effects
Creates PLINK binary files (.bed, .bim, .fam) in the self.analysis_ready directory with the prefix self.output_name + "-nosex".
- Raises:
subprocess.CalledProcessError – If the plink2 command execution fails.
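The thread default and the general shape of the plink2 call described above can be sketched as follows. The helper names default_threads and build_plink2_command are hypothetical, the lower bound of one thread is an added safeguard not stated in this reference, and the memory default (about 2/3 of available memory) is left out because computing it needs a third-party package.

```python
import os
from pathlib import Path
from typing import List, Optional


def default_threads() -> int:
    """Available CPU cores minus 2, or 10 if the CPU count cannot be determined."""
    cpu_count = os.cpu_count()
    if cpu_count is None:
        return 10
    # Assumption: never go below one thread on small machines.
    return max(1, cpu_count - 2)


def build_plink2_command(
    vcf_file: Path,
    out_prefix: Path,
    double_id: bool = True,
    threads: Optional[int] = None,
    memory: Optional[int] = None,
) -> List[str]:
    """Build (but do not run) a plink2 command converting a VCF to .bed/.bim/.fam."""
    cmd = [
        "plink2",
        "--vcf", str(vcf_file),
        "--snps-only", "just-acgt",  # keep SNPs with standard ACGT alleles only
        "--make-bed",
        "--out", str(out_prefix),
        "--threads", str(threads if threads is not None else default_threads()),
    ]
    if double_id:
        cmd.append("--double-id")  # set both FID and IID to the sample ID
    if memory is not None:
        cmd += ["--memory", str(memory)]  # memory in MB
    return cmd


plink_cmd = build_plink2_command(
    Path("cohort.vcf.gz"), Path("analysis_ready") / "cohort-nosex", threads=4, memory=8000
)
print(" ".join(plink_cmd))
```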
- update_fam(for_fam_update_file: Path, threads: int | None = None) None
Add family information to the PLINK .fam file.
This method reads a family information file and updates the PLINK .fam file using the provided family information, via PLINK2.
- Parameters:
for_fam_update_file (Path) – Path to the family information file (.fam or without suffix).
threads (int, optional) – Number of threads to use for PLINK2 (defaults to available CPUs - 2).
- Return type:
None
- class ideal_genom.preprocessing.PostImputation.IndexVCF
Bases:
ParallelTaskRunner
A class for indexing VCF (Variant Call Format) files using bcftools in parallel.
This class extends ParallelTaskRunner to enable parallel processing of multiple VCF files. It creates index files that facilitate quick random access to compressed VCF files.
- pattern
The glob pattern to match VCF files for indexing. Defaults to normalized-*dose.vcf.gz.
- Type:
str, optional
- (See `ParallelTaskRunner` for inherited attributes.)
- Raises:
TypeError – If pattern is not a string.
Note
bcftools must be installed and available in the system path
- __init__(input_path: Path, output_path: Path, max_workers: int | None = None, pattern: str = 'normalized-*dose.vcf.gz') None
- execute_task() None
Execute the task of indexing VCF files.
This method collects files based on the provided pattern and indexes the VCF files.
- Return type:
None
- index_vcf(input_file: Path) None
Index a VCF file using bcftools.
This method creates an index for the specified VCF file using bcftools, which is required for efficient querying and processing of VCF files.
- Parameters:
input_file (Path) – Path to the VCF file to be indexed. Must be an existing file.
- Return type:
None
- Raises:
FileExistsError – If the input file does not exist.
- class ideal_genom.preprocessing.PostImputation.NormalizeVCF
Bases:
ParallelTaskRunner
A class for normalizing VCF files post-imputation in parallel.
This class provides functionality to process VCF files by normalizing them using bcftools. It’s specifically designed to handle post-imputation VCF files and split multiallelic variants into separate entries. The class inherits from ParallelTaskRunner to enable parallel processing of multiple VCF files, which improves performance for large-scale genomic datasets.
- Inherits all attributes from ParallelTaskRunner
- output_prefix
Prefix to add to the output files. Defaults to 'uncompressed-'.
- Type:
str, optional
Note
bcftools must be installed and available in the system path
- __init__(input_path: Path, output_path: Path, max_workers: int | None = None, output_prefix: str = 'uncompressed-') None
- execute_task() None
Execute the post-imputation normalization task on VCF files.
This method collects filtered dose VCF files matching the pattern filtered-*dose.vcf.gz and runs the normalization process on them. The normalized files will be prefixed with the provided output_prefix.
- Parameters:
output_prefix (str (optional)) – Prefix to add to the output files. Defaults to 'uncompressed-'.
- Raises:
TypeError – If output_prefix is not a string.
- Return type:
None
- normalize_vcf(input_file: Path, output_prefix: str = 'uncompressed-') None
Normalizes a VCF file using bcftools norm with the -m -any option.
This method takes a VCF file, performs normalization using bcftools to split multiallelic variants into separate entries, and outputs the normalized file with the specified prefix.
- Parameters:
input_file (Path) – Path to the input VCF file to be normalized
output_prefix (str, optional) – Prefix for the output file name. Defaults to ‘uncompressed-’
- Return type:
None
- Raises:
FileExistsError – If the input file does not exist
IsADirectoryError – If the input file path points to a directory
TypeError – If output_prefix is not a string
Notes
The output file will be saved in the output_path directory with the naming convention: output_prefix + base_name, where base_name is derived from the input file.
- class ideal_genom.preprocessing.PostImputation.ParallelTaskRunner
Bases:
object
A base class for running parallel tasks on files.
This class provides the basic infrastructure for parallel processing of files using ThreadPoolExecutor. It handles file collection and parallel task execution while providing progress monitoring and logging.
- input_path
Directory path where input files are located.
- Type:
Path
- output_path
Directory path where output files will be saved.
- Type:
Path
- max_workers
Maximum number of worker threads to use. Defaults to min(8, CPU count).
- Type:
int
- files
List of files to be processed.
- Type:
List[Path]
- Raises:
TypeError – If input_path or output_path are not Path objects.
FileNotFoundError – If input_path or output_path don’t exist.
NotADirectoryError – If input_path or output_path are not directories.
- __init__(input_path: Path, output_path: Path, max_workers: int | None = None) None
- _file_collector(filename_pattern: str) List[Path]
Collect files matching a given pattern from the input directory. This method finds all files matching the specified glob pattern within the input directory, sorts them, and stores the resulting list as an instance attribute.
- Parameters:
filename_pattern (str) – A glob pattern string to match files (e.g., *.vcf.gz).
- Returns:
A sorted list of Path objects for the files matching the pattern.
- Return type:
List[Path]
- Raises:
TypeError – If filename_pattern is not a string.
FileNotFoundError – If no files match the given pattern in the input directory.
Notes
The matched files are also stored in the instance attribute files.
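The collection behaviour described above can be sketched with pathlib alone. This is a standalone function written for illustration (the name collect_files is hypothetical); the real method is an instance method that also stores the result in self.files.

```python
import tempfile
from pathlib import Path
from typing import List


def collect_files(input_path: Path, filename_pattern: str) -> List[Path]:
    """Return a sorted list of files in input_path matching a glob pattern."""
    if not isinstance(filename_pattern, str):
        raise TypeError("filename_pattern must be a string")

    # Path.glob yields matches in arbitrary order, so sort for reproducibility.
    files = sorted(p for p in input_path.glob(filename_pattern) if p.is_file())
    if not files:
        raise FileNotFoundError(
            f"No files matching {filename_pattern!r} found in {input_path}"
        )
    return files


# Small demonstration in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    tmp_path = Path(tmp)
    for name in ("chr2.dose.vcf.gz", "chr1.dose.vcf.gz", "notes.txt"):
        (tmp_path / name).touch()
    found = [p.name for p in collect_files(tmp_path, "*dose.vcf.gz")]

print(found)
```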
- _run_task(task_fn: Callable, task_args: Dict[str, Any], desc: str = 'Running tasks') None
Execute a task function across all files using parallel processing with ThreadPoolExecutor.
This method applies the given task function to each file in self.files concurrently, managing thread allocation, progress tracking, and error handling.
- Parameters:
task_fn (Callable) – The function to execute for each file. Its first argument should accept a file, and it should accept **kwargs for additional arguments.
task_args (Dict[str, Any]) – Dictionary of keyword arguments to pass to the task function.
desc (str, optional) – Description for the progress bar and logging, by default “Running tasks”.
- Return type:
None
Notes
Uses ThreadPoolExecutor with max_workers defined in class initialization
Provides progress tracking via tqdm
Logs timing information and any exceptions that occur
Does not raise exceptions from individual tasks but logs them instead
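The execution pattern described in these notes can be sketched with the standard library as follows. This is not the library's code: run_task and demo_task are hypothetical names, the tqdm progress bar is omitted to keep the example dependency-free, and returning the failure count is an addition for demonstration (the documented method returns None).

```python
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Callable, Dict, List

logger = logging.getLogger(__name__)


def run_task(
    files: List[Path],
    task_fn: Callable[..., None],
    task_args: Dict[str, Any],
    max_workers: int = 4,
    desc: str = "Running tasks",
) -> int:
    """Apply task_fn to every file concurrently; log failures instead of raising."""
    start = time.time()
    failures = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its file so errors can be reported per file.
        futures = {executor.submit(task_fn, f, **task_args): f for f in files}
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as exc:
                failures += 1
                logger.error("%s failed for %s: %s", desc, futures[future], exc)
    logger.info("%s finished in %.2f s", desc, time.time() - start)
    return failures


def demo_task(input_file: Path, output_prefix: str) -> None:
    """Stand-in task that fails for one file to show the error handling."""
    if input_file.name.startswith("bad"):
        raise ValueError("simulated failure")


failed = run_task(
    [Path("chr1.vcf.gz"), Path("bad.vcf.gz"), Path("chr2.vcf.gz")],
    demo_task,
    {"output_prefix": "filtered-"},
)
print(failed)
```

Threads suit this workload because each task mostly waits on an external process or on file I/O, during which the Python interpreter lock is released.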
- execute_task() None
Execute the specific post-imputation processing task.
This abstract method should be implemented by all subclasses to perform their specific post-imputation processing operations. Implementations should handle the execution logic for the particular task the subclass is designed to perform.
- Return type:
None
- Raises:
NotImplementedError – If the subclass does not implement this method.
- class ideal_genom.preprocessing.PostImputation.ProcessVCF
Bases:
object
ProcessVCF class for post-imputation processing of Variant Call Format (VCF) files.
This class provides a pipeline for processing VCF files through multiple sequential steps:
Unzipping VCF files (if compressed)
Filtering variants based on imputation quality (R²)
Normalizing variant representation
Normalizing against a reference genome
Indexing the normalized VCF files
Annotating variants with additional information
Concatenating multiple VCF files into a single output file
- input_path
Path to the directory containing input VCF files.
- Type:
Path
- output_path
Path to the directory where processed files will be saved.
- Type:
Path
- Raises:
TypeError – If input_path or output_path is not of type Path.
FileNotFoundError – If input_path or output_path does not exist.
NotADirectoryError – If input_path or output_path is not a directory.
Notes
A subdirectory named process_vcf is created inside the input_path directory for storing intermediate files during processing.
This class is designed to handle multiple sequential steps in VCF file processing, such as unzipping, filtering, normalizing, and annotating.
Note
This class requires bcftools to be installed and available in the system path.
- __init__(input_path: Path, output_path: Path) None
- execute_annotate(ref_annotation: Path, output_prefix: str = 'annotated-') None
Annotates a VCF file using a reference annotation file.
This method initializes an AnnotateVCF object and executes the annotation process on the current VCF file.
- Parameters:
ref_annotation (Path) – Path to the reference annotation file.
output_prefix (str, optional) – Prefix to be added to the output file name. Default is 'annotated-'.
- Return type:
None
- execute_concatenate(output_name: str, max_threads: int | None = None) None
Concatenates annotated VCF files using bcftools concat.
This method finds all annotated VCF files in the process_vcf directory, sorts them, and concatenates them into a single compressed VCF file.
- Parameters:
output_name (str) – Name of the output file.
max_threads (int (optional)) – Maximum number of threads to use for concatenation. If None, uses min(8, os.cpu_count()). Defaults to None.
- Return type:
None
- Raises:
TypeError – If output_name is not a string.
FileNotFoundError – If no annotated VCF files are found in the process_vcf directory.
ValueError – If max_threads is less than 1.
Notes
The output file will be saved in the output_path directory. The method uses the ‘bcftools concat’ command with Oz compression.
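A minimal sketch of the thread default, the validation, and the concatenation command described above. The helper names resolve_threads and build_concat_command are hypothetical, and the command is only built, not executed.

```python
import os
from pathlib import Path
from typing import List, Optional


def resolve_threads(max_threads: Optional[int]) -> int:
    """Use min(8, CPU count) when no value is given; reject values below 1."""
    if max_threads is None:
        # os.cpu_count() may return None; falling back to 1 is an assumption.
        return min(8, os.cpu_count() or 1)
    if max_threads < 1:
        raise ValueError("max_threads must be at least 1")
    return max_threads


def build_concat_command(
    annotated_files: List[Path],
    output_file: Path,
    max_threads: Optional[int] = None,
) -> List[str]:
    """Build (but do not run) a bcftools concat command over sorted input files."""
    files = sorted(annotated_files)
    if not files:
        raise FileNotFoundError("No annotated VCF files found")
    return [
        "bcftools", "concat",
        "--threads", str(resolve_threads(max_threads)),
        "-Oz",                    # compressed VCF output
        "-o", str(output_file),
        *map(str, files),
    ]


concat_cmd = build_concat_command(
    [
        Path("annotated-normalized-chr2.dose.vcf.gz"),
        Path("annotated-normalized-chr1.dose.vcf.gz"),
    ],
    Path("out") / "cohort.vcf.gz",
    max_threads=2,
)
print(" ".join(concat_cmd))
```

A plain lexicographic sort places chr10 before chr2; how the library orders chromosomes beyond "sorts them" is not stated here.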
- execute_filter(r2_threshold: float = 0.3) None
Execute a filtering operation on VCF data based on R² threshold.
This method filters variants in the processed VCF files by creating and executing a FilterVariants object with the specified R² threshold. Both input and output are set to the same process_vcf directory.
- Parameters:
r2_threshold (float, optional) – The R² threshold value for filtering variants. Variants with R² value below this threshold will be filtered out. Default is 0.3.
- Return type:
None
- execute_index(pattern: str = 'normalized-*dose.vcf.gz') None
Index VCF files matching a specific pattern.
This method creates an indexer for VCF files and executes the indexing task on files that match the given pattern in the process_vcf directory.
- Parameters:
pattern (str, optional) – The glob pattern to match VCF files for indexing. Defaults to normalized-*dose.vcf.gz.
- Return type:
None
- execute_normalize() None
Normalizes the VCF file using the NormalizeVCF class.
This method creates a NormalizeVCF object with the current processed VCF file as both input and output, then executes the normalization task. The normalization process updates the VCF file in place.
- Return type:
None
- execute_reference_normalize(build: str = '38', ref_genome: Path | None = None) None
Normalize the VCF file against a reference genome.
This method creates a ReferenceNormalizeVCF object and executes the normalization task on the processed VCF file, using the specified genome build or reference file.
- Parameters:
build (str, optional) – Genome build version to use. Defaults to ‘38’.
ref_genome (Path, optional) – Path to a custom reference file. If provided, this will be used instead of the default reference for the specified build. Defaults to None.
- Return type:
None
- execute_unzip(password: str | None = None) None
Unzips a VCF file using the UnzipVCF utility.
This method creates an instance of UnzipVCF with the input and process paths from the current object, then executes the unzipping task. If the VCF file is password-protected, a password can be provided.
- Parameters:
password (str, optional) – Password for the protected zip file. Defaults to None.
- Return type:
None
- class ideal_genom.preprocessing.PostImputation.ReferenceNormalizeVCF
Bases:
ParallelTaskRunner
A class for normalizing VCF files using a reference genome in parallel.
This class extends ParallelTaskRunner to process multiple VCF files concurrently, normalizing them against a reference genome using bcftools. If a reference file is not provided, it will automatically download the appropriate reference genome based on the specified build.
- build
Genome build version, either ‘37’ or ‘38’. Defaults to ‘38’.
- Type:
str
- output_prefix
Prefix to add to the output files. Defaults to 'normalized-'.
- Type:
str
- reference_file
Path to the reference genome file used for normalization. Defaults to None. If None or the file does not exist, it will be downloaded automatically based on the build.
- Type:
Path, optional
- (See `ParallelTaskRunner` for inherited attributes.)
Note
bcftools must be installed and available in the system path
- __init__(input_path: Path, output_path: Path, max_workers: int | None = None, build: str = '38', output_prefix: str = 'normalized-', reference_file: Path | None = None) None
- execute_task() None
Execute the post-imputation normalization task with reference genome.
This method normalizes VCF files using a reference genome. If no reference file is provided, it automatically downloads the appropriate reference genome based on the build parameter.
- Return type:
None
- Raises:
TypeError – If output_prefix is not a string.
Notes
This method collects uncompressed dose VCF files using a pattern match and normalizes them against the reference genome. The downloaded reference genomes come from the 1000 Genomes Project.
- normalize_with_reference(input_file: Path, output_prefix: str = 'normalized-') None
Normalize a VCF file with a reference genome using bcftools.
This method takes an input VCF file and normalizes it against a reference genome using bcftools norm. The normalized output is compressed with gzip (-Oz).
- Parameters:
input_file (Path) – Path to the input VCF file to be normalized.
output_prefix (str, default='normalized-') – Prefix to add to the output filename.
- Returns:
The method doesn’t return a value but creates a normalized VCF file at the output_path location.
- Return type:
None
- Raises:
TypeError – If output_prefix is not a string.
subprocess.CalledProcessError – If the bcftools command fails.
FileNotFoundError – If the input file cannot be found.
Notes
The output filename is constructed from the output_prefix and the base name extracted from the input filename (after the first hyphen).
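The naming rule in the note above, together with a plausible bcftools norm call, can be sketched as follows. The helper names are hypothetical, and apart from -Oz (mentioned above) the flags are assumptions; -f is the bcftools norm option for supplying a reference FASTA.

```python
from pathlib import Path
from typing import List


def reference_output_name(input_file: Path, output_prefix: str = "normalized-") -> str:
    """Replace everything up to the first hyphen of the filename with output_prefix."""
    if not isinstance(output_prefix, str):
        raise TypeError("output_prefix must be a string")
    _, separator, base_name = input_file.name.partition("-")
    if not separator:
        # Assumption: with no hyphen present, keep the whole filename.
        base_name = input_file.name
    return output_prefix + base_name


def build_reference_norm_command(
    input_file: Path,
    reference_file: Path,
    output_dir: Path,
    output_prefix: str = "normalized-",
) -> List[str]:
    """Build (but do not run) a bcftools norm command using a reference genome."""
    output_file = output_dir / reference_output_name(input_file, output_prefix)
    return [
        "bcftools", "norm",
        "-f", str(reference_file),  # reference genome FASTA
        "-Oz",                      # compressed VCF output
        "-o", str(output_file),
        str(input_file),
    ]


norm_name = reference_output_name(Path("uncompressed-chr1.dose.vcf.gz"))
norm_cmd = build_reference_norm_command(
    Path("uncompressed-chr1.dose.vcf.gz"), Path("ref.fa"), Path("out")
)
print(norm_name)
print(" ".join(norm_cmd))
```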
- class ideal_genom.preprocessing.PostImputation.UnzipVCF
Bases:
ParallelTaskRunner
A class for unzipping VCF (Variant Call Format) files after imputation, with support for parallel processing.
This class extends ParallelTaskRunner to efficiently extract VCF files from zip archives, including password-protected ones. It collects all zip files in the working directory and extracts their contents to the output directory.
- (See `ParallelTaskRunner` for inherited attributes.)
Notes
VCF files are commonly used in genomics for storing gene sequence variations
The class only extracts files (not directories) from the zip archives
All extracted files are placed directly in the output directory without preserving paths
This class is designed for post-imputation processing in genetic data pipelines
- __init__(input_path: Path, output_path: Path, max_workers: int | None = None, password: str | None = None) None
- execute_task() None
Execute the post-imputation unzipping task on VCF files.
This method performs the following steps:
1. Collects all zip files in the working directory
2. Unzips the VCF files, using the provided password if necessary
- Parameters:
password (Optional[str]) – Password to decrypt zip files if they are password-protected. Default is None.
- Returns:
This method doesn’t return any value.
- Return type:
None
- unzip_files(zip_path: Path, password: str | None = None, output_prefix: str = 'unzipped-') None
Extract files from a zip archive, which may be password-protected. This method extracts all non-directory files from the specified zip archive to the class's output_path directory. If the zip file is password-protected, provide the password as a parameter.
- Parameters:
zip_path (Path) – Path to the zip file to be extracted
password (Optional[str], optional) – Password for the zip file, None if the file is not password-protected. Defaults to None.
output_prefix (str, optional) – Prefix to add to extracted filenames. Defaults to 'unzipped-'.
- Return type:
None
- Raises:
zipfile.BadZipFile – If the zip file is corrupted or password is incorrect
FileNotFoundError – If the zip file does not exist
PermissionError – If there are insufficient permissions to read the zip file or write to output directory
Notes
Files are extracted to the output_path directory of the class instance. Only files (not directories) are extracted from the archive. File paths are not preserved - all files are placed directly in output_path. The output_prefix is added to the beginning of each extracted filename.
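The flat extraction behaviour described in these notes can be sketched with the standard zipfile module. The function name unzip_flat is hypothetical and the code is an illustration under the stated behaviour, not the library's implementation.

```python
import shutil
import tempfile
import zipfile
from pathlib import Path
from typing import List, Optional


def unzip_flat(
    zip_path: Path,
    output_path: Path,
    password: Optional[str] = None,
    output_prefix: str = "unzipped-",
) -> List[Path]:
    """Extract every file (not directory) into output_path, discarding archive paths."""
    pwd = password.encode() if password is not None else None
    extracted: List[Path] = []
    with zipfile.ZipFile(zip_path) as archive:
        for member in archive.infolist():
            if member.is_dir():
                continue  # only files are extracted
            # Keep just the final path component and add the prefix.
            target = output_path / (output_prefix + Path(member.filename).name)
            # Stream the member to disk instead of loading it fully into memory.
            with archive.open(member, pwd=pwd) as source, open(target, "wb") as sink:
                shutil.copyfileobj(source, sink)
            extracted.append(target)
    return extracted


# Small demonstration with an unencrypted archive in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    tmp_path = Path(tmp)
    archive_path = tmp_path / "chr1.zip"
    with zipfile.ZipFile(archive_path, "w") as demo_archive:
        demo_archive.writestr("results/chr1.dose.vcf.gz", b"placeholder")
    out_dir = tmp_path / "out"
    out_dir.mkdir()
    extracted_names = [p.name for p in unzip_flat(archive_path, out_dir)]

print(extracted_names)
```

Because paths are flattened, two archive members with the same filename in different folders would overwrite each other in this sketch.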