Skip to content

Commit

Permalink
add support for downloading arbitrary archive files
Browse files Browse the repository at this point in the history
  • Loading branch information
stefan6419846 committed Nov 21, 2023
1 parent d21decb commit 1db1fc6
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 16 deletions.
30 changes: 16 additions & 14 deletions license_tools/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 +11,38 @@

def main() -> None:
parser = argparse.ArgumentParser(
description='Run selected license tools. Will determine all license from the given source by default.',
description="Run selected license tools. Will determine all license from the given source by default.",
)

source_group = parser.add_argument_group('Artifact source')
source_group = parser.add_argument_group("Artifact source")
source_group = source_group.add_mutually_exclusive_group(required=False)
source_group.add_argument('--directory', action='store', type=str, help='Directory to work on.')
source_group.add_argument('--file', action='store', type=str, help='File to work on.')
source_group.add_argument('--archive', action='store', type=str, help='Archive file to work on.')
source_group.add_argument('--package', action='store', type=str, help='Package specification to use.')
source_group.add_argument("--directory", action="store", type=str, help="Directory to work on.")
source_group.add_argument("--file", action="store", type=str, help="File to work on.")
source_group.add_argument("--archive", action="store", type=str, help="Archive file to work on.")
source_group.add_argument("--package", action="store", type=str, help="Package specification to use.")
source_group.add_argument("--url", action="store", type=str, help="Download URL to use.")

parser.add_argument(
'--index-url', action='store', type=str, required=False, default='', help='PyPI index URL to use.'
"--index-url", action="store", type=str, required=False, default="", help="PyPI index URL to use."
)
parser.add_argument(
'--jobs', action='store', type=int, required=False, default=4, help='Parallel jobs to use.'
"--jobs", action="store", type=int, required=False, default=4, help="Parallel jobs to use."
)

parser.add_argument(
'--retrieve-copyrights', action='store_true', required=False, default=False, help='Retrieve copyrights.'
"--retrieve-copyrights", action="store_true", required=False, default=False, help="Retrieve copyrights."
)
parser.add_argument(
'--retrieve-emails', action='store_true', required=False, default=False, help='Retrieve e-mails.'
"--retrieve-emails", action="store_true", required=False, default=False, help="Retrieve e-mails."
)
parser.add_argument(
'--retrieve-file-info', action='store_true', required=False, default=False, help='Retrieve file information.'
"--retrieve-file-info", action="store_true", required=False, default=False, help="Retrieve file information."
)
parser.add_argument(
'--retrieve-urls', action='store_true', required=False, default=False, help='Retrieve URLs.'
"--retrieve-urls", action="store_true", required=False, default=False, help="Retrieve URLs."
)
parser.add_argument(
'--retrieve-ldd-data', action='store_true', required=False, default=False, help='Retrieve shared object linking data.'
"--retrieve-ldd-data", action="store_true", required=False, default=False, help="Retrieve shared object linking data."
)

arguments = parser.parse_args()
Expand All @@ -51,6 +52,7 @@ def main() -> None:
file_path=arguments.file,
archive_path=arguments.archive,
package_definition=arguments.package,
download_url=arguments.url,
index_url=arguments.index_url,
job_count=arguments.jobs,
retrieve_copyrights=arguments.retrieve_copyrights,
Expand All @@ -61,5 +63,5 @@ def main() -> None:
)


if __name__ == '__main__':
if __name__ == "__main__":
main()
49 changes: 47 additions & 2 deletions license_tools/scancode_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
from collections import defaultdict
from dataclasses import dataclass, field as dataclass_field
from pathlib import Path
from tempfile import TemporaryDirectory
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import cast, Generator

import requests
import scancode_config # type: ignore[import-untyped]
from commoncode import fileutils # type: ignore[import-untyped]
from joblib import Parallel, delayed # type: ignore[import-untyped]
Expand Down Expand Up @@ -454,6 +455,40 @@ def run_on_package_archive_file(
)


def run_on_downloaded_archive_file(
    download_url: str,
    job_count: int = 4,
    retrieval_flags: int = 0,
) -> Generator[FileResults, None, None]:
    """
    Run the analysis on the given archive file after downloading it.

    :param download_url: The URL to download the archive from.
    :param job_count: The number of parallel jobs to use.
    :param retrieval_flags: Values to retrieve.
    :return: The requested results.
    :raises requests.HTTPError: If the server answers with an error status code.
    """
    # Retrieving the correct suffixes is a bit tricky here, so we use some guessing as well.
    # This basically uses the trailing URL part (usually the filename itself) and forwards
    # guessing the suffixes to Python itself. Due to the way the suffixes are determined,
    # we probably get some part of the (irrelevant) filename as well as the suffix (due to
    # dotted package versions for example), but this should not hurt. We just have to make
    # sure that we do not really lose important suffix information as Python will not be
    # able to unpack this archive otherwise, for example because we supply `.gz` instead
    # of `.tar.gz` only.
    # Use the last component (`[-1]`) so a URL without any slash does not raise IndexError.
    suffixes = Path(download_url.rsplit("/", maxsplit=1)[-1]).suffixes
    suffix = "".join(suffixes)
    with NamedTemporaryFile(suffix=suffix) as downloaded_file:
        # Stream the download in chunks instead of buffering the whole archive in memory,
        # fail fast on HTTP error statuses instead of "analyzing" an error page, and bound
        # the wait time so a stalled server cannot hang the whole run.
        with requests.get(url=download_url, stream=True, timeout=60) as response:
            response.raise_for_status()
            for chunk in response.iter_content(chunk_size=65536):
                downloaded_file.write(chunk)
        # Make sure everything is on disk before the analysis re-opens the file by name.
        downloaded_file.flush()
        downloaded_file.seek(0)
        yield from run_on_package_archive_file(
            archive_path=Path(downloaded_file.name),
            job_count=job_count,
            retrieval_flags=retrieval_flags,
        )


def run_on_downloaded_package_file(
package_definition: str,
index_url: str | None = None,
Expand Down Expand Up @@ -518,6 +553,7 @@ def run(
file_path: Path | str | None = None,
archive_path: Path | str | None = None,
package_definition: str | None = None,
download_url: str | None = None,
index_url: str | None = None,
job_count: int = 4,
retrieve_copyrights: bool = False,
Expand All @@ -536,6 +572,7 @@ def run(
:param file_path: The file to run on.
:param archive_path: The package archive to run on.
:param package_definition: The package definition to run for.
:param download_url: The package URL to download and run on.
:param index_url: The PyPI index URL to use. Uses the default one from the `.pypirc` file if unset.
:param job_count: The number of parallel jobs to use.
:param retrieve_copyrights: Whether to retrieve copyright information.
Expand All @@ -549,7 +586,7 @@ def run(
atexit.register(cleanup, scancode_config.scancode_temp_dir)

assert _check_that_exactly_one_value_is_set(
[directory, file_path, archive_path, package_definition]
[directory, file_path, archive_path, package_definition, download_url]
), "Exactly one source is required."

license_counts: dict[str | None, int] = defaultdict(int)
Expand Down Expand Up @@ -587,6 +624,14 @@ def run(
job_count=job_count,
)
)
elif download_url:
results = list(
run_on_downloaded_archive_file(
download_url=download_url,
retrieval_flags=retrieval_flags,
job_count=job_count,
)
)
elif file_path:
results = [
run_on_file(
Expand Down
31 changes: 31 additions & 0 deletions tests/test_scancode_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,29 @@ def test_non_wheel_file(self) -> None:
self._check_call(suffix=".tar.gz", url=url, expected_files=TYPING_EXTENSION_4_8_0__SOURCE_FILES)


class RunOnDownloadedArchiveFileTestCase(TestCase):
    def _check_call(self, suffix: str, url: str) -> None:
        """Download the given URL and verify that the archive analysis receives the intact suffix and parameters."""
        expected_results = [object(), object(), object()]

        def fake_archive_run(archive_path: Path, job_count: int, retrieval_flags: int) -> Generator[Any, None, None]:
            # The temporary file has to keep the original suffix, otherwise unpacking would fail.
            self.assertEqual(2, job_count)
            self.assertEqual(42, retrieval_flags)
            self.assertEqual(suffix, archive_path.name[-len(suffix):])
            yield from expected_results

        patcher = mock.patch.object(
            scancode_tools, "run_on_package_archive_file", side_effect=fake_archive_run
        )
        with patcher:
            actual = list(
                scancode_tools.run_on_downloaded_archive_file(download_url=url, job_count=2, retrieval_flags=42)
            )
        self.assertEqual(expected_results, actual)

    def test_wheel_file(self) -> None:
        url = "https://files.pythonhosted.org/packages/24/21/7d397a4b7934ff4028987914ac1044d3b7d52712f30e2ac7a2ae5bc86dd0/typing_extensions-4.8.0-py3-none-any.whl"  # noqa: E501
        self._check_call(suffix=".whl", url=url)

    def test_non_wheel_file(self) -> None:
        url = "https://files.pythonhosted.org/packages/1f/7a/8b94bb016069caa12fc9f587b28080ac33b4fbb8ca369b98bc0a4828543e/typing_extensions-4.8.0.tar.gz"
        self._check_call(suffix=".tar.gz", url=url)


class RunOnDownloadedPackageFileTestCase(TestCase):
def test_valid_package_name(self) -> None:
stderr = StringIO()
Expand Down Expand Up @@ -424,6 +447,14 @@ def test_archive_path(self) -> None:
self.assertEqual(TYPING_EXTENSION_4_8_0__LICENSES, result)
self.assertEqual(TYPING_EXTENSION_4_8_0__EXPECTED_OUTPUT, str(stdout))

def test_download_url(self) -> None:
    """Ensure that `run` dispatches a download URL to the archive download handler with the right flags."""
    url = "https://example.org/archive.tar.gz"
    with self.record_stdout() as stdout, mock.patch.object(
        scancode_tools,
        "run_on_downloaded_archive_file",
        return_value=iter(TYPING_EXTENSION_4_8_0__LICENSES),
    ) as archive_mock:
        result = scancode_tools.run(download_url=url, retrieve_copyrights=True, job_count=1)
        archive_mock.assert_called_once_with(download_url=url, retrieval_flags=1, job_count=1)
    self.assertEqual(TYPING_EXTENSION_4_8_0__LICENSES, result)
    self.assertEqual(TYPING_EXTENSION_4_8_0__EXPECTED_OUTPUT, str(stdout))

def test_file_path(self) -> None:
with self.record_stdout() as stdout:
result = scancode_tools.run(file_path=SETUP_PATH, job_count=1)
Expand Down

0 comments on commit 1db1fc6

Please sign in to comment.