Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions archinstall/lib/disk/device_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
from archinstall.lib.exceptions import DiskError, SysCallError, UnknownFilesystemFormat
from archinstall.lib.log import debug, error, info, log
from archinstall.lib.models.device import (
DEFAULT_CIPHER,
DEFAULT_ITER_TIME,
BDevice,
BtrfsMountOption,
DeviceModification,
DiskEncryption,
EncryptionCipher,
FilesystemType,
LsblkInfo,
ModificationStatus,
Expand Down Expand Up @@ -280,14 +282,15 @@ def encrypt(
enc_password: Password | None,
lock_after_create: bool = True,
iter_time: int = DEFAULT_ITER_TIME,
cipher: EncryptionCipher = DEFAULT_CIPHER,
) -> Luks2:
luks_handler = Luks2(
dev_path,
mapper_name=mapper_name,
password=enc_password,
)

key_file = luks_handler.encrypt(iter_time=iter_time)
key_file = luks_handler.encrypt(iter_time=iter_time, cipher=cipher)

udev_sync()

Expand Down Expand Up @@ -318,7 +321,7 @@ def format_encrypted(
password=enc_conf.encryption_password,
)

key_file = luks_handler.encrypt(iter_time=enc_conf.iter_time)
key_file = luks_handler.encrypt(iter_time=enc_conf.iter_time, cipher=enc_conf.cipher)

udev_sync()

Expand All @@ -330,6 +333,8 @@ def format_encrypted(
info(f'luks2 formatting mapper dev: {luks_handler.mapper_dev}')
self.format(fs_type, luks_handler.mapper_dev)

udev_sync()

info(f'luks2 locking device: {dev_path}')
luks_handler.lock()

Expand Down
50 changes: 50 additions & 0 deletions archinstall/lib/disk/encryption_menu.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
from archinstall.lib.menu.menu_helper import MenuHelper
from archinstall.lib.menu.util import get_password
from archinstall.lib.models.device import (
DEFAULT_CIPHER,
DEFAULT_ITER_TIME,
DeviceModification,
DiskEncryption,
EncryptionCipher,
EncryptionType,
Fido2Device,
LvmConfiguration,
Expand Down Expand Up @@ -64,6 +66,14 @@ def _define_menu_options(self) -> list[MenuItem]:
preview_action=self._prev_password,
key='encryption_password',
),
MenuItem(
text=tr('Encryption cipher'),
action=self._select_cipher,
value=self._enc_config.cipher,
dependencies=[self._check_dep_enc_type],
preview_action=self._prev_cipher,
key='cipher',
),
MenuItem(
text=tr('Iteration time'),
action=select_iteration_time,
Expand Down Expand Up @@ -103,6 +113,9 @@ async def _select_lvm_vols(self, preset: list[LvmVolume]) -> list[LvmVolume]:
return await select_lvm_vols_to_encrypt(self._lvm_config, preset=preset)
return []

async def _select_cipher(self, preset: EncryptionCipher | None) -> EncryptionCipher | None:
return await select_encryption_cipher(preset)

def _check_dep_enc_type(self) -> bool:
enc_type: EncryptionType | None = self._item_group.find_by_key('encryption_type').value
if enc_type and enc_type != EncryptionType.NO_ENCRYPTION:
Expand All @@ -129,6 +142,7 @@ async def show(self) -> DiskEncryption | None:

enc_type: EncryptionType | None = self._item_group.find_by_key('encryption_type').value
enc_password: Password | None = self._item_group.find_by_key('encryption_password').value
cipher: EncryptionCipher | None = self._item_group.find_by_key('cipher').value
iter_time: int | None = self._item_group.find_by_key('iter_time').value
enc_partitions = self._item_group.find_by_key('partitions').value
enc_lvm_vols = self._item_group.find_by_key('lvm_volumes').value
Expand All @@ -151,6 +165,7 @@ async def show(self) -> DiskEncryption | None:
lvm_volumes=enc_lvm_vols,
hsm_device=enc_config.hsm_device,
iter_time=iter_time or DEFAULT_ITER_TIME,
cipher=cipher or DEFAULT_CIPHER,
)

return None
Expand All @@ -164,6 +179,9 @@ def _preview(self, item: MenuItem) -> str | None:
if (enc_pwd := self._prev_password(item)) is not None:
output += f'\n{enc_pwd}'

if (cipher := self._prev_cipher(item)) is not None:
output += f'\n{cipher}'

if (iter_time := self._prev_iter_time(item)) is not None:
output += f'\n{iter_time}'

Expand Down Expand Up @@ -196,6 +214,12 @@ def _prev_password(self, item: MenuItem) -> str | None:

return None

def _prev_cipher(self, item: MenuItem) -> str | None:
cipher: EncryptionCipher | None = self._item_group.find_by_key('cipher').value
if cipher:
return f'{tr("Encryption cipher")}: {cipher.value}'
return None

def _prev_partitions(self, item: MenuItem) -> str | None:
if item.value:
output = tr('Partitions to be encrypted') + '\n'
Expand Down Expand Up @@ -404,3 +428,29 @@ def validate_iter_time(value: str) -> str | None:
return int(result.get_value())
case ResultType.Reset:
return None


async def select_encryption_cipher(preset: EncryptionCipher | None = None) -> EncryptionCipher | None:
options = list(EncryptionCipher)

if not preset:
preset = DEFAULT_CIPHER

items = [MenuItem(o.value, value=o) for o in options]
group = MenuItemGroup(items)
group.set_focus_by_value(preset)

result = await Selection[EncryptionCipher](
group,
header=tr('Select encryption cipher'),
allow_skip=True,
allow_reset=True,
).show()

match result.type_:
case ResultType.Reset:
return None
case ResultType.Skip:
return preset
case ResultType.Selection:
return result.get_value()
66 changes: 60 additions & 6 deletions archinstall/lib/disk/luks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from archinstall.lib.disk.utils import get_lsblk_info, umount
from archinstall.lib.exceptions import DiskError, SysCallError
from archinstall.lib.log import debug, info
from archinstall.lib.models.device import DEFAULT_ITER_TIME
from archinstall.lib.models.device import DEFAULT_CIPHER, DEFAULT_ITER_TIME, EncryptionCipher
from archinstall.lib.models.users import Password
from archinstall.lib.utils.util import generate_password

Expand Down Expand Up @@ -69,10 +69,10 @@ def _get_passphrase_args(

def encrypt(
self,
key_size: int = 512,
hash_type: str = 'sha512',
iter_time: int = DEFAULT_ITER_TIME,
key_file: Path | None = None,
cipher: EncryptionCipher = DEFAULT_CIPHER,
) -> Path | None:
debug(f'Luks2 encrypting: {self.luks_dev_path}')

Expand All @@ -86,10 +86,15 @@ def encrypt(
'luks2',
'--pbkdf',
'argon2id',
'--cipher',
cipher.value,
'--hash',
hash_type,
'--key-size',
str(key_size),
str(cipher.key_size),
]

cmd += [
'--iter-time',
str(iter_time),
*key_file_arg,
Expand Down Expand Up @@ -137,6 +142,18 @@ def unlock(self, key_file: Path | None = None) -> None:
if not self.mapper_name:
raise ValueError('mapper name missing')

# If a mapper device with this name already exists (e.g. left over from a
# previous failed run), close it before trying to open a new one.
# cryptsetup open returns exit code 5 / "Device already exists" otherwise.
if self.is_unlocked():
debug(f'Mapper {self.mapper_name} already open, closing before re-opening')
try:
SysCommand(f'cryptsetup close {self.mapper_name}')
except SysCallError as close_err:
raise DiskError(
f'Could not close existing mapper "{self.mapper_name}" before unlock: {close_err}'
)

key_file_arg, passphrase = self._get_passphrase_args(key_file)

cmd = [
Expand All @@ -161,6 +178,7 @@ def unlock(self, key_file: Path | None = None) -> None:
raise DiskError(f'Failed to open luks2 device: {self.luks_dev_path}')

def lock(self) -> None:
import time
umount(self.luks_dev_path)

# Get crypt-information about the device by doing a reverse lookup starting with the partition path
Expand All @@ -169,14 +187,50 @@ def lock(self) -> None:

# For each child (sub-partition/sub-device)
for child in lsblk_info.children:
# Unmount the child location
for mountpoint in child.mountpoints:
debug(f'Unmounting {mountpoint}')
umount(mountpoint, recursive=True)
# mountpoint is a directory path, not a block device — umount()
# internally calls get_lsblk_info() which runs lsblk on the path
# and fails with "not a block device". Use run() directly to call
# umount(8) on the directory instead.
try:
run(['umount', '--recursive', str(mountpoint)])
except Exception as e:
debug(f'Could not unmount {mountpoint}: {e}')

# Wait for udev to finish processing events so the kernel drops
# any lingering reference on the mapper device before we close it.
try:
run(['udevadm', 'settle', '--timeout=5'])
except Exception:
pass

# And close it if possible.
debug(f'Closing crypt device {child.name}')
SysCommand(f'cryptsetup close {child.name}')

mapper_dev = Path(f'/dev/mapper/{child.name}')
try:
SysCommand(f'cryptsetup close {child.name}')
except SysCallError as err:
debug(f'cryptsetup close failed ({err}), retrying with --deferred')
try:
SysCommand(f'cryptsetup close --deferred {child.name}')
debug(f'cryptsetup close --deferred issued for {child.name}')
except SysCallError as deferred_err:
raise DiskError(
f'Could not close luks2 device "{child.name}": {deferred_err}'
) from deferred_err

# Wait until the mapper device node actually disappears before returning.
# Subsequent commands (wipefs, mkfs, etc.) will fail with "Device busy"
# if we return while the node still exists.
for _ in range(15):
if not mapper_dev.exists():
break
debug(f'Waiting for {mapper_dev} to disappear...')
time.sleep(1)
else:
raise DiskError(f'Mapper device {mapper_dev} did not disappear after close')

def create_keyfile(self, target_path: Path, override: bool = False) -> None:
"""
Expand Down
41 changes: 41 additions & 0 deletions archinstall/lib/models/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -1462,12 +1462,45 @@ def type_to_text(self) -> str:
return type_to_text[self]


class EncryptionCipher(Enum):
# None passed to cryptsetup means its built-in default (aes-xts-plain64).
# Adiantum is for CPUs without AES acceleration. It is a composite mode:
# spec must name both the stream cipher and block cipher
# (xchacha12,aes) or the kernel rejects it as unsupported.
AES_XTS_PLAIN64 = 'aes-xts-plain64'
# xchacha12 = faster (Android default), xchacha20 = wider margin.
ADIANTUM_XCHACHA12_PLAIN64 = 'xchacha12,aes-adiantum-plain64'
ADIANTUM_XCHACHA20_PLAIN64 = 'xchacha20,aes-adiantum-plain64'
# AES finalist, conservative margin (32 rounds), bitslices well
# on AVX2 despite no dedicated hw acceleration.
SERPENT_XTS_PLAIN64 = 'serpent-xts-plain64'
# Wide-block AES mode (AES-NI accelerated), single 256-bit key.
AES_HCTR2_PLAIN64 = 'aes-hctr2-plain64'
# Non-NIST standard (ISO/NESSIE/CRYPTREC), AVX2 accelerated.
CAMELLIA_XTS_PLAIN64 = 'camellia-xts-plain64'
# Legacy CBC mode — weaker than XTS against watermarking attacks,
# slower due to per-sector ESSIV/SHA256. Included for compatibility.
AES_CBC_ESSIV_SHA256 = 'aes-cbc-essiv:sha256'

@property
def key_size(self) -> int:
# XTS uses two keys, so 512 bits => 256-bit cipher. Adiantum
# and HCTR2 use a single 256-bit key; 512 makes cryptsetup fail.
if '-xts-' in self.value:
return 512
return 256


DEFAULT_CIPHER = EncryptionCipher.AES_XTS_PLAIN64


class _DiskEncryptionSerialization(TypedDict):
encryption_type: str
partitions: list[str]
lvm_volumes: list[str]
hsm_device: NotRequired[_Fido2DeviceSerialization]
iter_time: NotRequired[int]
cipher: NotRequired[str]


@dataclass
Expand All @@ -1478,6 +1511,7 @@ class DiskEncryption:
lvm_volumes: list[LvmVolume] = field(default_factory=list)
hsm_device: Fido2Device | None = None
iter_time: int = DEFAULT_ITER_TIME
cipher: EncryptionCipher = DEFAULT_CIPHER

def __post_init__(self) -> None:
if self.encryption_type in [EncryptionType.LUKS, EncryptionType.LVM_ON_LUKS] and not self.partitions:
Expand Down Expand Up @@ -1505,6 +1539,9 @@ def json(self) -> _DiskEncryptionSerialization:
if self.iter_time != DEFAULT_ITER_TIME: # Only include if not default
obj['iter_time'] = self.iter_time

if self.cipher != DEFAULT_CIPHER: # Only include if not default
obj['cipher'] = self.cipher.value

return obj

@staticmethod
Expand Down Expand Up @@ -1549,11 +1586,15 @@ def parse_arg(
if vol.obj_id in disk_encryption.get('lvm_volumes', []):
volumes.append(vol)

cipher_str = disk_encryption.get('cipher', None)
cipher = EncryptionCipher(cipher_str) if cipher_str else DEFAULT_CIPHER

enc = cls(
EncryptionType(disk_encryption['encryption_type']),
password,
enc_partitions,
volumes,
cipher=cipher,
)

if hsm := disk_encryption.get('hsm_device', None):
Expand Down