diff --git a/README.rst b/README.rst index 95e594e..4ddea58 100644 --- a/README.rst +++ b/README.rst @@ -1,27 +1,51 @@ - .. image:: dysgu/logo.png :align: left +.. |Generic badge| image:: https://img.shields.io/badge/install%20with-bioconda-brightgreen.svg + :target: http://bioconda.github.io/recipes/dysgu/README.html + +.. |Li badge| image:: https://anaconda.org/bioconda/dysgu/badges/license.svg + :target: https://github.com/kcleal/dysgu/blob/master/LICENSE.md + dysgu (pronounced *duss-key*) is a set of command line tools and `python-API `_, for calling structural variants using paired-end or long read sequencing data. See recent long-read benchmarks `here `_, and `here `_. +|Generic badge| |Li badge| -βš™οΈ Installation ---------------- +`βš™οΈ Installation`_ -|Generic badge| |Li badge| +`πŸš€ Quick start`_ -.. |Generic badge| image:: https://img.shields.io/badge/install%20with-bioconda-brightgreen.svg - :target: http://bioconda.github.io/recipes/dysgu/README.html +`🎯 Calling SVs`_ -.. |Li badge| image:: https://anaconda.org/bioconda/dysgu/badges/license.svg - :target: https://github.com/kcleal/dysgu/blob/master/LICENSE.md +`🚦Filtering SVs`_ + +`βž• Merging SVs`_ + +`β™‹ Somatic SVs / tumor-normal calling / pool-of-normals`_ + +`πŸ” Genotype list of sites`_ + +`πŸ”ͺ Regions of interest / excluding regions`_ -Dysgu requires Python >=3.7 - 3.10 plus htslib and has been tested on linux and MacOS. +`πŸ”§ Useful parameters`_ + +`πŸš‘ Issues`_ + +`🐍 Python API`_ + +`πŸŽ“ Citation`_ + +---- + +βš™οΈ Installation +--------------- + +Dysgu requires Python >=3.8 - 3.10 plus htslib and has been tested on linux and MacOS. The list of python packages needed can be found in requirements.txt. To install:: - + conda install -c bioconda -c conda-forge dysgu Or, fetch from PyPI:: @@ -38,8 +62,8 @@ Run tests:: $ dysgu test -πŸš€ Usage --------- +πŸš€ Quick start +-------------- Available commands:: dysgu run # Run using default arguments, wraps fetch and call commands @@ -57,8 +81,8 @@ For help use:: To use the python-API see the `documentation `_, or `jupyter notebook `_, -πŸ“– User Guide -------------- +🎯 Calling SVs +-------------- Paired-end reads **************** @@ -104,25 +128,30 @@ the insert size can be specified manually using the -I option:: dysgu call --ibam all_reads.bam reference.fa temp_dir temp_dir/temp_dir.dysgu_reads.bam > svs.vcf +Models available +~~~~~~~~~~~~~~~~~ +There are a choice of three models per read type. By default, a diploid model will be used that takes into account +changes in read-depth around break sites. This model is +preferred as it often attains higher precision in germline whole-genome samples. However, for somatic samples (e.g. tumors) copy +number changes, poly-clonality or poly-ploidy can lead to events with low allelic fraction. For such samples, a non-diploid +model might work better. This is selected by applying `--diploid False`. A model with no information on allelic fraction +will then be utilized. -Merging SVs from multiple files -------------------------------- -If you plan on merging samples, it is recommended that the '-v2' option be used when running the 'run/call' modules; this will -ensure that all consensus sequences will be reported in the vcf file to help with downstream merging. -Multiple output vcf files can be merged, e.g. 
tumor.vcf and normal.vcf, or illumina.vcf and pacbio.vcf:: - - dysgu merge sample1.vcf sample2.vcf > combined.vcf +Finally, if the diploid/non-diploid models are not picking up your SV of interest, a simpler model can be used with the +`--contigs False` option. This model has all sequence-related metrics removed, so only read-support information is +retained. In general the performance of models follows diploid > non-diploid > no-contigs. -Merging SVs between platforms at multiallelic/complex sites is still tricky and there is a trade off between under merging -(leading to duplication) and over merging (leading to loss of multiallelic/complex SVs). Setting the '--merge-within True' option will perform -a single round of merging for each input file before merging across input files. This will shift the balance to over merging, but reduces the -problem of duplication:: +It is also possible to switch models post-calling using the python-API. For an example of how to do this, +see the dysgu_api_demon.ipynb - dysgu merge --merge-within True pacbio.vcf illumina.vcf > combined.vcf +Resource requirements +~~~~~~~~~~~~~~~~~~~~~ +Using a single core and depending on hard-drive speed, dysgu usually takes ~1h to analyse a 30X coverage genome of 150 bp paired-end reads and +uses < 6 GB memory. Also note that when `fetch` is utilized (or using run command), a large temp file is generated consisting of SV-associated reads >5 Gb in size. -Filtering SVs -------------- +🚦Filtering SVs +---------------- Remove events with low probability:: dysgu filter --min-prob 0.2 input.vcf > output.vcf @@ -136,8 +165,28 @@ Re-label events with probability >= 0.3 as PASS:: dysgu filter --pass-prob 0.3 input.vcf > output.vcf -Somatic SVs / tumor-normal calling / pool-of-normals ----------------------------------------------------- +βž• Merging SVs +-------------- +If you plan on merging samples, it is recommended that the '-v2' option be used when running the 'run/call' modules; this will +ensure that all consensus sequences will be reported in the vcf file to help with downstream merging. +Multiple output vcf files can be merged, e.g. tumor.vcf and normal.vcf, or illumina.vcf and pacbio.vcf:: + + dysgu merge *.vcf > combined.vcf + +For large numbers of samples, an input list can be used, and merging can be performed in parallel (by chromosome and SV type):: + + dysgu merge -p24 --input-list samples.txt --wd wd > combined.vcf + +Merging SVs between platforms at multiallelic/complex sites is still tricky and there is a trade off between under merging +(leading to duplication) and over merging (leading to loss of multiallelic/complex SVs). Setting the '--merge-within True' option will perform +a single round of merging for each input file before merging across input files. This will shift the balance to over merging, but reduces the +problem of duplication:: + + dysgu merge --merge-within True pacbio.vcf illumina.vcf > combined.vcf + + +β™‹ Somatic SVs / tumor-normal calling / pool-of-normals +------------------------------------------------------ For tumor/normal pairs, the recommended workflow is to call SVs independently in each sample, then obtain tumor specific (somatic) SVs by running dysgu filter:: @@ -172,7 +221,7 @@ Also a target VCF can be filtered against a normal vcf if desired (without align By default, SV calls with a PROB value < ``--min-prob`` are removed from the final output, and SV calls with a PROB value >= ``--pass-prob`` will be re-labelled as PASS in the output. 
However, these -thresholds currently require tuning depending on sequencing platform, coverage and the size of the cohort used for filtering. +thresholds currently require tuning depending on sequencing platform, coverage and the size of the cohort used for filtering. Suitable values for `--pass-prob` often lie in the range 0.2 - 0.4. For paired-end reads, a pass-prob of around 0.35 can work well, whereas for long-reads a lower threshold of 0.2 can work better e.g:: dysgu filter --pass-prob 0.2 --min-prob 0.1 --normal-vcf normal.vcf tumour.vcf normal.bam > somatic.vcf @@ -180,56 +229,11 @@ Suitable values for `--pass-prob` often lie in the range 0.2 - 0.4. For paired-e To quickly test and visualise different filtering thresholds, output can be piped to the command line tool `GW `_, which will display the results to screen for inspection:: dysgu filter --pass-prob 0.2 filtered.vcf | \ - gw hg38 -b normal.bam -b tumor.bam -v - + gw hg38 -b normal.bam -b tumor.bam -v - -Models available ----------------- -There are a choice of three models per read type. By default, a diploid model will be used that takes into account -changes in read-depth around break sites. This model is -preferred as it often attains higher precision in germline whole-genome samples. However, for somatic samples (e.g. tumors) copy -number changes, poly-clonality or poly-ploidy can lead to events with low allelic fraction. For such samples, a non-diploid -model might work better. This is selected by applying `--diploid False`. A model with no information on allelic fraction -will then be utilized. - -Finally, if the diploid/non-diploid models are not picking up your SV of interest, a simpler model can be used with the -`--contigs False` option. This model has all sequence-related metrics removed, so only read-support information is -retained. In general the performance of models follows diploid > non-diploid > no-contigs. - -It is also possible to switch models post-calling using the python-API. For an example of how to do this, -see the dysgu_api_demon.ipynb - - -Specifying regions of interest / excluding regions --------------------------------------------------- -Regions of the genome can be skipped from analysis by providing a .bed file using the `--exclude` option. This option -takes precedence over the options detailed below, and acts as a hard filter, removing regions of the genome from analysis. - -Dysgu provides two ways to analyse regions of interest. Target genomic regions can be specified using a .bed file with -the --search option. This will also act as a hard filter, limiting analysis only to those regions, while regions outside -will be ignored. - -Alternatively, regions can be specified using the --regions option (.bed file). If this option is used, all reads not -excluded by the --exclude/--search options will be analysed. Variants will then be -labelled in the output vcf according to their intersection with those regions. The INFO > KIND column will be labelled -with either 'intra-regional' - both SV ends within same interval, 'extra-regional' - neither SV end in an interval, -'inter-regional' - SV ends in separate intervals, or 'hemi-regional' - one SV end in an interval. These labels may be -useful for some targeted sequencing experiments. - -Additionally, there is also the --regions-only option. The option is only available for 'dysgu call'. If this is set to 'True', then dysgu will search all reads in ---regions and also analyse any mate-pairs that do not overlap those regions of interest. 
This method can be quicker to
-run when the regions of interest are small relative to the genome. However, this option can consume a lot of memory if the
-regions are large, so use with caution.
-
-For deep targeted sequencing experiments, the --regions-mm-only option can also be used, which can help prevent over
-clustering of reads. When set to 'True', dysgu will only use minimizer based clustering within the intervals specified
-by --regions.
-
-Also of note, it is possible to use --exclude, --search, and --regions at the same time.
-
-
-Genotype list of sites
-----------------------
+πŸ” Genotype list of sites
+-------------------------
 Calls from multiple samples can be merged into a unified site list::
 
     dysgu run -v2 ref.fa wd1 sample1.bam > sample1.vcf
@@ -262,9 +266,41 @@ in --sites is also a true variant in the input sample. For related individuals o
 If the --sites vcf file is from a previous dysgu run, the PROB values can be utilized by setting '--parse-probs True'.
 This option can work well when using dysgu calls from a related individual.
 
+Also of note, the ``--ignore-sample-sites`` option is set to True by default. This means the input sample name (taken from the bam SM tag)
+is ignored when reading a multi-sample sites file. This may not be the desired behavior if trying to re-genotype a sample using different
+read types, for example.
+
+
+πŸ”ͺ Regions of interest / excluding regions
+------------------------------------------
+Regions of the genome can be skipped from analysis by providing a .bed file using the `--exclude` option. This option
+takes precedence over the options detailed below, and acts as a hard filter, removing regions of the genome from analysis.
+
+Dysgu provides two ways to analyse regions of interest. Target genomic regions can be specified using a .bed file with
+the --search option. This will also act as a hard filter, limiting analysis only to those regions, while regions outside
+will be ignored.
+
+Alternatively, regions can be specified using the --regions option (.bed file). If this option is used, all reads not
+excluded by the --exclude/--search options will be analysed. Variants will then be
+labelled in the output vcf according to their intersection with those regions. The INFO > KIND column will be labelled
+with either 'intra-regional' - both SV ends within same interval, 'extra-regional' - neither SV end in an interval,
+'inter-regional' - SV ends in separate intervals, or 'hemi-regional' - one SV end in an interval. These labels may be
+useful for some targeted sequencing experiments.
+
+Additionally, there is also the --regions-only option. This option is only available for 'dysgu call'. If this is set to 'True', then dysgu will search all reads in
+--regions and also analyse any mate-pairs that do not overlap those regions of interest. This method can be quicker to
+run when the regions of interest are small relative to the genome. However, this option can consume a lot of memory if the
+regions are large, so use with caution.
+
+For deep targeted sequencing experiments, the --regions-mm-only option can also be used, which can help prevent over
+clustering of reads. When set to 'True', dysgu will only use minimizer-based clustering within the intervals specified
+by --regions.
+
+Also of note, it is possible to use --exclude, --search, and --regions at the same time.
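+For example, assuming illustrative region files exclude.bed, panel.bed and targets.bed (substitute your own .bed files),
+all three options can be combined in a single command::
+
+    dysgu run --exclude exclude.bed --search panel.bed --regions targets.bed reference.fa temp_dir input.bam > svs.vcf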
+ -Useful parameters ------------------ +πŸ”§ Useful parameters +-------------------- The most important parameter affecting sensitivity is --min-support, lower values increase sensitivity but also runtime. The --max-cov parameter may need to be adjusted for high coverage samples (default is 200), or samples that might have @@ -295,14 +331,8 @@ ambiguous candidate alignments. compared to matching reference bases. Reads that have anomalous divergence at the ends of the read are ignored during calling. -Resource requirements ---------------------- -Using a single core and depending on hard-drive speed, dysgu usually takes ~1h to analyse a 30X coverage genome of 150 bp paired-end reads and -uses < 6 GB memory. Also note that when `fetch` is utilized (or using run command), a large temp file is generated consisting of SV-associated reads >5 Gb in size. - - -Issues ------- +πŸš‘ Issues +--------- - Currently cram files are only supported when using the "run" command. This is because pysam cannot use seek on a cram file. - If the temp file created during the fetch stage of the pipeline is too big, the --compression level can be set to reduce space. @@ -316,8 +346,8 @@ Issues - If you input data or aligner do not seem to be working well with dysgu, please get in touch clealk@cardiff.ac.uk -Python API ----------- +🐍 Python API +------------- Dysgu can also be used from a python script. A full demo of the API can be found in the `ipython notebook `_,. In this example, dysgu is @@ -344,8 +374,8 @@ used to call SVs on the first 10 Mb of chr1: The API can also be used to apply different machine-learning models, merge SVs, and call SVs using target bed regions. -Citation --------- +πŸŽ“ Citation +----------- To cite dysgu, or to learn more about implementation details please see: https://academic.oup.com/nar/advance-article/doi/10.1093/nar/gkac039/6517943 diff --git a/dysgu/call_component.pyx b/dysgu/call_component.pyx index b74fad4..e466ed2 100644 --- a/dysgu/call_component.pyx +++ b/dysgu/call_component.pyx @@ -395,7 +395,7 @@ cdef make_generic_insertion_item(aln, int insert_size, int insert_std): aln_span = aln.reference_end - aln.pos v_item.size_inferred = 1 if insert_std > 0: - rand_insert_pos = insert_size - aln_span + int(normal(0, insert_std)) + rand_insert_pos = abs(insert_size - aln_span + int(normal(0, insert_std))) else: # single read mode v_item.svtype = "BND" clip_s = max(clip_sizes(aln)) diff --git a/dysgu/cluster.pyx b/dysgu/cluster.pyx index c09392d..33bc24a 100644 --- a/dysgu/cluster.pyx +++ b/dysgu/cluster.pyx @@ -26,6 +26,9 @@ import time ctypedef EventResult EventResult_t +np.random.seed(0) +random.seed(0) + def filter_potential(input_events, tree, regions_only): potential = [] @@ -198,13 +201,13 @@ def enumerate_events(G, potential, max_dist, try_rev, tree, paired_end=False, re ci2 = ei.contig2 ci_alt = ei.variant_seq if isinstance(ci_alt, str): - if len(ci_alt) > 0 and ci_alt[0] in "<.": + if len(ci_alt) == 1 or (len(ci_alt) > 0 and ci_alt[0] in "<."): ci_alt = "" cj = ej.contig cj2 = ej.contig2 cj_alt = ej.variant_seq if isinstance(cj_alt, str): - if len(cj_alt) > 0 and cj_alt[0] in "<.": + if len(cj_alt) == 1 or (len(cj_alt) > 0 and cj_alt[0] in "<."): cj_alt = "" any_contigs_to_check = any((ci, ci2, ci_alt)) and any((cj, cj2, cj_alt)) @@ -675,33 +678,33 @@ def process_job(msg_queue, args): completed_file.close() -def postcall_job(preliminaries, aux_data): - mode, ref_path, no_gt, insert_stdev, paired_end, drop_gaps = aux_data - ref_genome = pysam.FastaFile(ref_path) - 
preliminaries = re_map.drop_svs_near_reference_gaps(preliminaries, paired_end, ref_genome, drop_gaps) - preliminaries = post_call.ref_repetitiveness(preliminaries, ref_genome) - preliminaries = post_call.strand_binom_t(preliminaries) - preliminaries = assembler.contig_info(preliminaries) # GC info, repetitiveness - preliminaries = find_repeat_expansions(preliminaries, insert_stdev) - preliminaries = post_call.compressability(preliminaries) - preliminaries = post_call.get_gt_metric2(preliminaries, mode, no_gt) - preliminaries = post_call.get_ref_base(preliminaries, ref_genome) - return preliminaries +# def postcall_job(preliminaries, aux_data): +# mode, ref_path, no_gt, insert_stdev, paired_end, drop_gaps = aux_data +# ref_genome = pysam.FastaFile(ref_path) +# preliminaries = re_map.drop_svs_near_reference_gaps(preliminaries, paired_end, ref_genome, drop_gaps) +# preliminaries = post_call.ref_repetitiveness(preliminaries, ref_genome) +# preliminaries = post_call.strand_binom_t(preliminaries) +# preliminaries = assembler.contig_info(preliminaries) # GC info, repetitiveness +# preliminaries = find_repeat_expansions(preliminaries, insert_stdev) +# preliminaries = post_call.compressability(preliminaries) +# preliminaries = post_call.get_gt_metric2(preliminaries, mode, no_gt) +# preliminaries = post_call.get_ref_base(preliminaries, ref_genome) +# return preliminaries # https://rvprasad.medium.com/data-and-chunk-sizes-matter-when-using-multiprocessing-pool-map-in-python-5023c96875ef -aux_data = None - -def initializer(init_data): - global aux_data - aux_data = init_data +# aux_data = None +# +# def initializer(init_data): +# global aux_data +# aux_data = init_data -def with_initializer_worker_wrapper(varying_data): - return postcall_job(varying_data, aux_data) +# def with_initializer_worker_wrapper(varying_data): +# return postcall_job(varying_data, aux_data) -def pipe1(args, infile, kind, regions, ibam, ref_genome, bam_iter=None): +def pipe1(args, infile, kind, regions, ibam, ref_genome, sample_name, bam_iter=None): procs = args['procs'] low_mem = args['low_mem'] tdir = args["working_directory"] @@ -774,21 +777,14 @@ def pipe1(args, infile, kind, regions, ibam, ref_genome, bam_iter=None): max_single_size = min(max(args["max_cov"] * 50, 10000), 100000) # limited between 5000 - 50,000 reads event_id = 0 block_edge_events = [] - # min_support = int(args["min_support"]) - # args["min_support"] = min_support - # logging.info("Minimum support {}".format(min_support)) - # if args["pl"] == "pe": # reads with internal SVs can be detected at lower support - # lower_bound_support = min_support - 1 if min_support - 1 > 1 else 1 - # else: - # lower_bound_support = min_support + clip_length = args["clip_length"] merge_dist = args["merge_dist"] min_size = args["min_size"] length_extend = args["length_extend"] divergence = args["divergence"] read_buffer = genome_scanner.read_buffer - sites_info = sites_utils.vcf_reader(args["sites"], infile, args["reference"], paired_end, parse_probs=args["parse_probs"], - default_prob=args["sites_prob"], pass_only=args["sites_pass_only"] == "True") + sites_info = sites_utils.vcf_reader(args["sites"], infile, args["parse_probs"], sample_name, args["ignore_sample_sites"] == "True", args["sites_prob"], args["sites_pass_only"] == "True") cdef Py_SimpleGraph G G, node_to_name, bad_clip_counter, sites_adder, n_aligned_bases = graph.construct_graph(genome_scanner, @@ -1084,23 +1080,24 @@ def pipe1(args, infile, kind, regions, ibam, ref_genome, bam_iter=None): preliminaries = 
post_call.get_badclip_metric(preliminaries, bad_clip_counter, infile, regions) - if False: - aux_data = args["mode"], args["reference"], args["no_gt"], insert_stdev, paired_end, args["drop_gaps"] == "True" - pool = multiprocessing.Pool(procs, initializer, (aux_data,)) - n = int(len(preliminaries) / procs) - preliminaries = [preliminaries[i:i + n] for i in range(0, len(preliminaries), n)] - preliminaries = pool.map(with_initializer_worker_wrapper, preliminaries) - preliminaries = [item for m in preliminaries for item in m] + # if False: + # aux_data = args["mode"], args["reference"], args["no_gt"], insert_stdev, paired_end, args["drop_gaps"] == "True" + # pool = multiprocessing.Pool(procs, initializer, (aux_data,)) + # n = int(len(preliminaries) / procs) + # preliminaries = [preliminaries[i:i + n] for i in range(0, len(preliminaries), n)] + # preliminaries = pool.map(with_initializer_worker_wrapper, preliminaries) + # preliminaries = [item for m in preliminaries for item in m] + # + # else: - else: - preliminaries = re_map.drop_svs_near_reference_gaps(preliminaries, paired_end, ref_genome, args["drop_gaps"] == "True") - preliminaries = post_call.ref_repetitiveness(preliminaries, ref_genome) - preliminaries = post_call.strand_binom_t(preliminaries) - preliminaries = assembler.contig_info(preliminaries) # GC info, repetitiveness - preliminaries = find_repeat_expansions(preliminaries, insert_stdev) - preliminaries = post_call.compressability(preliminaries) - preliminaries = post_call.get_gt_metric2(preliminaries, args["mode"], args["no_gt"]) - preliminaries = post_call.get_ref_base(preliminaries, ref_genome) + preliminaries = re_map.drop_svs_near_reference_gaps(preliminaries, paired_end, ref_genome, args["drop_gaps"] == "True") + preliminaries = post_call.ref_repetitiveness(preliminaries, ref_genome) + preliminaries = post_call.strand_binom_t(preliminaries) + preliminaries = assembler.contig_info(preliminaries) # GC info, repetitiveness + preliminaries = find_repeat_expansions(preliminaries, insert_stdev) + preliminaries = post_call.compressability(preliminaries) + preliminaries = post_call.get_gt_metric2(preliminaries, args["mode"], args["no_gt"]) + preliminaries = post_call.get_ref_base(preliminaries, ref_genome, args["symbolic_sv_size"]) preliminaries = sample_level_density(preliminaries, regions) preliminaries = coverage_analyser.normalize_coverage_values(preliminaries) @@ -1188,7 +1185,7 @@ def cluster_reads(args): ##################### # Run dysgu here # ##################### - events, site_adder = pipe1(args, infile, kind, regions, ibam, ref_genome) + events, site_adder = pipe1(args, infile, kind, regions, ibam, ref_genome, sample_name) if not events: logging.critical("No events found") return diff --git a/dysgu/coverage.pyx b/dysgu/coverage.pyx index cab987f..6da4246 100644 --- a/dysgu/coverage.pyx +++ b/dysgu/coverage.pyx @@ -159,7 +159,7 @@ cdef class GenomeScanner: if self.bam_iter is not None: f_iter = self.bam_iter else: - f_iter = self.input_bam + f_iter = self.input_bam.fetch(until_eof=True) for aln in f_iter: # if aln.flag & 1284 or aln.mapq < mq_thresh or aln.cigartuples is None: # continue diff --git a/dysgu/extra_metrics.pyx b/dysgu/extra_metrics.pyx index 31c2cb7..5e5a2c6 100644 --- a/dysgu/extra_metrics.pyx +++ b/dysgu/extra_metrics.pyx @@ -14,7 +14,7 @@ import pandas as pd pd.options.mode.chained_assignment = None from sys import byteorder import os - +import resource ctypedef EventResult EventResult_t @@ -27,6 +27,9 @@ class BadClipCounter: if not self.low_mem: 
self.clip_pos_arr = [array.array("L", []) for i in range(n_references)] # 32 bit unsigned int else: + soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) + if soft + (n_references * 2) > soft: + resource.setrlimit(resource.RLIMIT_NOFILE, (min(hard, soft + (n_references * 2)), hard)) self.clip_pos_arr = [open(f"{self.temp_dir}/{i}.badclip.bin", "wb") for i in range(n_references)] def tidy(self): diff --git a/dysgu/filter_normals.py b/dysgu/filter_normals.py index 59cad2e..61f9ad4 100644 --- a/dysgu/filter_normals.py +++ b/dysgu/filter_normals.py @@ -255,18 +255,20 @@ def get_right_clip(r): def get_contig_clipped_bases(cont, extend_by_ratio=2): - if not cont: + if not cont or len(cont) < 20: return None, None left = "" right = "" if cont[0].islower(): i = 1 - while cont[i].islower(): + m = len(cont) + while i < m and cont[i].islower(): i += 1 left = cont[:(extend_by_ratio*i)-1] if cont[-1].islower(): i = -1 - while cont[i].islower(): + m = -len(cont) + while i > m and cont[i].islower(): i -= 1 i = max(0, len(cont) + (extend_by_ratio*i)) right = cont[i:] diff --git a/dysgu/graph.pyx b/dysgu/graph.pyx index 9bf6b46..9a0f720 100644 --- a/dysgu/graph.pyx +++ b/dysgu/graph.pyx @@ -762,7 +762,7 @@ cdef void add_to_graph(Py_SimpleGraph G, AlignedSegment r, PairedEndScoper_t pe_ if bnd_site_node != bnd_site_node2 and bnd_site_node2 >= 0 and not G.hasEdge(node_name, bnd_site_node2): G.addEdge(node_name, bnd_site_node2, 0) # Debug: - # if r.qname == "6211787c-d032-4c6a-8f8b-020f616f2055": + # if r.qname == "M03762:232:000000000-L65J4:1:2111:17729:15161": # echo("---", r.qname, read_enum, node_name, (event_pos, pos2), length_from_cigar, list(other_nodes)) # look = {'a4d38568-fd80-4785-8fa5-84ed132b445c', '2313a985-385c-4c84-b02c-dddfc627940b', '0031840a-bd2d-475d-9a04-528f71c7b512'} # if r.qname in look: @@ -1449,6 +1449,11 @@ cpdef proc_component(node_to_name, component, read_buffer, infile, Py_SimpleGrap else: return # Debug: + # if 157835 in n2n: + # echo("parts", partitions) + # echo("s_between", support_between) + # echo("s_within", support_within) + # echo("parts", partitions) # echo("s_between", support_between) # echo("s_within", support_within) diff --git a/dysgu/io_funcs.pyx b/dysgu/io_funcs.pyx index 5d0aee5..baa3f7b 100644 --- a/dysgu/io_funcs.pyx +++ b/dysgu/io_funcs.pyx @@ -6,14 +6,14 @@ cimport numpy as np import logging from map_set_utils import merge_intervals, echo from collections import defaultdict -import pkg_resources +from importlib.metadata import version import sortedcontainers import pandas as pd import os import sys import gzip from dysgu.map_set_utils import Py_BasicIntervalTree - +import random from libc.stdlib cimport malloc @@ -28,7 +28,7 @@ cdef char *basemap = [ '\0', '\0', '\0', '\0', '\0', '\0', '\0', '\0', '\0', '\0 '\0', '\0', '\0', '\0', 'a', 'a' ] np.random.seed(0) - +random.seed(0) cpdef str reverse_complement(str seq, int seq_len): """https://bioinformatics.stackexchange.com/questions/3583/\ @@ -144,11 +144,16 @@ cpdef list col_names(small_output): ] -def make_main_record(r, version, index, format_f, df_rows, add_kind, small_output): +def make_main_record(r, dysgu_version, index, format_f, df_rows, add_kind, small_output): rep, repsc, lenprec = 0, 0, 1 mean_prob, max_prob = None, None + debug = False + # if abs(r['posA'] - 39084726) < 10: + # echo('found', dict(r)) + # echo(len(format_f) > 1) + # echo([(int(v["su"]), v["posA"], v["event_id"], k) for k, v in df_rows.items()]) + # debug = True if len(format_f) > 1: - best = sorted([(int(v["su"]), k) for k, v 
in df_rows.items()], reverse=True)[0][1] probs = [v["prob"] for k, v in df_rows.items()] mean_prob = np.mean(probs) @@ -264,7 +269,7 @@ def make_main_record(r, version, index, format_f, df_rows, add_kind, small_outpu fmt_keys = "GT:GQ:NMP:NMS:NMB:MAPQP:MAPQS:NP:MAS:SU:WR:PE:SR:SC:BND:SQC:SCW:SQR:BE:COV:MCOV:LNK:NEIGH:NEIGH10:RB:PS:MS:SBT:NG:NSA:NXA:NMU:NDC:RMS:RED:BCC:FCC:STL:RAS:FAS:ICN:OCN:CMP:RR:JIT:PROB" if "variant_seq" in r and isinstance(r["variant_seq"], str): - if r['svtype'] == "INS": + if r['svtype'] == "INS" or r.variant_seq: alt_field = r.variant_seq.upper() else: alt_field = f"<{r['svtype']}>" @@ -281,7 +286,7 @@ def make_main_record(r, version, index, format_f, df_rows, add_kind, small_outpu alt_field, ".", "." if "filter" not in r else r['filter'], # INFO line - ";".join([f"SVMETHOD=DYSGUv{version}", + ";".join([f"SVMETHOD=DYSGUv{dysgu_version}", f"SVTYPE={r['svtype']}", f"END={r['posB']}" if r['chrA'] == r['chrB'] else f"END={r['posA'] + 1}", f"CHR2={r['chrB']}" + chr2_pos, @@ -297,7 +302,8 @@ def make_main_record(r, version, index, format_f, df_rows, add_kind, small_outpu # FORMAT line(s) for item in format_f.values(): rec.append(":".join(map(str, item))) - + # if debug: + # echo("returned ", rec) return rec @@ -341,7 +347,7 @@ def gen_format_fields(r, df, names, n_fields, small_output): row = cols[name] format_fields[name] = get_fmt(row, small_output) else: - format_fields[name] = [0] * n_fields + format_fields[name] = ['0/0'] + [0] * (n_fields - 1) return format_fields, cols @@ -451,8 +457,8 @@ def to_vcf(df, args, names, outfile, show_names=True, contig_names="", header=N contig_names=contig_names) + "\t" + "\t".join(names) + "\n") if show_names: - logging.info("Input samples: {}".format(str(list(names)))) - version = pkg_resources.require("dysgu")[0].version + logging.info("Samples: {}".format(str(list(names)))) + dysgu_version = version("dysgu") seen_idx = set([]) cnames = ['raw_reads_10kb', 'NMpri', 'NMsupp', 'MAPQpri', 'MAPQsupp', "NMbase", "n_gaps"] for col in cnames: @@ -489,7 +495,7 @@ def to_vcf(df, args, names, outfile, show_names=True, contig_names="", header=N if "partners" in r and r["partners"] is not None and r["partners"] != ".": seen_idx |= set(r["partners"]) - r_main = make_main_record(r, version, count, format_f, df_rows, add_kind, small_output_f) + r_main = make_main_record(r, dysgu_version, count, format_f, df_rows, add_kind, small_output_f) recs.append(r_main) count += 1 diff --git a/dysgu/main.py b/dysgu/main.py index 28fbeaa..6714c90 100644 --- a/dysgu/main.py +++ b/dysgu/main.py @@ -6,7 +6,7 @@ import time from multiprocessing import cpu_count from subprocess import run -import pkg_resources +from importlib.metadata import version import warnings from dysgu import cluster, view, sv2bam, filter_normals import datetime @@ -96,7 +96,7 @@ def apply_preset(kwargs): kwargs[k] = v -version = pkg_resources.require("dysgu")[0].version +dysgu_version = version("dysgu") logFormatter = logging.Formatter("%(asctime)s [%(levelname)-7.7s] %(message)s") rootLogger = logging.getLogger() @@ -126,11 +126,11 @@ def make_wd(args, call_func=False): "or supply --ibam to re-use temp files in working directory") else: temp_dir = args["wd"] - if temp_dir is None: - return -1 + if not temp_dir: + raise ValueError("Working directory is empty") if not os.path.exists(temp_dir): os.mkdir(temp_dir) - elif not args["overwrite"]: + elif "overwrite" in args and not args["overwrite"]: raise ValueError("Working directory already exists. 
Add -x / --overwrite=True to proceed") @@ -152,6 +152,8 @@ def cli(): @click.option('--sites-pass-only', help="Only add variants from sites that have PASS", default="True", type=click.Choice(["True", "False"]), show_default=True) +@click.option('--ignore-sample-sites', help="If --sites is multi-sample, ignore variants from the input file SV-ALIGNS", + default="True", type=click.Choice(["True", "False"]), show_default=True) @click.option('--parse-probs', help="Parse INFO:MeanPROB or FORMAT:PROB instead of using --sites-p", default="False", type=click.Choice(["True", "False"]), show_default=True) @@ -167,9 +169,9 @@ def cli(): show_default=True, default="wb0", type=str) @click.option("-p", "--procs", help="Number of cpu cores to use", type=cpu_range, default=1, show_default=True) -@click.option('--mode', help="Type of input reads. Multiple options are set, overrides other options. " - "pacbio: --mq 20 --paired False --min-support 'auto' --max-cov 150 --dist-norm 200 --trust-ins-len True. " - "nanopore: --mq 20 --paired False --min-support 'auto' --max-cov 150 --dist-norm 900 --trust-ins-len False", +@click.option('--mode', help=f"Type of input reads. Multiple options are set, overrides other options. " + f"pacbio: --mq {presets['pacbio']['mq']} --paired False --min-support '{presets['pacbio']['min_support']}' --max-cov {presets['pacbio']['max_cov']} --dist-norm {presets['pacbio']['dist_norm']} --trust-ins-len True. " + f"nanopore: --mq {presets['nanopore']['mq']} --paired False --min-support '{presets['nanopore']['min_support']}' --max-cov {presets['nanopore']['max_cov']} --dist-norm {presets['nanopore']['dist_norm']} --trust-ins-len False", default="pe", type=click.Choice(["pe", "pacbio", "nanopore"]), show_default=True) @click.option('--pl', help=f"Type of input reads [default: {defaults['pl']}]", type=click.Choice(["pe", "pacbio", "nanopore"]), callback=add_option_set) @@ -221,9 +223,10 @@ def cli(): @click.option("--metrics", help="Output additional metrics for each SV", default=False, is_flag=True, flag_value=True, show_default=True) @click.option("--no-gt", help="Skip adding genotype to SVs", is_flag=True, flag_value=False, show_default=False, default=True) @click.option("--keep-small", help="Keep SVs < min-size found during re-mapping", default=False, is_flag=True, flag_value=True, show_default=False) +@click.option("--symbolic-sv-size", help="Use symbolic representation if SV >= this size. Set to -1 to ignore", default=-1, type=int, show_default=False) @click.option("--low-mem", help="Use less memory but more temp disk space", is_flag=True, flag_value=True, show_default=False, default=False) @click.option("-x", "--overwrite", help="Overwrite temp files", is_flag=True, flag_value=True, show_default=False, default=False) -@click.option("-c", "--clean", help="Remove temp files when finished", is_flag=True, flag_value=True, show_default=False, default=False) +@click.option("-c", "--clean", help="Remove temp files and working directory when finished", is_flag=True, flag_value=True, show_default=False, default=False) @click.option("--thresholds", help="Probability threshold to label as PASS for 'DEL,INS,INV,DUP,TRA'", default="0.45,0.45,0.45,0.45,0.45", type=str, show_default=True) @click.pass_context @@ -231,7 +234,7 @@ def run_pipeline(ctx, **kwargs): """Run the dysgu pipeline. 
Important parameters are --mode, --diploid, --min-support, --min-size, --max-cov""" # Add arguments to context t0 = time.time() - logging.info("[dysgu-run] Version: {}".format(version)) + logging.info("[dysgu-run] Version: {}".format(dysgu_version)) make_wd(kwargs) apply_preset(kwargs) show_params() @@ -294,7 +297,7 @@ def run_pipeline(ctx, **kwargs): def get_reads(ctx, **kwargs): """Filters input bam/cram for read-pairs that are discordant or have a soft-clip of length > '--clip-length', saves bam file in WORKING_DIRECTORY""" - logging.info("[dysgu-fetch] Version: {}".format(version)) + logging.info("[dysgu-fetch] Version: {}".format(dysgu_version)) make_wd(kwargs) ctx = apply_ctx(ctx, kwargs) return sv2bam.process(ctx.obj) @@ -314,8 +317,9 @@ def get_reads(ctx, **kwargs): @click.option("--sites-prob", help="Prior probability that a matching variant in --sites is true", required=False, type=click.FloatRange(0, 1), default=0.6, show_default=True) @click.option('--sites-pass-only', help="Only add variants from sites that have PASS", - default="True", type=click.Choice(["True", "False"]), - show_default=True) + default="True", type=click.Choice(["True", "False"]), show_default=True) +@click.option('--ignore-sample-sites', help="If --sites is multi-sample, ignore variants from the input file SV-ALIGNS", + default="True", type=click.Choice(["True", "False"]), show_default=True) @click.option('--parse-probs', help="Parse INFO:MeanPROB or FORMAT:PROB instead of using --sites-p", default="False", type=click.Choice(["True", "False"]), show_default=True) @@ -324,9 +328,9 @@ def get_reads(ctx, **kwargs): @click.option('--pfix', help="Post-fix of temp alignment file (used when a working-directory is provided instead of " "sv-aligns)", default="dysgu_reads", type=str, required=False) -@click.option('--mode', help="Type of input reads. Multiple options are set, overrides other options. " - "pacbio: --mq 20 --paired False --min-support 'auto' --max-cov 150 --dist-norm 200 --trust-ins-len True. " - "nanopore: --mq 20 --paired False --min-support 'auto' --max-cov 150 --dist-norm 900 --trust-ins-len False", +@click.option('--mode', help=f"Type of input reads. Multiple options are set, overrides other options. " + f"pacbio: --mq {presets['pacbio']['mq']} --paired False --min-support '{presets['pacbio']['min_support']}' --max-cov {presets['pacbio']['max_cov']} --dist-norm {presets['pacbio']['dist_norm']} --trust-ins-len True. " + f"nanopore: --mq {presets['nanopore']['mq']} --paired False --min-support '{presets['nanopore']['min_support']}' --max-cov {presets['nanopore']['max_cov']} --dist-norm {presets['nanopore']['dist_norm']} --trust-ins-len False", default="pe", type=click.Choice(["pe", "pacbio", "nanopore"]), show_default=True) @click.option('--pl', help=f"Type of input reads [default: {defaults['pl']}]", type=click.Choice(["pe", "pacbio", "nanopore"]), callback=add_option_set) @@ -379,15 +383,16 @@ def get_reads(ctx, **kwargs): @click.option("--metrics", help="Output additional metrics for each SV", default=False, is_flag=True, flag_value=True, show_default=True) @click.option("--no-gt", help="Skip adding genotype to SVs", is_flag=True, flag_value=False, show_default=False, default=True) @click.option("--keep-small", help="Keep SVs < min-size found during re-mapping", default=False, is_flag=True, flag_value=True, show_default=False) +@click.option("--symbolic-sv-size", help="Use symbolic representation if SV >= this size. 
Set to -1 to ignore", default=-1, type=int, show_default=False) @click.option("--low-mem", help="Use less memory but more temp disk space", is_flag=True, flag_value=True, show_default=False, default=False) @click.option("-x", "--overwrite", help="Overwrite temp files", is_flag=True, flag_value=True, show_default=False, default=False) -@click.option("-c", "--clean", help="Remove temp files when finished", is_flag=True, flag_value=True, show_default=False, default=False) +@click.option("-c", "--clean", help="Remove temp files and working directory when finished", is_flag=True, flag_value=True, show_default=False, default=False) @click.option("--thresholds", help="Probability threshold to label as PASS for 'DEL,INS,INV,DUP,TRA'", default="0.45,0.45,0.45,0.45,0.45", type=str, show_default=True) @click.pass_context def call_events(ctx, **kwargs): """Call structural variants from bam alignment file/stdin""" - logging.info("[dysgu-call] Version: {}".format(version)) + logging.info("[dysgu-call] Version: {}".format(dysgu_version)) make_wd(kwargs, call_func=True) if kwargs["sv_aligns"] is None: # Try and open from working director @@ -414,10 +419,14 @@ def call_events(ctx, **kwargs): @cli.command("merge") -@click.argument('input_files', required=True, type=click.Path(), nargs=-1) +@click.argument('input_files', required=False, type=click.Path(), nargs=-1) +@click.option("-i", "--input-list", help="Input list of file paths, one line per file ", required=False, type=click.Path(exists=True)) @click.option("-o", "--svs-out", help="Output file, [default: stdout]", required=False, type=click.Path()) @click.option("-f", "--out-format", help="Output format", default="vcf", type=click.Choice(["csv", "vcf"]), show_default=True) +@click.option("-p", "--procs", help="Number of processors to use when merging, requires --wd option to be supplied", type=cpu_range, default=1, show_default=True) +@click.option("-d", "--wd", help="Working directory to use/create when merging", type=click.Path(exists=False), required=False) +@click.option("-c", "--clean", help="Remove working directory when finished", is_flag=True, flag_value=True, show_default=False, default=False) @click.option("--collapse-nearby", help="Merges more aggressively by collapsing nearby SV", default="True", type=click.Choice(["True", "False"]), show_default=True) @click.option("--merge-across", help="Merge records across input samples", default="True", @@ -437,8 +446,13 @@ def call_events(ctx, **kwargs): @click.pass_context def view_data(ctx, **kwargs): """Merge vcf/csv variant files""" - logging.info("[dysgu-merge] Version: {}".format(version)) + logging.info("[dysgu-merge] Version: {}".format(dysgu_version)) ctx = apply_ctx(ctx, kwargs) + if ctx.obj["wd"]: + make_wd(ctx.obj) + if not ctx.obj["input_files"] and not ctx.obj["input_list"]: + logging.error("Please supply either INPUT_FILES as an argument or use the --input-list option") + quit() view.view_file(ctx.obj) @@ -462,7 +476,7 @@ def filter_normal(ctx, **kwargs): """Filter a vcf generated by dysgu. Unique SVs can be found in the input_vcf by supplying a --normal-vcf (single or multi-sample), and normal bam files. 
Bam/vcf samples with the same name as the input_vcf will be ignored""" - logging.info("[dysgu-filter] Version: {}".format(version)) + logging.info("[dysgu-filter] Version: {}".format(dysgu_version)) ctx = apply_ctx(ctx, kwargs) filter_normals.run_filtering(ctx.obj) @@ -472,7 +486,7 @@ def filter_normal(ctx, **kwargs): def test_command(ctx, **kwargs): """Run dysgu tests""" pwd = os.getcwd() - logging.info("[dysgu-test] Version: {}".format(version)) + logging.info("[dysgu-test] Version: {}".format(dysgu_version)) tests_path = os.path.dirname(__file__) + "/tests" tests = list() tests.append(["dysgu fetch", @@ -481,40 +495,40 @@ def test_command(ctx, **kwargs): tests_path + '/small.bam']) tests.append(["dysgu run", "-x --drop-gaps False", - "-o " + pwd + '/test.dysgu{}.vcf'.format(version), + "-o " + pwd + '/test.dysgu{}.vcf'.format(dysgu_version), tests_path + '/ref.fa', pwd + '/wd_test', tests_path + '/small.bam']) tests.append(["dysgu run", "-x --drop-gaps False", "--regions " + tests_path + '/targets.bed', - "-o " + pwd + '/test_regions.dysgu{}.vcf'.format(version), + "-o " + pwd + '/test_regions.dysgu{}.vcf'.format(dysgu_version), tests_path + '/ref.fa', pwd + '/wd_test2', tests_path + '/small.bam']) tests.append(["dysgu run", "-x --drop-gaps False --procs 2", "--regions " + tests_path + '/targets.bed', - "-o " + pwd + '/test_regions.dysgu{}.vcf'.format(version), + "-o " + pwd + '/test_regions.dysgu{}.vcf'.format(dysgu_version), tests_path + '/ref.fa', pwd + '/wd_test2', tests_path + '/small.bam']) tests.append(["dysgu run", "-x --drop-gaps False --mode pacbio", - "-o " + pwd + '/test2.dysgu{}.vcf'.format(version), + "-o " + pwd + '/test2.dysgu{}.vcf'.format(dysgu_version), tests_path + '/ref.fa', pwd + '/wd_test', tests_path + '/small.bam']) tests.append(["dysgu call", "-x --drop-gaps False", - "-o " + pwd + '/test2.dysgu{}.vcf'.format(version), + "-o " + pwd + '/test2.dysgu{}.vcf'.format(dysgu_version), tests_path + '/ref.fa', pwd + '/wd_test', tests_path + '/small.bam']) tests.append(["dysgu merge", - pwd + '/test.dysgu{}.vcf'.format(version), - pwd + '/test2.dysgu{}.vcf'.format(version), - "-o " + pwd + '/test.merge.dysgu{}.vcf'.format(version)]) + pwd + '/test.dysgu{}.vcf'.format(dysgu_version), + pwd + '/test2.dysgu{}.vcf'.format(dysgu_version), + "-o " + pwd + '/test.merge.dysgu{}.vcf'.format(dysgu_version)]) for t in tests: c = " ".join(t) v = run(shlex.split(c), shell=False, capture_output=True, check=True) diff --git a/dysgu/post_call.py b/dysgu/post_call.py index 0ce6bcd..b9a0044 100644 --- a/dysgu/post_call.py +++ b/dysgu/post_call.py @@ -343,16 +343,35 @@ def strand_binom_t(events): return events -def get_ref_base(events, ref_genome): +def get_ref_base(events, ref_genome, symbolic_sv_size): for e in events: if not e.ref_seq: - if e.posA == 0: - e.posA = 1 - try: - base = ref_genome.fetch(e.chrA, e.posA - 1, e.posA).upper() - e.ref_seq = base - except: - pass + if symbolic_sv_size == -1 or e.svtype == "INS" or e.svtype == "TRA": + if e.posA == 0: + e.posA = 1 + try: + base = ref_genome.fetch(e.chrA, e.posA - 1, e.posA).upper() + e.ref_seq = base + except: + pass + else: + if e.svlen < symbolic_sv_size: + if e.posA == 0: + e.posA = 1 + try: + bases = ref_genome.fetch(e.chrA, e.posA, e.posB).upper() + e.ref_seq = bases + e.variant_seq = bases[0] if len(bases) else "" + except: + pass + else: + if e.posA == 0: + e.posA = 1 + try: + base = ref_genome.fetch(e.chrA, e.posA - 1, e.posA).upper() + e.ref_seq = base + except: + pass return events diff --git a/dysgu/sites_utils.py 
b/dysgu/sites_utils.py index 4b6cac6..8d0f3eb 100644 --- a/dysgu/sites_utils.py +++ b/dysgu/sites_utils.py @@ -46,23 +46,27 @@ def parse_variant_seqs_dysgu(r, svt, chrom, start, chrom2, stop, paired_end, ref Site = namedtuple("Site", ["chrom", "start", "chrom2", "end", "svtype", "index", "id", "svlen", "prob"]) -def vcf_reader(pth, infile, ref_genome_pth, paired_end, parse_probs, default_prob=0.6, pass_only=True): +def vcf_reader(pth, infile, parse_probs, sample_name, ignore_sample, default_prob=0.6, pass_only=True): if pth is None: return None - logging.info("Reading --sites") - # ref_genome = pysam.FastaFile(ref_genome_pth) - vcf = pysam.VariantFile(pth) # auto detect vcf / bcf / vcf.gz + logging.info(f"Reading --sites {pth}") + vcf = pysam.VariantFile(pth) + + samples = list(vcf.header.samples) + if len(samples) == 1 or sample_name not in samples: + ignore_sample = False # variants must be fed into graph in roughly sorted order recs = {} - not_parsed = [] for idx, r in enumerate(vcf): - if pass_only and "PASS" not in r.filter: continue - + if ignore_sample and sample_name in r.samples and "GT" in r.samples[sample_name]: + this_gt = r.samples[sample_name]['GT'] + if not (this_gt == "0/0" or this_gt == "0" or this_gt == "." or this_gt == "./." or this_gt == "0|0"): + continue if "CHROM2" in r.info: chrom2_info = r.info["CHROM2"] else: diff --git a/dysgu/sv_category.pyx b/dysgu/sv_category.pyx index 7162598..5189413 100644 --- a/dysgu/sv_category.pyx +++ b/dysgu/sv_category.pyx @@ -786,7 +786,7 @@ cdef void translocation(AlignmentItem v): v.query_overlap = query_overlap -cdef void classify_d(AlignmentItem v): +cdef void classify_d(AlignmentItem v): #, debug=False): v.breakA_precise = 0 v.breakB_precise = 0 @@ -803,4 +803,5 @@ cdef void classify_d(AlignmentItem v): # Debug - # echo(v.read_a.qname, v.rA == v.rB, v.priA and v.priB, v.inferred_sv_len, "strands", v.strandA == v.strandB, "pos", v.posA, v.posB, "breaks", v.breakA, v.breakB, "a_q > b_q", v.a_qstart > v.b_qstart) + # if debug: + # echo(v.read_a.qname, v.rA == v.rB, v.priA and v.priB, v.inferred_sv_len, "strands", v.strandA == v.strandB, "pos", v.posA, v.posB, "breaks", v.breakA, v.breakB, "a_q > b_q", v.a_qstart > v.b_qstart) diff --git a/dysgu/view.py b/dysgu/view.py index c91dde2..42eefe6 100644 --- a/dysgu/view.py +++ b/dysgu/view.py @@ -1,3 +1,4 @@ +import glob import os import sys import pandas as pd @@ -8,26 +9,31 @@ from collections import defaultdict from dysgu import io_funcs, cluster from dysgu.map_set_utils import echo, merge_intervals -import subprocess -import io +import multiprocessing import gzip import pysam import numpy as np -from sys import stdin, stdout +from sys import stdout +from heapq import heappop, heappush +import resource +from copy import deepcopy +import re -def open_outfile(args, names_dict): +def open_outfile(args, names_list, log_messages=True): if args["separate"] == "True": outfiles = {} - for item, name in names_dict.items(): + for name in names_list: outname = f"{name}.{args['post_fix']}.csv" outfiles[name] = open(outname, "w") return outfiles if args["svs_out"] == "-" or args["svs_out"] is None: - logging.info("SVs output to stdout") + if log_messages: + logging.info("SVs output to stdout") outfile = stdout else: - logging.info("SVs output to {}".format(args["svs_out"])) + if log_messages: + logging.info("SVs output to {}".format(args["svs_out"])) outfile = open(args["svs_out"], "w") return outfile @@ -61,8 +67,9 @@ def mung_df(df, args): return df -def merge_df(df, n_samples, merge_dist, 
tree=None, merge_within_sample=False, aggressive=False): - logging.info("Merge distance: {} bp".format(merge_dist)) +def merge_df(df, n_samples, merge_dist, tree=None, merge_within_sample=False, aggressive=False, log_messages=True): + if log_messages: + logging.info("Merge distance: {} bp".format(merge_dist)) df.reset_index(inplace=True) df["event_id"] = df.index df["contig"] = df["contigA"] @@ -70,7 +77,6 @@ def merge_df(df, n_samples, merge_dist, tree=None, merge_within_sample=False, ag # Assume: df["preciseA"] = [1] * len(df) df["preciseB"] = [1] * len(df) - potential = [dotdict(set_numeric(i)) for i in df.to_dict("records")] if not merge_within_sample: found = cluster.merge_events(potential, merge_dist, tree, try_rev=False, pick_best=False, add_partners=True, @@ -201,7 +207,8 @@ def vcf_to_df(path): path_h = path df = pd.read_csv(path_h, index_col=None, comment="#", sep="\t", header=None) if len(df.columns) > 10: - raise ValueError(f"Can only merge files with one sample in. N samples = {len(df.columns) - 9}") + msg = f"Can only merge files with one sample in. N samples = {len(df.columns) - 9}" + raise ValueError(msg) parsed = pd.DataFrame() parsed["chrA"] = df[0] parsed["posA"] = df[1] @@ -304,19 +311,28 @@ def vcf_to_df(path): } df.rename(columns={k: v[0] for k, v in col_map.items()}, inplace=True) + for value_name, value_type in col_map.values(): + if value_name != "posB_tra" and value_name not in df: + if value_type == np.int64 or value_type == float: + df[value_name] = [0] * len(df) + else: + df[value_name] = [''] * len(df) + df["GQ"] = pd.to_numeric(df["GQ"], errors='coerce').fillna(".") for k, dtype in col_map.values(): if k in df: if df[k].dtype != dtype: if dtype == str: - df[k] = df[k].fillna("") + if k == "GT": + df[k] = ["0/0" if not i else i for i in list(df[k])] + else: + df[k] = df[k].fillna("") else: df[k] = df[k].fillna(0) try: df[k] = df[k].astype(dtype) except ValueError or OverflowError: - echo(list(df[k])) - raise ValueError("Problem for feature {}, could not intepret as {}".format(k, dtype)) + raise ValueError("Problem for feature {}, could not interpret as {}".format(k, dtype)) if "contigA" not in df: df["contigA"] = [""] * len(df) if "contigB" not in df: @@ -329,187 +345,284 @@ def vcf_to_df(path): return df, header, n_fields, "\n" + contig_names.strip() if contig_names else contig_names -def call_pons(samps, df, args): - pad = 500 - intervals = [] - for chrA, posA, chrB, posB in zip(df.chrA, df.posA, df.chrB, df.posB): - r1 = chrA, 0 if posA - pad < 0 else posA - pad, posA + pad - r2 = chrB, 0 if posB - pad < 0 else posB - pad, posB + pad - intervals.append(r1) - intervals.append(r2) - - merged_intervals = merge_intervals(intervals) - - wd = args["wd"] - with open(f"{wd}/pon.search.bed", "w") as b: - for c, s, e in merged_intervals: - b.write(f"{c}\t{s}\t{e}\n") - - ref = args["ref"] - pon_dfs = [] - for af, read_type in samps: # todo parallelize this - result = subprocess.run( - f"dysgu fetch --mq 0 --search {wd}/pon.search.bed --min-size 25 -x -o {wd}/pon.dysgu_reads.bam {wd} {af}", - shell=True, stdout=subprocess.PIPE) - if result.returncode != 0: - raise RuntimeError(f"dysgu run failed on {af}") - result = subprocess.run( - f"dysgu call --mq 0 --ibam {af} -f csv --min-support 1 -x --mode {read_type} {ref} {wd} {wd}/pon.dysgu_reads.bam", - shell=True, stdout=subprocess.PIPE) - if result.returncode != 0: - raise RuntimeError("dysgu run failed on pon") - output = io.StringIO() - output.write(result.stdout.decode("utf-8")) - output.seek(0) - df_p = 
pd.read_csv(output, sep=",", index_col=None) - df_p["sample"] = ["PON-dysgu"] * len(df_p) - df_p["table_name"] = ["PON-dysgu"] * len(df_p) - pon_dfs.append(df_p) - - return pon_dfs - - -def call_from_samp(samps, df, args): +def get_names_list(file_list, ignore_csv=True): + seen_names = sortedcontainers.SortedSet([]) + names_list = [] + name_c = defaultdict(int) + for item in file_list: + if item == "-": + raise ValueError("Reading from stdin is not supported using merge") + if item.endswith(".csv"): + if ignore_csv: + continue + raise ValueError(".csv files are not supported when using merge and option --wd") + name = list(pysam.VariantFile(item, 'r').header.samples) + if len(name) > 1: + msg = f"Sample file {item} contains more than one sample: {name}" + raise ValueError(msg) + name = name[0] + if name in seen_names: + bname = f"{name}_{name_c[name]}" + names_list.append(bname) + logging.info(f"Sample {name} is present in more than one input file, sample in file {item} will be named {bname} in merged vcf") + seen_names.add(bname) + else: + names_list.append(name) + seen_names.add(name) + name_c[name] += 1 + return seen_names, names_list - pon_dfs = call_pons(samps, df, args) - df_c = pd.concat([df] + pon_dfs) +def process_file_list(args, file_list, seen_names, names_list, log_messages): + dfs = [] + header = None + contig_names = None + for bname, item in zip(names_list, file_list): + name, ext = os.path.splitext(item) - df_c["chrA"] = df_c["chrA"].astype(str) - df_c["chrB"] = df_c["chrB"].astype(str) - df_c["contigA"] = [i if isinstance(i, str) else "" for i in df_c["contigA"]] - df_c["contigB"] = [i if isinstance(i, str) else "" for i in df_c["contigB"]] + if ext != ".csv": # assume vcf + df, header, _, contig_names = vcf_to_df(item) # header here, assume all input has same number of fields + else: + df = pd.read_csv(item, index_col=None) - if "partners" in df_c: - del df_c["partners"] - if "event_id" in df_c: - del df_c["event_id"] + name = list(set(df["sample"])) + if len(name) > 1: + msg = f"More than one sample in {item}" + raise ValueError(msg) - seen_names = len(set(df_c["sample"])) + df["sample"] = [bname] * len(df) + df["table_name"] = [bname for _ in range(len(df))] + if len(set(df["sample"])) > 1: + raise ValueError("More than one sample per input file") - df_m = merge_df(df_c, seen_names, args["merge_dist"], {}, aggressive=True) - df_m = df_m.sort_values(["chrA", "posA", "chrB", "posB"]) + df = mung_df(df, args) + if args["merge_within"] == "True": + l_before = len(df) + df = merge_df(df, 1, args["merge_dist"], {}, merge_within_sample=True, + aggressive=args['collapse_nearby'] == "True", log_messages=log_messages) + if log_messages: + logging.info("{} rows before merge-within {}, rows after {}".format(name, l_before, len(df))) + if "partners" in df: + del df["partners"] + dfs.append(df) - pon_partners = set([]) - for s in df_m[df_m["sample"] == "PON-dysgu"]["partners"]: - pon_partners |= s + df = pd.concat(dfs) + if args["merge_across"] == "True": + if len(seen_names) > 1: + df = merge_df(df, len(seen_names), args["merge_dist"], {}, aggressive=args['collapse_nearby'] == "True", + log_messages=log_messages) - return pon_partners + df = df.sort_values(["chrA", "posA", "chrB", "posB"]) + outfile = open_outfile(args, names_list, log_messages=log_messages) + if args["out_format"] == "vcf": + count = io_funcs.to_vcf(df, args, seen_names, outfile, header=header, small_output_f=True, + contig_names=contig_names, show_names=log_messages) + if log_messages: + logging.info("Sample rows 
before merge {}, rows after {}".format(list(map(len, dfs)), count)) + else: + to_csv(df, args, outfile, small_output=False) -def check_raw_alignments(df, args, pon): - # get soft-clip position and direction - clips = [] - for chrA, posA, contA, chrB, posB, contB, idx, svlen, spanning in zip(df.chrA, df.posA, df.contigA, df.chrB, df.posB, df.contigB, df.index, df.svlen, df.spanning): - if spanning: - clips.append((chrA, posA, 3, idx, chrA == chrB, svlen)) - clips.append((chrB, posB, 3, idx, chrA == chrB, svlen)) +class VcfWriter(object): + def __init__(self, out_path, target_header, new_name=None): + self.path = out_path + self.vcf = open(out_path, 'w') if out_path != '-' else stdout + self.header = target_header + str_hdr = str(target_header) + if new_name: + endi = len(str_hdr) + while True: + endi -= 1 + if str_hdr[endi] == "\t": + break + str_hdr = str_hdr[:endi + 1] + f"{new_name}\n" + self.vcf.write(str_hdr) + def close(self): + self.vcf.close() + def write(self, r): + self.vcf.write(str(r)) + + +def shard_job(wd, item_path, name, Global): + shards = {} + vcf = pysam.VariantFile(item_path, 'r') + contigs = len(vcf.header.contigs) + if contigs == 0: + contigs = 250 + ub = contigs * contigs + soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) + ub += soft + resource.setrlimit(resource.RLIMIT_NOFILE, (min(hard, ub), hard)) + Global.soft = ub + + rows = 0 + for r in vcf.fetch(): + svtype = r.info['SVTYPE'] + svtype = 'INSDUP' if (svtype == "INS" or svtype == "DUP") else svtype + chrom = r.chrom + if svtype != "TRA": + key = f'{svtype}_{chrom}', name else: - if contA: - start_lower = contA[0].islower() - end_lower = contA[-1].islower() - if start_lower and not end_lower: - clip_side = 0 - elif not start_lower and end_lower: - clip_side = 1 - else: # start_lower and end_lower: - clip_side = 3 # any side - clips.append((chrA, posA, clip_side, idx, chrA == chrB, svlen)) - if contB: - start_lower = contB[0].islower() - end_lower = contB[-1].islower() - if start_lower and not end_lower: - clip_side = 0 - elif not start_lower and end_lower: - clip_side = 1 + chrom2 = r.info["CHR2"] + key = f'{svtype}_{min(chrom, chrom2)}_{max(chrom, chrom2)}', name + if key not in shards: + shards[key] = VcfWriter(os.path.join(wd, name + "~" + key[0] + f".vcf"), vcf.header, name) + shards[key].write(r) + rows += 1 + for v in shards.values(): + v.close() + vcf.close() + return list(shards.keys()), rows, name + + +def sort_into_single_file(out_f, vcf_header, file_paths_to_combine, sample_list): + outf = VcfWriter(out_f, target_header=vcf_header) + file_iterators = [] + count = 0 + for item in file_paths_to_combine: + file_iterators.append(pysam.VariantFile(item, 'r')) + count += 1 + if not file_iterators: + return 0 + + var_q = [] + done_count = 0 + for idx, item in enumerate(file_iterators): + try: + v = item.__next__() + heappush(var_q, (v.chrom, v.pos, idx, v)) + except StopIteration: + file_iterators[idx].close() + file_iterators[idx] = None + done_count += 1 + + written = 0 + while done_count < len(file_iterators): + if var_q: + chrom, pos, idx, v = heappop(var_q) + # ensure correct sample ordering + str_v = str(v).strip().split('\t') + main_record = str_v[:9] + samp_records = str_v[9:] + record_samples = {k: v for k, v in zip(v.samples.keys(), samp_records)} + rd = [] + c = 0 + for samp in sample_list: + if samp in record_samples: + rd.append(record_samples[samp]) + c += 1 else: - clip_side = 3 - clips.append((chrB, posB, clip_side, idx, chrA == chrB, svlen)) - - clips = sorted(clips, key=lambda x: 
+def sort_into_single_file(out_f, vcf_header, file_paths_to_combine, sample_list):
+    outf = VcfWriter(out_f, target_header=vcf_header)
+    file_iterators = []
+    count = 0
+    for item in file_paths_to_combine:
+        file_iterators.append(pysam.VariantFile(item, 'r'))
+        count += 1
+    if not file_iterators:
+        return 0
+
+    var_q = []
+    done_count = 0
+    for idx, item in enumerate(file_iterators):
+        try:
+            v = item.__next__()
+            heappush(var_q, (v.chrom, v.pos, idx, v))
+        except StopIteration:
+            file_iterators[idx].close()
+            file_iterators[idx] = None
+            done_count += 1
+
+    written = 0
+    while done_count < len(file_iterators):
+        if var_q:
+            chrom, pos, idx, v = heappop(var_q)
+            # ensure correct sample ordering
+            str_v = str(v).strip().split('\t')
+            main_record = str_v[:9]
+            samp_records = str_v[9:]
+            record_samples = {k: v for k, v in zip(v.samples.keys(), samp_records)}
+            rd = []
+            c = 0
+            for samp in sample_list:
+                if samp in record_samples:
+                    rd.append(record_samples[samp])
+                    c += 1
                else:
-                    clip_side = 3
-                clips.append((chrB, posB, clip_side, idx, chrA == chrB, svlen))
-
-    clips = sorted(clips, key=lambda x: (x[0], x[1]))
-
-    opts = {"bam": "rb", "cram": "rc", "sam": "r", "-": "rb", "stdin": "rb"}
-    pad = 20
-    found = set([])
-    for pth, _ in pon:
-        # open alignment file
-        kind = pth.split(".")[-1]
-        bam_mode = opts[kind]
-
-        pysam.set_verbosity(0)
-        infile = pysam.AlignmentFile(pth, bam_mode, threads=1,
-                                     reference_filename=None if kind != "cram" else args["ref"])
-        pysam.set_verbosity(3)
-
-        for chrom, pos, cs, index, intra, svlen in clips:
-
-            if index in found:
+                    rd.append('0/0:' + ':'.join(list('0' * (len(v.format.keys()) - 1))))
+            if c < len(record_samples):
+                raise RuntimeError('Number of samples in record was greater than out file, please report this')
+            main_record += rd
+            outf.write('\t'.join(main_record) + '\n')
+            written += 1
+
+            if file_iterators[idx] is None:
                continue
-
-            for a in infile.fetch(chrom, pos - pad if pos - pad > 0 else 0, pos + pad):
-                if not a.cigartuples:
+            try:
+                v = file_iterators[idx].__next__()
+                heappush(var_q, (v.chrom, v.pos, idx, v))
+            except StopIteration:
+                file_iterators[idx].close()
+                file_iterators[idx] = None
+                done_count += 1
+        else:
+            for idx, item in enumerate(file_iterators):
+                if item is None:
                    continue
-
-                if a.cigartuples[0][0] == 4 and cs != 1:
-                    current_pos = a.pos
-                    if abs(current_pos - pos) < 8:
-                        found.add(index)
-                        break
-                if a.cigartuples[-1][0] == 4 and cs != 0:
-                    current_pos = a.reference_end
-                    if abs(current_pos - pos) < 8:
-                        found.add(index)
-                        break
-
-    df = df.drop(found)
-
-    return df
-
-
-def process_pon(df, args):  # legacy code
-    pon = []
-    for item, read_type in zip(args["pon"].split(","), args["pon_rt"].split(",")):
-        pon.append((item, read_type))
-
-    if len(pon) == 0:
-        raise IOError("--pon parse failed")
-
-    if "partners" in df.columns:
-        # check one representative from each group
-        dont_check = set([])
-        check = set([])
-        for idx, p in zip(df.index, df["partners"]):
-            if idx not in dont_check:
-                check.add(idx)
-                if p is not None:
-                    dont_check |= p
-
-        df_check = df.loc[list(check)]
-        pon_idxs = call_from_samp(pon, df_check, args)
-        for idx, s in zip(df.index, df["partners"]):
-            if idx in pon_idxs:
-                pon_idxs |= s
                continue
-            for i in s:
-                if i in pon_idxs:
-                    pon_idxs |= s
-                    break
-
+            try:
+                v = item.__next__()
+                heappush(var_q, (v.chrom, v.pos, idx, v))
+            except StopIteration:
+                file_iterators[idx].close()
+                file_iterators[idx] = None
+                done_count += 1
+    outf.close()
+    assert all(i is None for i in file_iterators)
+    return written
+
+
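+# Drives the parallel merge used with --wd: each input VCF is sharded by SV type and chromosome,
+# every block is merged through process_file_list (optionally across worker processes), and the
+# per-block outputs are then streamed into a single sorted VCF by sort_into_single_file.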
for k in ["wd", "metrics", "merge_within", "merge_dist", "collapse_nearby", "merge_across", "out_format", "separate", "verbosity", "add_kind"]} + merged_outputs = [] + for block_id, names in job_blocks.items(): + job_files = glob.glob(os.path.join(args['wd'], '*' + block_id + '.vcf')) + to_delete += job_files + srt_keys = {os.path.basename(i).split("~")[0]: i for i in job_files} + file_targets = tuple(srt_keys[n] for n in names) + target_args = deepcopy(needed_args) + fout = os.path.join(args['wd'], block_id + '_merged.vcf') + merged_outputs.append(fout) + target_args["svs_out"] = fout + job_args2.append((target_args, file_targets, seen_names, names, False)) + + if args['procs'] > 1: + pool.starmap(process_file_list, job_args2) else: - pon_idxs = call_from_samp(pon, df, args) - - df = df.drop([i for i in pon_idxs if i in df.index]) # indexes might reference pon dataframe - - # now check pon alignment files for matching soft-clips or other false negative events - df = check_raw_alignments(df, args, pon) - - return df + for ja in job_args2: + process_file_list(*ja) + + # Make a header with all the samples in + vcf_header = None + for result_file in merged_outputs: + vcf_header = pysam.VariantFile(result_file, 'r').header + tmp = list(filter(None, str(vcf_header).split("\n"))) + new_header = pysam.VariantHeader() + for line in tmp: + l = str(line) + if re.search('^#CHROM', l): + break + new_header.add_line(l) + new_header.add_samples(names_list) + vcf_header = new_header + break + + sample_list = list(vcf_header.samples) + logging.info(f"Samples: {sample_list}") + written = sort_into_single_file(out_f, vcf_header, merged_outputs, sample_list) + logging.info("Sample rows before merge {}, rows after {}".format([input_rows[k] for k in names_list], written)) + for item in to_delete: + if os.path.exists(item): + os.remove(item) def view_file(args): t0 = time.time() + if args["input_list"]: + with open(args["input_list"], "r") as il: + args["input_files"] += tuple([i.strip() for i in il.readlines()]) + if args["separate"] == "True": if args["out_format"] == "vcf": raise ValueError("--separate only supported for --out-format csv") @@ -519,79 +632,20 @@ def view_file(args): args["metrics"] = False # only option supported so far args["contigs"] = False - seen_names = sortedcontainers.SortedSet([]) - names_dict = {} - name_c = defaultdict(int) - dfs = [] - header = None - contig_names = None - for item in args["input_files"]: - - if item == "-": - df = pd.read_csv(stdin) - name = list(set(df["sample"])) - if len(name) > 1: - raise ValueError("More than one sample in stdin") - else: - name = name[0] - seen_names.add(name) - else: - name, ext = os.path.splitext(item) - if ext != ".csv": # assume vcf - df, header, _, contig_names = vcf_to_df(item) # header here, assume all input has same number of fields - else: - df = pd.read_csv(item, index_col=None) - name = list(set(df["sample"])) - - if len(name) > 1: - raise ValueError("More than one sample in stdin") - else: - name = name[0] - names_dict[item] = name - if name in seen_names: - logging.info("Sample {} is present in more than one input file".format(name)) - bname = f"{name}_{name_c[name]}" - df["sample"] = [bname] * len(df) - df["table_name"] = [bname for _ in range(len(df))] - seen_names.add(bname) - else: - df["table_name"] = [name for _ in range(len(df))] - seen_names.add(name) - name_c[name] += 1 - - if len(set(df["sample"])) > 1: - raise ValueError("More than one sample per input file") - - df = mung_df(df, args) - if args["merge_within"] == 
"True": - l_before = len(df) - df = merge_df(df, 1, args["merge_dist"], {}, merge_within_sample=True, - aggressive=args['collapse_nearby'] == "True") - logging.info("{} rows before merge-within {}, rows after {}".format(name, l_before, len(df))) + soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) + manager = multiprocessing.Manager() + Global = manager.Namespace() + Global.open_files = soft # we dont know how many file descriptors are already open by the user, assume all + Global.soft = soft - if "partners" in df: - del df["partners"] - dfs.append(df) - - df = pd.concat(dfs) - if args["merge_across"] == "True": - if len(seen_names) > 1: - df = merge_df(df, len(seen_names), args["merge_dist"], {}, aggressive=args['collapse_nearby'] == "True") - - df = df.sort_values(["chrA", "posA", "chrB", "posB"]) - - # if "pon" in args and args["pon"] is not None: - # logging.info("Calling SVs from --pon samples") - # df = process_pon(df, args) - - outfile = open_outfile(args, names_dict) - - if args["out_format"] == "vcf": - count = io_funcs.to_vcf(df, args, seen_names, outfile, header=header, small_output_f=True, contig_names=contig_names) - logging.info("Sample rows before merge {}, rows after {}".format(list(map(len, dfs)), count)) + if not args["wd"]: + seen_names, names_list = get_names_list(args["input_files"]) + process_file_list(args, args["input_files"], seen_names, names_list, log_messages=True) else: - to_csv(df, args, outfile, small_output=False) + shard_data(args, args["input_files"], Global=Global) + if args['clean']: + os.rmdir(args['wd']) logging.info("dysgu merge complete h:m:s, {}".format(str(datetime.timedelta(seconds=int(time.time() - t0))), time.time() - t0)) diff --git a/setup.py b/setup.py index 5bd3d37..ef077bf 100644 --- a/setup.py +++ b/setup.py @@ -164,7 +164,7 @@ def get_extra_args(): url="https://github.com/kcleal/dysgu", description="Structural variant calling", license="MIT", - version='1.6.0', + version='1.6.1', python_requires='>=3.7', install_requires=[ # runtime requires 'setuptools>=63.0',