Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pulsar-1 #39

Merged
merged 6 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies = [
"numpy",
"numba",
"astropy",
"matplotlib",
"h5py",
"bottleneck",
"attrs",
Expand Down Expand Up @@ -61,7 +62,14 @@ spp_extract = "sigpyproc.apps.spp_extract:main"
spp_clean = "sigpyproc.apps.spp_clean:main"

[tool.ruff]
include = ["pyproject.toml", "sigpyproc/**/*.py"]
include = [
"pyproject.toml",
"sigpyproc/**/*.py",
"tests/**/*.py",
]

exclude = ["sigpyproc/apps/spp_digifil.py"]

line-length = 88
indent-width = 4
target-version = "py39"
Expand All @@ -75,12 +83,12 @@ select = ["ALL"]
ignore = ["D1", "ANN1", "PLR2004", "G004"]

[tool.ruff.lint.pylint]
max-args = 10
max-args = 15

[tool.ruff.lint.pydocstyle]
convention = "numpy"

[tool.ruff.per-file-ignores]
[tool.ruff.lint.per-file-ignores]
"tests/**/*.py" = ["S101", "FBT", "PLR2004", "PT011", "SLF001"]

[tool.pytest.ini_options]
Expand All @@ -91,7 +99,7 @@ testpaths = "tests"
source = ["./sigpyproc/"]

[tool.coverage.run]
omit = ["tests/*", "docs/*", "*__init__.py", "sigpyproc/core/kernels.py"]
omit = ["tests/*", "docs/*", "*__init__.py"]

[tool.coverage.report]
show_missing = true
Expand All @@ -100,10 +108,12 @@ ignore_errors = true
exclude_lines = [
'raise NotImplementedError',
'if TYPE_CHECKING:',
'except ModuleNotFoundError:',
'if __name__ == "__main__":',
'if outfile_name is None:',
]

[tool.mypy]
enable_incomplete_feature = ["Unpack"]
ignore_missing_imports = true
plugins = ["numpy.typing.mypy_plugin"]
32 changes: 13 additions & 19 deletions sigpyproc/apps/spp_digifil.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,26 +74,20 @@
help="Output filename",
)
def main(
filfile,
cont,
nbits,
block_size,
rescale_constant,
tscrunch_factor,
fscrunch_factor,
rescale_seconds,
scale_fac,
apply_FITS_scale_and_offset,
):
filfile: str,
cont: bool,
nbits: int,
block_size: int,
rescale_constant: bool,
tscrunch_factor: int,
fscrunch_factor: int,
rescale_seconds: float,
scale_fac: float,
apply_FITS_scale_and_offset: bool,
) -> None:
"""Convert to sigproc output digifil style."""
raise NotImplementedError("This function is not implemented yet.")
#logger = get_logger(__name__)
#nbytes_per_sample =
#gulpsize = block_size * 1024 * 1024 //
#logger.info(f"Reading {filfile}")
#fil = FilReader(filfile)
#fil.downsample(tfactor=tfactor, ffactor=ffactor, gulp=gulp, filename=outfile)

msg = "This function is not implemented yet."

Check warning on line 89 in sigpyproc/apps/spp_digifil.py

View check run for this annotation

Codecov / codecov/patch

sigpyproc/apps/spp_digifil.py#L89

Added line #L89 was not covered by tests
raise NotImplementedError(msg)


if __name__ == "__main__":
Expand Down
156 changes: 99 additions & 57 deletions sigpyproc/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from contextlib import ExitStack
from typing import TYPE_CHECKING

import numpy as np
Expand Down Expand Up @@ -45,7 +46,13 @@ def header(self) -> Header:
""":class:`~sigpyproc.header.Header`: Header metadata of input file."""

@abstractmethod
def read_block(self, start: int, nsamps: int) -> FilterbankBlock:
def read_block(
self,
start: int,
nsamps: int,
fch1: float | None = None,
nchans: int | None = None,
) -> FilterbankBlock:
"""Read a data block from the filterbank file stream.

Parameters
Expand All @@ -54,6 +61,10 @@ def read_block(self, start: int, nsamps: int) -> FilterbankBlock:
first time sample of the block to be read
nsamps : int
number of samples in the block (i.e. block will be nsamps*nchans in size)
fch1 : float, optional
frequency of the first channel, by default None (header value)
nchans : int, optional
number of channels in the block, by default None (header value)

Returns
-------
Expand All @@ -63,7 +74,7 @@ def read_block(self, start: int, nsamps: int) -> FilterbankBlock:
Raises
------
ValueError
if requested samples are out of range
if requested samples or channels are out of range
"""

@abstractmethod
Expand Down Expand Up @@ -178,13 +189,13 @@ def compute_stats(
Keyword arguments for :func:`read_plan`.
"""
bag = ChannelStats(self.header.nchans, self.header.nsamples)
for nsamps_r, ii, data in self.read_plan(
for _, ii, data in self.read_plan(
gulp=gulp,
start=start,
nsamps=nsamps,
**plan_kwargs,
):
bag.push_data(data, nsamps_r, ii, mode="full")
bag.push_data(data, ii, mode="full")
self._chan_stats = bag

def compute_stats_basic(
Expand All @@ -208,13 +219,13 @@ def compute_stats_basic(
Keyword arguments for :func:`read_plan`.
"""
bag = ChannelStats(self.header.nchans, self.header.nsamples)
for nsamps_r, ii, data in self.read_plan(
for _, ii, data in self.read_plan(
gulp=gulp,
start=start,
nsamps=nsamps,
**plan_kwargs,
):
bag.push_data(data, nsamps_r, ii, mode="basic")
bag.push_data(data, ii, mode="basic")
self._chan_stats = bag

def collapse(
Expand Down Expand Up @@ -289,7 +300,10 @@ def bandpass(
kernels.extract_bpass(data, bpass_ar, self.header.nchans, nsamps_r)
num_samples += nsamps_r
bpass_ar /= num_samples
return TimeSeries(bpass_ar, self.header.new_header({"nchans": 1}))
return TimeSeries(
bpass_ar,
self.header.new_header({"nchans": 1, "nsamples": len(bpass_ar)}),
)

def dedisperse(
self,
Expand Down Expand Up @@ -345,7 +359,10 @@ def dedisperse(
nsamps_r,
ii * (gulp - max_delay),
)
return TimeSeries(tim_ar, self.header.new_header({"nchans": 1, "dm": dm}))
return TimeSeries(
tim_ar,
self.header.new_header({"nchans": 1, "dm": dm, "nsamples": tim_len}),
)

def read_chan(
self,
Expand Down Expand Up @@ -626,6 +643,7 @@ def extract_chans(
self,
chans: np.ndarray | None = None,
outfile_base: str | None = None,
batch_size: int = 200,
gulp: int = 16384,
start: int = 0,
nsamps: int | None = None,
Expand All @@ -639,6 +657,8 @@ def extract_chans(
channel numbers to extract, by default all channels
outfile_base : str, optional
base name of output files, by default ``header.basename``.
batch_size : int, optional
number of channels to extract in each batch, by default 200
gulp : int, optional
number of samples in each read, by default 16384
start : int, optional
Expand Down Expand Up @@ -671,30 +691,38 @@ def extract_chans(
raise ValueError(msg)
if outfile_base is None:
outfile_base = self.header.basename

filenames = [
f"{outfile_base}_chan{chans[ichan]:04d}.tim"
for ichan in range(nchans_extract)
]
out_files = [
self.header.prep_outfile(
filenames[ichan],
updates={"nchans": 1, "nbits": 32, "data_type": "time series"},
nbits=32,
)
for ichan in range(nchans_extract)
]
for nsamps_r, _ii, data in self.read_plan(
gulp=gulp,
start=start,
nsamps=nsamps,
**plan_kwargs,
):
data_2d = data.reshape(nsamps_r, self.header.nchans)
for ifile, out_file in enumerate(out_files):
out_file.cwrite(data_2d[:, chans[ifile]])
for out_file in out_files:
out_file.close()
filenames = [f"{outfile_base}_chan{chan:04d}.tim" for chan in chans]

# Process in batches so that at most ``batch_size`` output files are open at
# once, to avoid exhausting the per-process open-file limit
for batch_start in range(0, nchans_extract, batch_size):
batch_end = min(batch_start + batch_size, nchans_extract)
batch_chans = chans[batch_start:batch_end]
batch_files = filenames[batch_start:batch_end]

with ExitStack() as stack:
out_files = [
stack.enter_context(
self.header.prep_outfile(
filename,
updates={
"nchans": 1,
"nbits": 32,
"data_type": "time series",
},
nbits=32,
),
)
for filename in batch_files
]
for nsamps_r, _, data in self.read_plan(
gulp=gulp,
start=start,
nsamps=nsamps,
**plan_kwargs,
):
data_2d = data.reshape(nsamps_r, self.header.nchans)
for ifile, out_file in enumerate(out_files):
out_file.cwrite(data_2d[:, batch_chans[ifile]])
return filenames

def extract_bands(
Expand All @@ -703,6 +731,7 @@ def extract_bands(
nchans: int,
chanpersub: int | None = None,
outfile_base: str | None = None,
batch_size: int = 200,
gulp: int = 16384,
start: int = 0,
nsamps: int | None = None,
Expand All @@ -720,6 +749,8 @@ def extract_bands(
number of channels in each sub-band, by default ``nchans``
outfile_base : str, optional
base name of output files, by default ``header.basename``.
batch_size : int, optional
number of sub-bands to extract in each batch, by default 200
gulp : int, optional
number of samples in each read, by default 16384
start : int, optional
Expand Down Expand Up @@ -767,31 +798,42 @@ def extract_bands(
outfile_base = self.header.basename

filenames = [f"{outfile_base}_sub{isub:02d}.fil" for isub in range(nsub)]
out_files = [
self.header.prep_outfile(
filenames[isub],
updates={
"nchans": chanpersub,
"fch1": fstart + isub * chanpersub * self.header.foff,
},
nbits=self.header.nbits,
)
for isub in range(nsub)
]

for nsamps_r, _ii, data in self.read_plan(
gulp=gulp,
start=start,
nsamps=nsamps,
**plan_kwargs,
):
data_2d = data.reshape(nsamps_r, self.header.nchans)
for ifile, out_file in enumerate(out_files):
iband_chanstart = chanstart + ifile * chanpersub
subband_ar = data_2d[:, iband_chanstart : iband_chanstart + chanpersub]
out_file.cwrite(subband_ar.ravel())
for out_file in out_files:
out_file.close()
# Process in batches so that at most ``batch_size`` output files are open at
# once, to avoid exhausting the per-process open-file limit
for batch_start in range(0, nsub, batch_size):
batch_end = min(batch_start + batch_size, nsub)
batch_files = filenames[batch_start:batch_end]

with ExitStack() as stack:
out_files = [
stack.enter_context(
self.header.prep_outfile(
filename,
updates={
"nchans": chanpersub,
"fch1": fstart
+ (batch_start + i) * chanpersub * self.header.foff,
},
nbits=self.header.nbits,
),
)
for i, filename in enumerate(batch_files)
]

for nsamps_r, _ii, data in self.read_plan(
gulp=gulp,
start=start,
nsamps=nsamps,
**plan_kwargs,
):
data_2d = data.reshape(nsamps_r, self.header.nchans)
for ifile, out_file in enumerate(out_files):
iband_chanstart = chanstart + (batch_start + ifile) * chanpersub
subband_ar = data_2d[
:,
iband_chanstart : iband_chanstart + chanpersub,
]
out_file.cwrite(subband_ar.ravel())
return filenames

def requantize(
Expand Down Expand Up @@ -892,7 +934,7 @@ def remove_zerodm(
if outfile_name is None:
outfile_name = f"{self.header.basename}_noZeroDM.fil"

bpass = self.bandpass(**plan_kwargs)
bpass = self.bandpass(**plan_kwargs).data
chanwts = bpass / bpass.sum()
out_ar = np.empty(
self.header.nsamples * self.header.nchans,
Expand Down
Loading
Loading