Skip to content

Commit

Permalink
Merge pull request #2507 from DaanDeMeyer/flock
Browse files Browse the repository at this point in the history
Be more careful about concurrent access to outputs
  • Loading branch information
DaanDeMeyer authored Mar 14, 2024
2 parents fcae016 + 480673f commit fff6f24
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 34 deletions.
77 changes: 49 additions & 28 deletions mkosi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
from mkosi.util import (
flatten,
flock,
flock_or_die,
format_rlimit,
make_executable,
one_zero,
Expand Down Expand Up @@ -3316,6 +3317,18 @@ def setup_workspace(args: Args, config: Config) -> Iterator[Path]:
raise


@contextlib.contextmanager
def lock_repository_metadata(config: Config) -> Iterator[None]:
subdir = config.distribution.package_manager(config).subdir(config)

with contextlib.ExitStack() as stack:
for d in ("cache", "lib"):
if (src := config.package_cache_dir_or_default() / d / subdir).exists():
stack.enter_context(flock(src))

yield


def copy_repository_metadata(context: Context) -> None:
if have_cache(context.config):
return
Expand All @@ -3330,29 +3343,32 @@ def copy_repository_metadata(context: Context) -> None:
logging.debug(f"Found repository metadata in {context.package_cache_dir}, not copying repository metadata")
return

for d in ("cache", "lib"):
src = context.config.package_cache_dir_or_default() / d / subdir
if not src.exists():
logging.debug(f"{src} does not exist, not copying repository metadata from it")
continue
with lock_repository_metadata(context.config):
for d in ("cache", "lib"):
src = context.config.package_cache_dir_or_default() / d / subdir
if not src.exists():
logging.debug(f"{src} does not exist, not copying repository metadata from it")
continue

caches = context.config.distribution.package_manager(context.config).cache_subdirs(src) if d == "cache" else []
if d == "cache":
caches = context.config.distribution.package_manager(context.config).cache_subdirs(src)
else:
caches = []

with tempfile.TemporaryDirectory() as tmp:
os.chmod(tmp, 0o755)
with tempfile.TemporaryDirectory() as tmp:
os.chmod(tmp, 0o755)

# cp doesn't support excluding directories but we can imitate it by bind mounting an empty directory over
# the directories we want to exclude.
exclude = flatten(["--ro-bind", tmp, os.fspath(p)] for p in caches)
# cp doesn't support excluding directories but we can imitate it by bind mounting an empty directory
# over the directories we want to exclude.
exclude = flatten(["--ro-bind", tmp, os.fspath(p)] for p in caches)

dst = context.package_cache_dir / d / subdir
with umask(~0o755):
dst.mkdir(parents=True, exist_ok=True)
dst = context.package_cache_dir / d / subdir
with umask(~0o755):
dst.mkdir(parents=True, exist_ok=True)

def sandbox(*, options: Sequence[PathString] = ()) -> list[PathString]:
return context.sandbox(options=[*options, *exclude])
def sandbox(*, options: Sequence[PathString] = ()) -> list[PathString]:
return context.sandbox(options=[*options, *exclude])

with flock(src):
copy_tree(
src, dst,
tools=context.config.tools(),
Expand Down Expand Up @@ -3618,7 +3634,7 @@ def run_shell(args: Args, config: Config) -> None:
if config.ephemeral:
fname = stack.enter_context(copy_ephemeral(config, config.output_dir_or_cwd() / config.output))
else:
fname = config.output_dir_or_cwd() / config.output
fname = stack.enter_context(flock_or_die(config.output_dir_or_cwd() / config.output))

if config.output_format == OutputFormat.disk and args.verb == Verb.boot:
run(
Expand Down Expand Up @@ -3940,7 +3956,12 @@ def run_clean(args: Args, config: Config, *, resources: Path) -> None:
remove_package_cache = args.force > 2

if outputs := list(config.output_dir_or_cwd().glob(f"{config.output}*")):
with complete_step(f"Removing output files of {config.name()} image…"):
with (
complete_step(f"Removing output files of {config.name()} image…"),
flock_or_die(config.output_dir_or_cwd() / config.output)
if (config.output_dir_or_cwd() / config.output).exists()
else contextlib.nullcontext()
):
rmtree(*outputs)

if remove_build_cache:
Expand All @@ -3965,7 +3986,10 @@ def run_clean(args: Args, config: Config, *, resources: Path) -> None:
):
subdir = config.distribution.package_manager(config).subdir(config)

with complete_step(f"Clearing out package cache of {config.name()} image…"):
with (
complete_step(f"Clearing out package cache of {config.name()} image…"),
lock_repository_metadata(config),
):
rmtree(
*(
config.package_cache_dir_or_default() / d / subdir
Expand All @@ -3990,14 +4014,11 @@ def sync_repository_metadata(context: Context) -> None:
if have_cache(context.config) or context.config.cacheonly != Cacheonly.none:
return

with complete_step(f"Syncing package manager metadata for {context.config.name()} image"):
subdir = context.config.distribution.package_manager(context.config).subdir(context.config)

with (
flock(context.config.package_cache_dir_or_default() / "cache" / subdir),
flock(context.config.package_cache_dir_or_default() / "lib" / subdir),
):
context.config.distribution.package_manager(context.config).sync(context)
with (
complete_step(f"Syncing package manager metadata for {context.config.name()} image"),
lock_repository_metadata(context.config),
):
context.config.distribution.package_manager(context.config).sync(context)


def run_sync(args: Args, config: Config, *, resources: Path) -> None:
Expand Down
7 changes: 4 additions & 3 deletions mkosi/qemu.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from mkosi.tree import copy_tree, rmtree
from mkosi.types import PathString
from mkosi.user import INVOKING_USER, become_root
from mkosi.util import StrEnum, flatten
from mkosi.util import StrEnum, flatten, flock, flock_or_die
from mkosi.versioncomp import GenericVersion

QEMU_KVM_DEVICE_VERSION = GenericVersion("9.0")
Expand Down Expand Up @@ -426,7 +426,8 @@ def copy() -> None:
sandbox=config.sandbox,
)

fork_and_wait(copy)
with flock(src):
fork_and_wait(copy)
yield tmp
finally:
def rm() -> None:
Expand Down Expand Up @@ -705,7 +706,7 @@ def run_qemu(args: Args, config: Config) -> None:
copy_ephemeral(config, config.output_dir_or_cwd() / config.output_with_compression)
)
else:
fname = config.output_dir_or_cwd() / config.output_with_compression
fname = stack.enter_context(flock_or_die(config.output_dir_or_cwd() / config.output_with_compression))

if config.output_format == OutputFormat.disk and config.runtime_size:
run(
Expand Down
19 changes: 17 additions & 2 deletions mkosi/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import contextlib
import copy
import enum
import errno
import fcntl
import functools
import importlib
Expand All @@ -20,6 +21,7 @@
from types import ModuleType
from typing import Any, Callable, Optional, TypeVar, no_type_check

from mkosi.log import die
from mkosi.types import PathString

T = TypeVar("T")
Expand Down Expand Up @@ -130,18 +132,31 @@ def make_executable(*paths: Path) -> None:


@contextlib.contextmanager
def flock(path: Path) -> Iterator[int]:
def flock(path: Path, flags: int = fcntl.LOCK_EX) -> Iterator[int]:
fd = os.open(path, os.O_CLOEXEC|os.O_RDONLY)
try:
fcntl.fcntl(fd, fcntl.FD_CLOEXEC)
logging.debug(f"Acquiring lock on {path}")
fcntl.flock(fd, fcntl.LOCK_EX)
fcntl.flock(fd, flags)
logging.debug(f"Acquired lock on {path}")
yield fd
finally:
os.close(fd)


@contextlib.contextmanager
def flock_or_die(path: Path) -> Iterator[Path]:
try:
with flock(path, fcntl.LOCK_EX|fcntl.LOCK_NB):
yield path
except OSError as e:
if e.errno != errno.EWOULDBLOCK:
raise e

die(f"Cannot lock {path} as it is locked by another process",
hint="Maybe another mkosi process is still using it?")


@contextlib.contextmanager
def scopedenv(env: Mapping[str, Any]) -> Iterator[None]:
old = copy.deepcopy(os.environ)
Expand Down
3 changes: 2 additions & 1 deletion mkosi/vmspawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
)
from mkosi.run import run
from mkosi.types import PathString
from mkosi.util import flock_or_die


def run_vmspawn(args: Args, config: Config) -> None:
Expand Down Expand Up @@ -79,7 +80,7 @@ def run_vmspawn(args: Args, config: Config) -> None:
if config.ephemeral:
fname = stack.enter_context(copy_ephemeral(config, config.output_dir_or_cwd() / config.output))
else:
fname = config.output_dir_or_cwd() / config.output
fname = stack.enter_context(flock_or_die(config.output_dir_or_cwd() / config.output))

if config.output_format == OutputFormat.disk and config.runtime_size:
run(
Expand Down

0 comments on commit fff6f24

Please sign in to comment.