Skip to content

Commit

Permalink
feat: Update purl to match specification when ingesting packages from…
Browse files Browse the repository at this point in the history
… Conda - thanks to @RodneyRichardson
  • Loading branch information
madpah authored Jun 16, 2022
2 parents b028c2b + 2999022 commit 072c8f1
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 23 deletions.
17 changes: 9 additions & 8 deletions cyclonedx_py/parser/conda.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
from cyclonedx.model.component import Component
from cyclonedx.parser import BaseParser

# See https://github.com/package-url/packageurl-python/issues/65
from packageurl import PackageURL # type: ignore

from ..utils.conda import CondaPackage, parse_conda_json_to_conda_package, parse_conda_list_str_to_conda_package
from ..utils.conda import (
CondaPackage,
conda_package_to_purl,
parse_conda_json_to_conda_package,
parse_conda_list_str_to_conda_package,
)


class _BaseCondaParser(BaseParser, metaclass=ABCMeta):
Expand Down Expand Up @@ -60,11 +62,10 @@ def _conda_packages_to_components(self) -> None:
"""
for conda_package in self._conda_packages:
purl = conda_package_to_purl(conda_package)
c = Component(
name=conda_package['name'], version=str(conda_package['version']),
purl=PackageURL(
type='pypi', name=conda_package['name'], version=str(conda_package['version'])
)
name=conda_package['name'], version=conda_package['version'],
purl=purl
)
c.external_references.add(ExternalReference(
reference_type=ExternalReferenceType.DISTRIBUTION,
Expand Down
39 changes: 32 additions & 7 deletions cyclonedx_py/utils/conda.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
from typing import Optional, Tuple
from urllib.parse import urlparse

# See https://github.com/package-url/packageurl-python/issues/65
from packageurl import PackageURL # type: ignore

if sys.version_info >= (3, 8):
from typing import TypedDict
else:
Expand All @@ -41,9 +44,29 @@ class CondaPackage(TypedDict):
name: str
platform: str
version: str
package_format: Optional[str]
md5_hash: Optional[str]


def conda_package_to_purl(pkg: CondaPackage) -> PackageURL:
    """
    Build the ``conda``-type package URL (purl) for a Conda package.

    The ``build``, ``channel`` and ``subdir`` qualifiers are always populated, taken
    from the package's ``build_string``, ``channel`` and ``platform`` entries
    respectively. The ``type`` qualifier is appended only when the package's
    ``package_format`` entry is not ``None``.

    See https://github.com/package-url/purl-spec/blob/master/PURL-TYPES.rst#conda
    """
    # Keyword form yields the same keys, in the same insertion order, as a literal.
    purl_qualifiers = dict(
        build=pkg['build_string'],
        channel=pkg['channel'],
        subdir=pkg['platform'],
    )

    # The package format (e.g. the archive extension) is optional, so only
    # advertise it when it is actually known.
    if pkg['package_format'] is not None:
        purl_qualifiers['type'] = str(pkg['package_format'])

    return PackageURL(
        type='conda',
        name=pkg['name'],
        version=pkg['version'],
        qualifiers=purl_qualifiers,
    )


def parse_conda_json_to_conda_package(conda_json_str: str) -> Optional[CondaPackage]:
try:
package_data = json.loads(conda_json_str)
Expand All @@ -53,6 +76,7 @@ def parse_conda_json_to_conda_package(conda_json_str: str) -> Optional[CondaPack
if not isinstance(package_data, dict):
return None

package_data.setdefault('package_format', None)
package_data.setdefault('md5_hash', None)
return CondaPackage(package_data) # type: ignore # @FIXME write proper type safe dict at this point

Expand Down Expand Up @@ -87,17 +111,18 @@ def parse_conda_list_str_to_conda_package(conda_list_str: str) -> Optional[Conda
*_package_url_parts, package_arch, package_name_version_build_string = package_parts
package_url = urlparse('/'.join(_package_url_parts))

package_name, build_version, build_string = split_package_string(package_name_version_build_string)
package_name, build_version, build_string, package_format = split_package_string(package_name_version_build_string)
build_string, build_number = split_package_build_string(build_string)

return CondaPackage(
base_url=package_url.geturl(), build_number=build_number, build_string=build_string,
channel=package_url.path[1:], dist_name=f'{package_name}-{build_version}-{build_string}',
name=package_name, platform=package_arch, version=build_version, md5_hash=package_hash
name=package_name, platform=package_arch, version=build_version, package_format=package_format,
md5_hash=package_hash
)


def split_package_string(package_name_version_build_string: str) -> Tuple[str, str, str]:
def split_package_string(package_name_version_build_string: str) -> Tuple[str, str, str, str]:
"""Helper method for parsing package_name_version_build_string.
Returns:
Expand All @@ -110,12 +135,12 @@ def split_package_string(package_name_version_build_string: str) -> Tuple[str, s
*_package_name_parts, build_version, build_string = package_nvbs_parts
package_name = '-'.join(_package_name_parts)

# Split package_format (.conda or .tar.gz) at the end
_pos = build_string.find('.')
if _pos >= 0:
# Remove any .conda at the end if present or other package type eg .tar.gz
build_string = build_string[0:_pos]
package_format = build_string[_pos + 1:]
build_string = build_string[0:_pos]

return package_name, build_version, build_string
return package_name, build_version, build_string, package_format


def split_package_build_string(build_string: str) -> Tuple[str, Optional[int]]:
Expand Down
28 changes: 20 additions & 8 deletions tests/test_parser_conda.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
class TestCondaParser(TestCase):

def test_conda_list_json(self) -> None:
conda_list_ouptut_file = os.path.join(os.path.dirname(__file__),
conda_list_output_file = os.path.join(os.path.dirname(__file__),
'fixtures/conda-list-output.json')

with (open(conda_list_ouptut_file, 'r')) as conda_list_output_fh:
with (open(conda_list_output_file, 'r')) as conda_list_output_fh:
parser = CondaListJsonParser(conda_data=conda_list_output_fh.read())

self.assertEqual(34, parser.component_count())
Expand All @@ -42,15 +42,17 @@ def test_conda_list_json(self) -> None:
self.assertIsNotNone(c_idna)
self.assertEqual('idna', c_idna.name)
self.assertEqual('2.10', c_idna.version)
self.assertEqual('pkg:conda/idna@2.10?build=pyhd3eb1b0_0&channel=pkgs/main&subdir=noarch',
c_idna.purl.to_string())
self.assertEqual(1, len(c_idna.external_references), f'{c_idna.external_references}')
self.assertEqual(0, len(c_idna.external_references.pop().hashes))
self.assertEqual(0, len(c_idna.hashes), f'{c_idna.hashes}')

def test_conda_list_explicit_md5(self) -> None:
conda_list_ouptut_file = os.path.join(os.path.dirname(__file__),
conda_list_output_file = os.path.join(os.path.dirname(__file__),
'fixtures/conda-list-explicit-md5.txt')

with (open(conda_list_ouptut_file, 'r')) as conda_list_output_fh:
with (open(conda_list_output_file, 'r')) as conda_list_output_fh:
parser = CondaListExplicitParser(conda_data=conda_list_output_fh.read())

self.assertEqual(34, parser.component_count())
Expand All @@ -60,6 +62,8 @@ def test_conda_list_explicit_md5(self) -> None:
self.assertIsNotNone(c_idna)
self.assertEqual('idna', c_idna.name)
self.assertEqual('2.10', c_idna.version)
self.assertEqual('pkg:conda/idna@2.10?build=pyhd3eb1b0_0&channel=pkgs/main&subdir=noarch&type=tar.bz2',
c_idna.purl.to_string())
self.assertEqual(1, len(c_idna.external_references), f'{c_idna.external_references}')
self.assertEqual(0, len(c_idna.external_references.pop().hashes))
self.assertEqual(1, len(c_idna.hashes), f'{c_idna.hashes}')
Expand All @@ -70,8 +74,8 @@ def test_conda_list_explicit_md5(self) -> None:
def test_conda_list_build_number_text(self) -> None:
conda_list_output_file = os.path.join(os.path.dirname(__file__), 'fixtures/conda-list-build-number-text.txt')

with (open(conda_list_output_file, 'r')) as conda_list_ouptut_fh:
parser = CondaListExplicitParser(conda_data=conda_list_ouptut_fh.read())
with (open(conda_list_output_file, 'r')) as conda_list_output_fh:
parser = CondaListExplicitParser(conda_data=conda_list_output_fh.read())

self.assertEqual(39, parser.component_count())
components = parser.get_components()
Expand All @@ -80,21 +84,29 @@ def test_conda_list_build_number_text(self) -> None:
self.assertIsNotNone(c_libgcc_mutex)
self.assertEqual('_libgcc_mutex', c_libgcc_mutex.name)
self.assertEqual('0.1', c_libgcc_mutex.version)
self.assertEqual('pkg:conda/_libgcc_mutex@0.1?build=main&channel=pkgs/main&subdir=linux-64&type=conda',
c_libgcc_mutex.purl.to_string())
self.assertEqual(0, len(c_libgcc_mutex.hashes), f'{c_libgcc_mutex.hashes}')

c_pycparser = next(filter(lambda c: c.name == 'pycparser', components), None)
self.assertIsNotNone(c_pycparser)
self.assertEqual('pycparser', c_pycparser.name)
self.assertEqual('2.21', c_pycparser.version)
self.assertEqual('pkg:conda/pycparser@2.21?build=pyhd3eb1b0_0&channel=pkgs/main&subdir=noarch&type=conda',
c_pycparser.purl.to_string())
self.assertEqual(0, len(c_pycparser.hashes), f'{c_pycparser.hashes}')

c_openmp_mutex = next(filter(lambda c: c.name == '_openmp_mutex', components), None)
self.assertIsNotNone(c_openmp_mutex)
self.assertEqual('_openmp_mutex', c_openmp_mutex.name)
self.assertEqual('4.5', c_openmp_mutex.version)
self.assertEqual('pkg:conda/_openmp_mutex@4.5?build=1_gnu&channel=pkgs/main&subdir=linux-64&type=tar.bz2',
c_openmp_mutex.purl.to_string())
self.assertEqual(0, len(c_openmp_mutex.hashes), f'{c_openmp_mutex.hashes}')

def test_conda_list_malformed(self) -> None:
conda_list_output_file = os.path.join(os.path.dirname(__file__), 'fixtures/conda-list-broken.txt')

with (open(conda_list_output_file, 'r')) as conda_list_ouptut_fh:
with (open(conda_list_output_file, 'r')) as conda_list_output_fh:
with self.assertRaisesRegex(ValueError, re.compile(r'^unexpected format', re.IGNORECASE)):
CondaListExplicitParser(conda_data=conda_list_ouptut_fh.read())
CondaListExplicitParser(conda_data=conda_list_output_fh.read())
8 changes: 8 additions & 0 deletions tests/test_utils_conda.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def test_parse_conda_list_str_no_hash(self) -> None:
self.assertEqual('chardet', cp['name'])
self.assertEqual('osx-64', cp['platform'])
self.assertEqual('4.0.0', cp['version'])
self.assertEqual('conda', cp['package_format'])
self.assertIsNone(cp['md5_hash'])

def test_parse_conda_list_str_with_hash_1(self) -> None:
Expand All @@ -77,6 +78,7 @@ def test_parse_conda_list_str_with_hash_1(self) -> None:
self.assertEqual('tzdata', cp['name'])
self.assertEqual('noarch', cp['platform'])
self.assertEqual('2021a', cp['version'], )
self.assertEqual('conda', cp['package_format'])
self.assertEqual('d42e4db918af84a470286e4c300604a3', cp['md5_hash'])

def test_parse_conda_list_str_with_hash_2(self) -> None:
Expand All @@ -94,6 +96,7 @@ def test_parse_conda_list_str_with_hash_2(self) -> None:
self.assertEqual('ca-certificates', cp['name'])
self.assertEqual('osx-64', cp['platform'])
self.assertEqual('2021.7.5', cp['version'], )
self.assertEqual('conda', cp['package_format'])
self.assertEqual('c2d0ae65c08dacdcf86770b7b5bbb187', cp['md5_hash'])

def test_parse_conda_list_str_with_hash_3(self) -> None:
Expand All @@ -111,6 +114,7 @@ def test_parse_conda_list_str_with_hash_3(self) -> None:
self.assertEqual('idna', cp['name'])
self.assertEqual('noarch', cp['platform'])
self.assertEqual('2.10', cp['version'], )
self.assertEqual('tar.bz2', cp['package_format'])
self.assertEqual('153ff132f593ea80aae2eea61a629c92', cp['md5_hash'])

def test_parse_conda_list_str_with_hash_4(self) -> None:
Expand All @@ -128,6 +132,7 @@ def test_parse_conda_list_str_with_hash_4(self) -> None:
self.assertEqual('_libgcc_mutex', cp['name'])
self.assertEqual('linux-64', cp['platform'])
self.assertEqual('0.1', cp['version'])
self.assertEqual('tar.bz2', cp['package_format'])
self.assertEqual('d7c89558ba9fa0495403155b64376d81', cp['md5_hash'])

def test_parse_conda_list_build_number(self) -> None:
Expand All @@ -144,6 +149,7 @@ def test_parse_conda_list_build_number(self) -> None:
self.assertEqual('chardet', cp['name'])
self.assertEqual('osx-64', cp['platform'])
self.assertEqual('4.0.0', cp['version'])
self.assertEqual('conda', cp['package_format'])
self.assertIsNone(cp['md5_hash'])

def test_parse_conda_list_no_build_number(self) -> None:
Expand All @@ -160,6 +166,7 @@ def test_parse_conda_list_no_build_number(self) -> None:
self.assertEqual('_libgcc_mutex', cp['name'])
self.assertEqual('linux-64', cp['platform'])
self.assertEqual('0.1', cp['version'])
self.assertEqual('conda', cp['package_format'])
self.assertIsNone(cp['md5_hash'])

def test_parse_conda_list_no_build_number2(self) -> None:
Expand All @@ -176,4 +183,5 @@ def test_parse_conda_list_no_build_number2(self) -> None:
self.assertEqual('_openmp_mutex', cp['name'])
self.assertEqual('linux-64', cp['platform'])
self.assertEqual('4.5', cp['version'])
self.assertEqual('tar.bz2', cp['package_format'])
self.assertIsNone(cp['md5_hash'])

0 comments on commit 072c8f1

Please sign in to comment.