fix(blockdevices): handle nested lsblk

remove-rest
Houkime 2023-10-09 19:22:43 +00:00
parent 9a3800ac7b
commit 9d7857cb3f
2 changed files with 132 additions and 86 deletions

View File

@ -1,4 +1,5 @@
"""Wrapper for block device functions."""
"""A block device API wrapping lsblk"""
from __future__ import annotations
import subprocess
import json
import typing
@ -11,6 +12,7 @@ def get_block_device(device_name):
"""
Return a block device by name.
"""
# TODO: remove the function and related tests: dublicated by singleton
lsblk_output = subprocess.check_output(
[
"lsblk",
@ -43,22 +45,37 @@ class BlockDevice:
A block device.
"""
def __init__(self, block_device):
self.name = block_device["name"]
self.path = block_device["path"]
self.fsavail = str(block_device["fsavail"])
self.fssize = str(block_device["fssize"])
self.fstype = block_device["fstype"]
self.fsused = str(block_device["fsused"])
self.mountpoints = block_device["mountpoints"]
self.label = block_device["label"]
self.uuid = block_device["uuid"]
self.size = str(block_device["size"])
self.model = block_device["model"]
self.serial = block_device["serial"]
self.type = block_device["type"]
def __init__(self, device_dict: dict):
self.update_from_dict(device_dict)
def update_from_dict(self, device_dict: dict):
self.name = device_dict["name"]
self.path = device_dict["path"]
self.fsavail = str(device_dict["fsavail"])
self.fssize = str(device_dict["fssize"])
self.fstype = device_dict["fstype"]
self.fsused = str(device_dict["fsused"])
self.mountpoints = device_dict["mountpoints"]
self.label = device_dict["label"]
self.uuid = device_dict["uuid"]
self.size = str(device_dict["size"])
self.model = device_dict["model"]
self.serial = device_dict["serial"]
self.type = device_dict["type"]
self.locked = False
self.children: typing.List[BlockDevice] = []
if "children" in device_dict.keys():
for child in device_dict["children"]:
self.children.append(BlockDevice(child))
def all_children(self) -> typing.List[BlockDevice]:
result = []
for child in self.children:
result.extend(child.all_children())
result.append(child)
return result
def __str__(self):
return self.name
@ -82,17 +99,7 @@ class BlockDevice:
Update current data and return a dictionary of stats.
"""
device = get_block_device(self.name)
self.fsavail = str(device["fsavail"])
self.fssize = str(device["fssize"])
self.fstype = device["fstype"]
self.fsused = str(device["fsused"])
self.mountpoints = device["mountpoints"]
self.label = device["label"]
self.uuid = device["uuid"]
self.size = str(device["size"])
self.model = device["model"]
self.serial = device["serial"]
self.type = device["type"]
self.update_from_dict(device)
return {
"name": self.name,
@ -110,6 +117,14 @@ class BlockDevice:
"type": self.type,
}
def is_usable_partition(self):
# Ignore devices with type "rom"
if self.type == "rom":
return False
if self.fstype == "ext4":
return True
return False
def resize(self):
"""
Resize the block device.
@ -165,41 +180,16 @@ class BlockDevices(metaclass=SingletonMetaclass):
"""
Update the list of block devices.
"""
devices = []
lsblk_output = subprocess.check_output(
[
"lsblk",
"-J",
"-b",
"-o",
"NAME,PATH,FSAVAIL,FSSIZE,FSTYPE,FSUSED,MOUNTPOINTS,LABEL,UUID,SIZE,MODEL,SERIAL,TYPE",
]
)
lsblk_output = lsblk_output.decode("utf-8")
lsblk_output = json.loads(lsblk_output)
for device in lsblk_output["blockdevices"]:
# Ignore devices with type "rom"
if device["type"] == "rom":
continue
# Ignore iso9660 devices
if device["fstype"] == "iso9660":
continue
if device["fstype"] is None:
if "children" in device:
for child in device["children"]:
if child["fstype"] == "ext4":
device = child
break
devices.append(device)
# Add new devices and delete non-existent devices
devices = BlockDevices.lsblk_devices()
children = []
for device in devices:
if device["name"] not in [
block_device.name for block_device in self.block_devices
]:
self.block_devices.append(BlockDevice(device))
for block_device in self.block_devices:
if block_device.name not in [device["name"] for device in devices]:
self.block_devices.remove(block_device)
children.extend(device.all_children())
devices.extend(children)
valid_devices = [device for device in devices if device.is_usable_partition()]
self.block_devices = valid_devices
def get_block_device(self, name: str) -> typing.Optional[BlockDevice]:
"""
@ -236,3 +226,25 @@ class BlockDevices(metaclass=SingletonMetaclass):
if "/" in block_device.mountpoints:
return block_device
raise RuntimeError("No root block device found")
@staticmethod
def lsblk_device_dicts() -> typing.List[dict]:
lsblk_output_bytes = subprocess.check_output(
[
"lsblk",
"-J",
"-b",
"-o",
"NAME,PATH,FSAVAIL,FSSIZE,FSTYPE,FSUSED,MOUNTPOINTS,LABEL,UUID,SIZE,MODEL,SERIAL,TYPE",
]
)
lsblk_output = lsblk_output_bytes.decode("utf-8")
return json.loads(lsblk_output)["blockdevices"]
@staticmethod
def lsblk_devices() -> typing.List[BlockDevice]:
devices = []
for device in BlockDevices.lsblk_device_dicts():
devices.append(device)
return [BlockDevice(device) for device in devices]

View File

@ -13,6 +13,7 @@ from selfprivacy_api.utils.block_devices import (
resize_block_device,
)
from tests.common import read_json
from tests.test_common import dummy_service, raw_dummy_service
SINGLE_LSBLK_OUTPUT = b"""
{
@ -416,32 +417,37 @@ def lsblk_full_mock(mocker):
def test_get_block_devices(lsblk_full_mock, authorized_client):
block_devices = BlockDevices().get_block_devices()
assert len(block_devices) == 2
assert block_devices[0].name == "sda1"
assert block_devices[0].path == "/dev/sda1"
assert block_devices[0].fsavail == "4605702144"
assert block_devices[0].fssize == "19814920192"
assert block_devices[0].fstype == "ext4"
assert block_devices[0].fsused == "14353719296"
assert block_devices[0].mountpoints == ["/nix/store", "/"]
assert block_devices[0].label is None
assert block_devices[0].uuid == "ec80c004-baec-4a2c-851d-0e1807135511"
assert block_devices[0].size == "20210236928"
assert block_devices[0].model is None
assert block_devices[0].serial is None
assert block_devices[0].type == "part"
assert block_devices[1].name == "sdb"
assert block_devices[1].path == "/dev/sdb"
assert block_devices[1].fsavail == "11888545792"
assert block_devices[1].fssize == "12573614080"
assert block_devices[1].fstype == "ext4"
assert block_devices[1].fsused == "24047616"
assert block_devices[1].mountpoints == ["/volumes/sdb"]
assert block_devices[1].label is None
assert block_devices[1].uuid == "fa9d0026-ee23-4047-b8b1-297ae16fa751"
assert block_devices[1].size == "12884901888"
assert block_devices[1].model == "Volume"
assert block_devices[1].serial == "21378102"
assert block_devices[1].type == "disk"
devices_by_name = {device.name: device for device in block_devices}
sda1 = devices_by_name["sda1"]
sdb = devices_by_name["sdb"]
assert sda1.name == "sda1"
assert sda1.path == "/dev/sda1"
assert sda1.fsavail == "4605702144"
assert sda1.fssize == "19814920192"
assert sda1.fstype == "ext4"
assert sda1.fsused == "14353719296"
assert sda1.mountpoints == ["/nix/store", "/"]
assert sda1.label is None
assert sda1.uuid == "ec80c004-baec-4a2c-851d-0e1807135511"
assert sda1.size == "20210236928"
assert sda1.model is None
assert sda1.serial is None
assert sda1.type == "part"
assert sdb.name == "sdb"
assert sdb.path == "/dev/sdb"
assert sdb.fsavail == "11888545792"
assert sdb.fssize == "12573614080"
assert sdb.fstype == "ext4"
assert sdb.fsused == "24047616"
assert sdb.mountpoints == ["/volumes/sdb"]
assert sdb.label is None
assert sdb.uuid == "fa9d0026-ee23-4047-b8b1-297ae16fa751"
assert sdb.size == "12884901888"
assert sdb.model == "Volume"
assert sdb.serial == "21378102"
assert sdb.type == "disk"
def test_get_block_device(lsblk_full_mock, authorized_client):
@ -506,3 +512,31 @@ def test_get_root_block_device(lsblk_full_mock, authorized_client):
assert block_device.model is None
assert block_device.serial is None
assert block_device.type == "part"
# Unassuming sanity check, yes this did fail
def test_get_real_devices():
block_devices = BlockDevices().get_block_devices()
assert block_devices is not None
assert len(block_devices) > 0
# Unassuming sanity check
def test_get_real_root_device():
BlockDevices().update()
devices = BlockDevices().get_block_devices()
try:
block_device = BlockDevices().get_root_block_device()
except Exception as e:
raise Exception("cannot get root device:", e, "devices found:", devices)
assert block_device is not None
assert block_device.name is not None
assert block_device.name != ""
def test_get_real_root_device_raw(authorized_client):
block_device = BlockDevices().get_root_block_device()
assert block_device is not None
assert block_device.name is not None
assert block_device.name != ""