testsuite: add a NAR generator with some evil NARs
This also rewrites a lot of the command handling in the fixtures
library, since we want to more precisely control which way that the nix
store is set up in the tests, rather than the previous method of
renaming /nix/store to some temp dir (which allows builds but does not
allow any /nix/store paths or stability across runs, which is a
significant issue for snapshot testing).
It uses a builder to reduce the amount of state carelessly thrown
around.
The evil NARs are inspired by CVE-2024-45593
(https://github.com/NixOS/nix/security/advisories/GHSA-h4vv-h3jq-v493).
No bugs were found in this endeavor.
Change-Id: Iee41b055fa96529c5a3c761f680ed1d0667ba5da
This commit is contained in:
parent
4180b84a67
commit
3571817e3a
0
tests/functional2/store/__init__.py
Normal file
0
tests/functional2/store/__init__.py
Normal file
119
tests/functional2/store/test_evil_nars.py
Normal file
119
tests/functional2/store/test_evil_nars.py
Normal file
|
@ -0,0 +1,119 @@
|
||||||
|
import pytest
|
||||||
|
import os
|
||||||
|
from ..testlib.nar import *
|
||||||
|
from ..testlib.fixtures import Nix
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
|
meow_orig = 'méow'
|
||||||
|
meow_nfc_ = unicodedata.normalize('NFC', meow_orig)
|
||||||
|
meow_nfd_ = unicodedata.normalize('NFD', meow_orig)
|
||||||
|
meow_nfc = meow_nfc_.encode('utf-8')
|
||||||
|
meow_nfd = meow_nfd_.encode('utf-8')
|
||||||
|
assert meow_nfc != meow_nfd
|
||||||
|
|
||||||
|
EVIL_NARS: list[tuple[str, NarItem]] = [
|
||||||
|
('valid-dir-1', Directory([
|
||||||
|
(b'a-nested', Directory([
|
||||||
|
(b'loopy', Symlink(b'../abc-nested'))
|
||||||
|
])),
|
||||||
|
(b'b-file', Regular(False, b'meow kbity')),
|
||||||
|
(b'c-exe', Regular(True, b'#!/usr/bin/env cat\nmeow kbity')),
|
||||||
|
])),
|
||||||
|
('invalid-slashes-1', Directory([
|
||||||
|
(b'meow', Symlink(b'meowmeow')),
|
||||||
|
(b'meow/nya', Regular(False, b'eepy')),
|
||||||
|
])),
|
||||||
|
('invalid-dot-1', Directory([
|
||||||
|
(b'.', Symlink(b'meowmeow')),
|
||||||
|
])),
|
||||||
|
('invalid-dot-2', Directory([
|
||||||
|
(b'..', Symlink(b'meowmeow')),
|
||||||
|
])),
|
||||||
|
('invalid-nul-1', Directory([
|
||||||
|
(b'meow\0nya', Symlink(b'meowmeow')),
|
||||||
|
])),
|
||||||
|
('invalid-misorder-1', Directory([
|
||||||
|
(b'zzz', Regular(False, b'eepy')),
|
||||||
|
(b'kbity', Regular(False, b'meow')),
|
||||||
|
])),
|
||||||
|
('invalid-dupe-1', Directory([
|
||||||
|
(b'zzz', Regular(False, b'eepy')),
|
||||||
|
(b'zzz', Regular(False, b'meow')),
|
||||||
|
])),
|
||||||
|
('invalid-dupe-2', Directory([
|
||||||
|
(b'zzz', Directory([
|
||||||
|
(b'meow', Regular(False, b'kbity'))
|
||||||
|
])),
|
||||||
|
(b'zzz', Regular(False, b'meow')),
|
||||||
|
])),
|
||||||
|
('invalid-dupe-3', Directory([
|
||||||
|
(b'zzz', Directory([
|
||||||
|
(b'meow', Regular(False, b'kbity'))
|
||||||
|
])),
|
||||||
|
(b'zzz', Directory([
|
||||||
|
(b'meow', Regular(False, b'kbityy'))
|
||||||
|
])),
|
||||||
|
])),
|
||||||
|
('invalid-dupe-4', Directory([
|
||||||
|
(b'zzz', Symlink(b'../kbity')),
|
||||||
|
(b'zzz', Directory([
|
||||||
|
(b'meow', Regular(False, b'kbityy'))
|
||||||
|
])),
|
||||||
|
])),
|
||||||
|
# FIXME: ban casehacked filenames being extracted from NARs
|
||||||
|
# ('invalid-casehack-1', Directory([
|
||||||
|
# (b'ZZZ~nix~case~hack~2', Regular(False, b'meow')),
|
||||||
|
# (b'zzz~nix~case~hack~1', Regular(False, b'eepy')),
|
||||||
|
# ])),
|
||||||
|
# ('invalid-casehack-2', Directory([
|
||||||
|
# (b'ZZZ~nix~case~hack~1', Regular(False, b'meow')),
|
||||||
|
# (b'zzz~nix~case~hack~1', Regular(False, b'eepy')),
|
||||||
|
# ])),
|
||||||
|
]
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(['name', 'nar'], EVIL_NARS)
|
||||||
|
def test_evil_nar(nix: Nix, name: str, nar: NarItem):
|
||||||
|
bio = BytesIO()
|
||||||
|
|
||||||
|
listener = NarListener(bio)
|
||||||
|
write_with_export_header(nar, name.encode(), listener)
|
||||||
|
print(nar)
|
||||||
|
|
||||||
|
if name.startswith('valid-'):
|
||||||
|
expected_rc = 0
|
||||||
|
elif name.startswith('invalid-'):
|
||||||
|
expected_rc = 1
|
||||||
|
else:
|
||||||
|
raise ValueError('bad name', name)
|
||||||
|
|
||||||
|
res = nix.nix_store(['--import']).with_stdin(bio.getvalue()).run().expect(expected_rc)
|
||||||
|
print(res)
|
||||||
|
|
||||||
|
def test_unicode_evil_nar(nix: Nix, tmp_path: Path):
|
||||||
|
"""
|
||||||
|
Depending on the filesystem in use, filenames that are equal modulo unicode
|
||||||
|
normalization may hit the same file or not.
|
||||||
|
|
||||||
|
On macOS, such collisions will result in hitting the same file. We detect
|
||||||
|
if the fs is like this before checking what Lix does.
|
||||||
|
"""
|
||||||
|
with open(os.path.join(bytes(tmp_path), meow_nfc), 'wb') as fh:
|
||||||
|
fh.write(b'meow')
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(os.path.join(bytes(tmp_path), meow_nfd), 'rb') as fh:
|
||||||
|
assert fh.read() == b'meow'
|
||||||
|
except FileNotFoundError:
|
||||||
|
# normalization is not applied to this system
|
||||||
|
pytest.skip('filesystem does not use unicode normalization')
|
||||||
|
|
||||||
|
test_evil_nar(nix, 'invalid-unicode-normalization-1', Directory([
|
||||||
|
# méow
|
||||||
|
(meow_nfd, Regular(False, b'eepy')),
|
||||||
|
(meow_nfc, Symlink(b'meowmeow')),
|
||||||
|
]))
|
||||||
|
test_evil_nar(nix, 'invalid-unicode-normalization-2', Directory([
|
||||||
|
# méow
|
||||||
|
(meow_nfd, Symlink(b'meowmeow')),
|
||||||
|
(meow_nfc, Regular(False, b'eepy')),
|
||||||
|
]))
|
|
@ -3,6 +3,7 @@ import json
|
||||||
import subprocess
|
import subprocess
|
||||||
from typing import Any
|
from typing import Any
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from functools import partial, partialmethod
|
||||||
import dataclasses
|
import dataclasses
|
||||||
|
|
||||||
|
|
||||||
|
@ -24,6 +25,14 @@ class CommandResult:
|
||||||
output=self.stdout)
|
output=self.stdout)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
def expect(self, rc: int):
|
||||||
|
if self.rc != rc:
|
||||||
|
raise subprocess.CalledProcessError(returncode=self.rc,
|
||||||
|
cmd=self.cmd,
|
||||||
|
stderr=self.stderr,
|
||||||
|
output=self.stdout)
|
||||||
|
return self
|
||||||
|
|
||||||
def json(self) -> Any:
|
def json(self) -> Any:
|
||||||
self.ok()
|
self.ok()
|
||||||
return json.loads(self.stdout)
|
return json.loads(self.stdout)
|
||||||
|
@ -33,6 +42,20 @@ class CommandResult:
|
||||||
class NixSettings:
|
class NixSettings:
|
||||||
"""Settings for invoking Nix"""
|
"""Settings for invoking Nix"""
|
||||||
experimental_features: set[str] | None = None
|
experimental_features: set[str] | None = None
|
||||||
|
store: str | None = None
|
||||||
|
"""
|
||||||
|
The store to operate on (may be a path or other thing, see nix help-stores).
|
||||||
|
|
||||||
|
Note that this can be set to the test's store directory if you want to use
|
||||||
|
/nix/store paths inside that test rather than NIX_STORE_DIR renaming
|
||||||
|
/nix/store to some unstable name (assuming that no builds are invoked).
|
||||||
|
"""
|
||||||
|
nix_store_dir: Path | None = None
|
||||||
|
"""
|
||||||
|
Alternative name to use for /nix/store: breaks all references and NAR imports if
|
||||||
|
set, but does allow builds in tests (since builds do not require chroots if
|
||||||
|
the store is relocated).
|
||||||
|
"""
|
||||||
|
|
||||||
def feature(self, *names: str):
|
def feature(self, *names: str):
|
||||||
self.experimental_features = (self.experimental_features
|
self.experimental_features = (self.experimental_features
|
||||||
|
@ -57,8 +80,60 @@ class NixSettings:
|
||||||
config += f'{name} = {serialiser(value)}\n'
|
config += f'{name} = {serialiser(value)}\n'
|
||||||
|
|
||||||
field_may('experimental-features', self.experimental_features)
|
field_may('experimental-features', self.experimental_features)
|
||||||
|
field_may('store', self.store)
|
||||||
|
assert self.store or self.nix_store_dir, 'Failing to set either nix_store_dir or store will cause accidental use of the system store.'
|
||||||
return config
|
return config
|
||||||
|
|
||||||
|
def to_env_overlay(self) -> dict[str, str]:
|
||||||
|
ret = {'NIX_CONFIG': self.to_config()}
|
||||||
|
if self.nix_store_dir:
|
||||||
|
ret['NIX_STORE_DIR'] = str(self.nix_store_dir)
|
||||||
|
return ret
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class Command:
|
||||||
|
argv: list[str]
|
||||||
|
env: dict[str, str] = dataclasses.field(default_factory=dict)
|
||||||
|
stdin: bytes | None = None
|
||||||
|
cwd: Path | None = None
|
||||||
|
|
||||||
|
def with_env(self, **kwargs) -> 'Command':
|
||||||
|
self.env.update(kwargs)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def with_stdin(self, stdin: bytes) -> 'Command':
|
||||||
|
self.stdin = stdin
|
||||||
|
return self
|
||||||
|
|
||||||
|
def run(self) -> CommandResult:
|
||||||
|
proc = subprocess.Popen(
|
||||||
|
self.argv,
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
stdin=subprocess.PIPE if self.stdin else subprocess.DEVNULL,
|
||||||
|
cwd=self.cwd,
|
||||||
|
env=self.env,
|
||||||
|
)
|
||||||
|
(stdout, stderr) = proc.communicate(input=self.stdin)
|
||||||
|
rc = proc.returncode
|
||||||
|
return CommandResult(cmd=self.argv,
|
||||||
|
rc=rc,
|
||||||
|
stdout=stdout,
|
||||||
|
stderr=stderr)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class NixCommand(Command):
|
||||||
|
settings: NixSettings = dataclasses.field(default_factory=NixSettings)
|
||||||
|
|
||||||
|
def apply_nix_config(self):
|
||||||
|
self.env.update(self.settings.to_env_overlay())
|
||||||
|
|
||||||
|
def run(self) -> CommandResult:
|
||||||
|
self.apply_nix_config()
|
||||||
|
return super().run()
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
@dataclasses.dataclass
|
||||||
class Nix:
|
class Nix:
|
||||||
|
@ -69,7 +144,6 @@ class Nix:
|
||||||
home = self.test_root / 'test-home'
|
home = self.test_root / 'test-home'
|
||||||
home.mkdir(parents=True, exist_ok=True)
|
home.mkdir(parents=True, exist_ok=True)
|
||||||
return {
|
return {
|
||||||
'NIX_STORE_DIR': self.test_root / 'store',
|
|
||||||
'NIX_LOCALSTATE_DIR': self.test_root / 'var',
|
'NIX_LOCALSTATE_DIR': self.test_root / 'var',
|
||||||
'NIX_LOG_DIR': self.test_root / 'var/log/nix',
|
'NIX_LOG_DIR': self.test_root / 'var/log/nix',
|
||||||
'NIX_STATE_DIR': self.test_root / 'var/nix',
|
'NIX_STATE_DIR': self.test_root / 'var/nix',
|
||||||
|
@ -87,35 +161,48 @@ class Nix:
|
||||||
d.update(self.hermetic_env())
|
d.update(self.hermetic_env())
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def call(self, cmd: list[str], extra_env: dict[str, str] = {}):
|
def cmd(self, argv: list[str]):
|
||||||
"""
|
return Command(argv=argv, cwd=self.test_root, env=self.make_env())
|
||||||
Calls a process in the test environment.
|
|
||||||
"""
|
|
||||||
env = self.make_env()
|
|
||||||
env.update(extra_env)
|
|
||||||
proc = subprocess.Popen(
|
|
||||||
cmd,
|
|
||||||
stdout=subprocess.PIPE,
|
|
||||||
stderr=subprocess.PIPE,
|
|
||||||
cwd=self.test_root,
|
|
||||||
env=env,
|
|
||||||
)
|
|
||||||
(stdout, stderr) = proc.communicate()
|
|
||||||
rc = proc.returncode
|
|
||||||
return CommandResult(cmd=cmd, rc=rc, stdout=stdout, stderr=stderr)
|
|
||||||
|
|
||||||
def nix(self,
|
def settings(self, allow_builds: bool = False):
|
||||||
cmd: list[str],
|
"""
|
||||||
settings: NixSettings = NixSettings(),
|
Parameters:
|
||||||
extra_env: dict[str, str] = {}):
|
- allow_builds: relocate the Nix store so that builds work (however, makes store paths non-reproducible across test runs!)
|
||||||
extra_env = extra_env.copy()
|
"""
|
||||||
extra_env.update({'NIX_CONFIG': settings.to_config()})
|
settings = NixSettings()
|
||||||
return self.call(['nix', *cmd], extra_env)
|
store_path = self.test_root / 'store'
|
||||||
|
if allow_builds:
|
||||||
|
settings.nix_store_dir = store_path
|
||||||
|
else:
|
||||||
|
settings.store = str(store_path)
|
||||||
|
return settings
|
||||||
|
|
||||||
|
def nix_cmd(self, argv: list[str], allow_builds: bool = False):
|
||||||
|
"""
|
||||||
|
Constructs a NixCommand with the appropriate settings.
|
||||||
|
"""
|
||||||
|
return NixCommand(argv=argv,
|
||||||
|
cwd=self.test_root,
|
||||||
|
env=self.make_env(),
|
||||||
|
settings=self.settings())
|
||||||
|
|
||||||
|
def nix(self, cmd: list[str], nix_exe: str = 'nix') -> NixCommand:
|
||||||
|
return self.nix_cmd([nix_exe, *cmd])
|
||||||
|
|
||||||
|
nix_build = partialmethod(nix, nix_exe='nix-build')
|
||||||
|
nix_shell = partialmethod(nix, nix_exe='nix-shell')
|
||||||
|
nix_store = partialmethod(nix, nix_exe='nix-store')
|
||||||
|
nix_env = partialmethod(nix, nix_exe='nix-env')
|
||||||
|
nix_instantiate = partialmethod(nix, nix_exe='nix-instantiate')
|
||||||
|
nix_channel = partialmethod(nix, nix_exe='nix-channel')
|
||||||
|
nix_prefetch_url = partialmethod(nix, nix_exe='nix-prefetch-url')
|
||||||
|
|
||||||
def eval(
|
def eval(
|
||||||
self, expr: str,
|
self, expr: str,
|
||||||
settings: NixSettings = NixSettings()) -> CommandResult:
|
settings: NixSettings | None = None) -> CommandResult:
|
||||||
# clone due to reference-shenanigans
|
# clone due to reference-shenanigans
|
||||||
settings = dataclasses.replace(settings).feature('nix-command')
|
settings = dataclasses.replace(settings or self.settings()).feature('nix-command')
|
||||||
|
|
||||||
return self.nix(['eval', '--json', '--expr', expr], settings=settings)
|
cmd = self.nix(['eval', '--json', '--expr', expr])
|
||||||
|
cmd.settings = settings
|
||||||
|
return cmd.run()
|
||||||
|
|
132
tests/functional2/testlib/nar.py
Normal file
132
tests/functional2/testlib/nar.py
Normal file
|
@ -0,0 +1,132 @@
|
||||||
|
"""
|
||||||
|
See "The Purely Functional Software Deployment Model", fig. 5.2 [1].
|
||||||
|
|
||||||
|
[1]: E. Dolstra, “The purely functional software deployment model,” Ph.D., Universeit Utrecht, Utrecht, NL, 2006. [Online]. Available: https://edolstra.github.io/pubs/phd-thesis.pdf
|
||||||
|
"""
|
||||||
|
|
||||||
|
from abc import ABCMeta, abstractmethod
|
||||||
|
import dataclasses
|
||||||
|
import struct
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Protocol
|
||||||
|
import unicodedata
|
||||||
|
|
||||||
|
|
||||||
|
class Writable(Protocol):
|
||||||
|
"""Realistically could just be IOBase but this is more constrained"""
|
||||||
|
def write(self, data: bytes, /) -> int: ...
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class NarListener:
|
||||||
|
data: Writable
|
||||||
|
|
||||||
|
def literal(self, data: bytes):
|
||||||
|
self.data.write(data)
|
||||||
|
|
||||||
|
def int_(self, v: int):
|
||||||
|
self.literal(struct.pack('<Q', v))
|
||||||
|
|
||||||
|
def add_pad(self, data_len: int):
|
||||||
|
npad = 8 - data_len % 8
|
||||||
|
if npad == 8:
|
||||||
|
npad = 0
|
||||||
|
# FIXME: implement nonzero padding
|
||||||
|
self.literal(b'\0' * npad)
|
||||||
|
|
||||||
|
def str_(self, data: bytes):
|
||||||
|
self.int_(len(data))
|
||||||
|
self.literal(data)
|
||||||
|
self.add_pad(len(data))
|
||||||
|
|
||||||
|
|
||||||
|
class NarItem(metaclass=ABCMeta):
|
||||||
|
type_: bytes
|
||||||
|
|
||||||
|
def serialize(self, out: NarListener):
|
||||||
|
out.str_(b'type')
|
||||||
|
out.str_(self.type_)
|
||||||
|
self.serialize_type(out)
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def serialize_type(self, out: NarListener):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class Regular(NarItem):
|
||||||
|
executable: bool
|
||||||
|
contents: bytes
|
||||||
|
type_ = b'regular'
|
||||||
|
|
||||||
|
def serialize_type(self, out: NarListener):
|
||||||
|
if self.executable:
|
||||||
|
out.str_(b'executable')
|
||||||
|
out.str_(b'')
|
||||||
|
out.str_(b'contents')
|
||||||
|
out.str_(self.contents)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class Directory(NarItem):
|
||||||
|
entries: list[tuple[bytes, NarItem]]
|
||||||
|
"""Entries in the directory, not required to be in order because this nar is evil"""
|
||||||
|
type_ = b'directory'
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def entry(out: NarListener, name: bytes, item: 'NarItem'):
|
||||||
|
# lol this format
|
||||||
|
out.str_(b'entry')
|
||||||
|
out.str_(b'(')
|
||||||
|
out.str_(b'name')
|
||||||
|
out.str_(name)
|
||||||
|
out.str_(b'node')
|
||||||
|
out.str_(b'(')
|
||||||
|
item.serialize(out)
|
||||||
|
out.str_(b')')
|
||||||
|
out.str_(b')')
|
||||||
|
|
||||||
|
def serialize_type(self, out: NarListener):
|
||||||
|
for (name, entry) in self.entries:
|
||||||
|
self.entry(out, name, entry)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class Symlink(NarItem):
|
||||||
|
target: bytes
|
||||||
|
type_ = b'symlink'
|
||||||
|
|
||||||
|
def serialize_type(self, out: NarListener):
|
||||||
|
out.str_(b'target')
|
||||||
|
out.str_(self.target)
|
||||||
|
|
||||||
|
|
||||||
|
def serialize_nar(toplevel: NarItem, out: NarListener):
|
||||||
|
out.str_(b'nix-archive-1')
|
||||||
|
out.str_(b'(')
|
||||||
|
toplevel.serialize(out)
|
||||||
|
out.str_(b')')
|
||||||
|
|
||||||
|
|
||||||
|
def write_with_export_header(nar: NarItem, name: bytes, out: NarListener):
|
||||||
|
# n.b. this is *not* actually a nar serialization, it just happens that nix
|
||||||
|
# used exactly the same format for ints and strings in its protocol (and
|
||||||
|
# nix-store --export) as it did in NARs lol
|
||||||
|
EXPORT_MAGIC = 0x4558494e
|
||||||
|
|
||||||
|
# Store::exportPaths
|
||||||
|
# For each path, put 1 then exportPath
|
||||||
|
out.int_(1)
|
||||||
|
|
||||||
|
# Store::exportPath
|
||||||
|
serialize_nar(nar, out)
|
||||||
|
out.int_(EXPORT_MAGIC)
|
||||||
|
out.str_(b'/nix/store/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-' + name)
|
||||||
|
# no references
|
||||||
|
out.int_(0)
|
||||||
|
# no deriver
|
||||||
|
out.str_(b'')
|
||||||
|
# end of path
|
||||||
|
out.int_(0)
|
||||||
|
|
||||||
|
out.int_(0)
|
Loading…
Reference in a new issue