Skip to content

Commit

Permalink
Removed asyncio
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelgatzen committed Sep 16, 2024
1 parent 3791870 commit 9a5bdbf
Showing 1 changed file with 20 additions and 44 deletions.
64 changes: 20 additions & 44 deletions ImputationPipeline/ScoreBGE/ScoreBGE.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ def _compare_record_and_weight(self, record, weight):
if record.pos <= weight.position <= record.stop:
return 0
return record.pos - weight.position

def _sites_scored_sort_key(self, site):
locus = site[0]
contig, position = locus.split(':')
return (self.ref_dict.index(contig), int(position))

def _check_biallelic(self, record):
if record is not None and len(record.alts) > 1:
Expand Down Expand Up @@ -127,12 +122,11 @@ def _print_wes_and_wgs_metrics(self):
print(f'WES GVCF + WGS VCF Scoring:')
print(f' Total sites scored: Min: {sites_scored_min_max[0]} Max: {sites_scored_min_max[1]}')

async def _process_weight_wes_asyncio(self, weight, gvcf, site_gq_threshold, start_time, step_time, lock):
def _process_weight_wes(self, weight, gvcf, site_gq_threshold, start_time, step_time):
i_weight = weight.Index
if i_weight % 100000 == 0:
async with lock:
print(f'Scored {i_weight} sites. Current locus: {weight.locus}. Time elapsed: {(datetime.now() - start_time).total_seconds():,.0f}s. Time since last step: {(datetime.now() - step_time[0]).total_seconds():,.0f}s')
step_time[0] = datetime.now()
print(f'Scored {i_weight} sites. Current locus: {weight.locus}. Time elapsed: {(datetime.now() - start_time).total_seconds():,.0f}s. Time since last step: {(datetime.now() - step_time[0]).total_seconds():,.0f}s')
step_time[0] = datetime.now()

site_records = gvcf.fetch(weight.contig, weight.position - 1, weight.position)
record = next(site_records, None)
Expand All @@ -146,17 +140,7 @@ async def _process_weight_wes_asyncio(self, weight, gvcf, site_gq_threshold, sta
return

if self._compare_record_and_weight(record, weight) == 0:
async with lock:
self._gvcf_score_site(record, weight, site_gq_threshold)

async def _process_weights_wes_asyncio(self, prs_weights, gvcf, site_gq_threshold):
start_time = datetime.now()
step_time = [start_time] # List to allow for mutable variable in async function
lock = asyncio.Lock()

tasks = [self._process_weight_wes_asyncio(weight, gvcf, site_gq_threshold, start_time, step_time, lock) for weight in prs_weights.itertuples()]
await asyncio.gather(*tasks)

self._gvcf_score_site(record, weight, site_gq_threshold)

def score_wes_gvcf(self, gvcf_path, sample_names=None, site_gq_threshold=30):
"""
Expand All @@ -183,20 +167,21 @@ def score_wes_gvcf(self, gvcf_path, sample_names=None, site_gq_threshold=30):
self.gvcf_sites_scored = {sample_name: [] for sample_name in self.sample_names}
self.gvcf_low_quality_sites = {sample_name: [] for sample_name in self.sample_names}

asyncio.run(self._process_weights_wes_asyncio(self.prs_weights, gvcf, site_gq_threshold))

for sample_name in self.sample_names:
self.gvcf_sites_scored[sample_name].sort(key=self._sites_scored_sort_key)
start_time = datetime.now()
step_time = [start_time] # list in order to pass by reference

for weight in self.prs_weights.itertuples():
self._process_weight_wes(weight, gvcf, site_gq_threshold, start_time, step_time)

# Save the scored sites as a set for faster lookup
self.gvcf_sites_scored_set = {key: set(value) for key, value in self.gvcf_sites_scored.items()}
self._print_wes_gvcf_metrics()

async def _process_weight_wgs_asyncio(self, weight, vcf, start_time, step_time, lock):
def _process_weight_wgs(self, weight, vcf, start_time, step_time):
i_weight = weight.Index
if i_weight % 100000 == 0:
async with lock:
print(f'Scored {i_weight} sites. Current locus: {weight.locus}. Time elapsed: {(datetime.now() - start_time).total_seconds():,.0f}s. Time since last step: {(datetime.now() - step_time[0]).total_seconds():,.0f}s')
step_time[0] = datetime.now()
print(f'Scored {i_weight} sites. Current locus: {weight.locus}. Time elapsed: {(datetime.now() - start_time).total_seconds():,.0f}s. Time since last step: {(datetime.now() - step_time[0]).total_seconds():,.0f}s')
step_time[0] = datetime.now()

site_records = vcf.fetch(weight.contig, weight.position - 1, weight.position)

Expand All @@ -214,20 +199,10 @@ async def _process_weight_wgs_asyncio(self, weight, vcf, start_time, step_time,

# Skip the weight if the record was not found
if record is None:
async with lock:
self.vcf_num_sites_not_found += 1
print(' Site not found in VCF:', weight.locus, weight.ref, weight.alt)
self.vcf_num_sites_not_found += 1
print(' Site not found in VCF:', weight.locus, weight.ref, weight.alt)
return
async with lock:
self._vcf_score_dosage(record, weight)

async def _process_weights_wgs_asyncio(self, prs_weights, vcf):
start_time = datetime.now()
step_time = [start_time] # List to allow for mutable variable in async function
lock = asyncio.Lock()

tasks = [self._process_weight_wgs_asyncio(weight, vcf, start_time, step_time, lock) for weight in prs_weights.itertuples()]
await asyncio.gather(*tasks)
self._vcf_score_dosage(record, weight)

def score_wgs_vcf(self, wgs_vcf_path, sample_names=None, allow_wgs_vcf_only=False):
"""
Expand Down Expand Up @@ -260,10 +235,11 @@ def score_wgs_vcf(self, wgs_vcf_path, sample_names=None, allow_wgs_vcf_only=Fals
self.vcf_sites_scored = {sample_name: [] for sample_name in self.sample_names}
self.vcf_num_sites_not_found = 0

asyncio.run(self._process_weights_wgs_asyncio(self.prs_weights, vcf))
start_time = datetime.now()
step_time = [start_time] # List for passing by reference

for sample_name in self.sample_names:
self.vcf_sites_scored[sample_name].sort(key=self._sites_scored_sort_key)
for weight in self.prs_weights.itertuples():
self._process_weight_wgs(weight, vcf, start_time, step_time)

self._print_wgs_vcf_metrics()
if self.gvcf_sites_scored is not None:
Expand Down

0 comments on commit 9a5bdbf

Please sign in to comment.