Source code for adi_lg_plugins.drivers.massstoragedriver
import os
import shutil
import subprocess
import time
import attr
from labgrid.driver.common import Driver
from labgrid.factory import target_factory
from ._remote import RemoteExecMixin
[docs]
@target_factory.reg_driver
@attr.s(eq=False)
class MassStorageDriver(RemoteExecMixin, Driver):
"""Mount and copy files to a USB mass storage device.
Supports both local-only (test runner == exporter) and remote-exporter
bindings. When the bound resource is proxied from a coordinator,
pmount/pumount/mkdir run on the exporter host and file copies are staged
there over a single reused ssh connection (see :class:`RemoteExecMixin`).
Specify `partition` when the bound resource points at a whole block
device (e.g. /dev/sdb) rather than a specific partition; its value
is the absolute partition path on the exporter host — a raw device
(/dev/sdb1) or a stable symlink (/dev/disk/by-partuuid/...).
"""
bindings = {
"mass_storage": {"MassStorageDevice"},
}
# RemoteExecMixin: the resource that locates the exporter host.
_remote_binding = "mass_storage"
partition = attr.ib(
default=None,
validator=attr.validators.optional(attr.validators.instance_of(str)),
)
mount_label = attr.ib(default="lg_mass_storage")
def __attrs_post_init__(self):
super().__attrs_post_init__()
self.mounted = False
def __del__(self):
try:
self.unmount_partition()
except Exception:
pass
def _device_path(self):
return self.partition or self.mass_storage.path
def _mount_dir(self):
return f"/media/{self.mount_label}"
def _path_exists(self, path):
return self._remote_run(["test", "-e", path]).returncode == 0
def _is_mountpoint(self, path):
return self._remote_run(["mountpoint", "-q", path]).returncode == 0
[docs]
def mount_partition(self):
"""Mount the configured partition at /media/<mount_label>."""
if self.mounted:
self.logger.debug("Already mounted; skipping.")
return
mnt = self._mount_dir()
if self._is_mountpoint(mnt):
self.logger.debug(f"{mnt} already mounted; treating as mounted.")
self.mounted = True
return
device_path = self._device_path()
if not self._path_exists(device_path):
raise RuntimeError(f"Mass storage device path {device_path} does not exist.")
try:
self._remote_check(["pmount", device_path, self.mount_label])
except subprocess.CalledProcessError as e:
self.logger.error(f"Failed to mount {device_path}: {e}")
raise
time.sleep(2)
if not self._is_mountpoint(mnt):
raise RuntimeError(f"Mounting {device_path} failed; {mnt} is not a mount point.")
self.logger.debug(f"Mounted {device_path} at {mnt}")
self.mounted = True
[docs]
def unmount_partition(self):
"""Unmount the mass storage device partition."""
if not self.mounted:
return
mnt = self._mount_dir()
self._remote_run(["sync"], check=False)
try:
self._remote_check(["pumount", self.mount_label])
except subprocess.CalledProcessError as e:
self.logger.error(f"Failed to unmount {self.mount_label}: {e}")
raise
if self._is_mountpoint(mnt):
raise RuntimeError(f"Unmount failed; {mnt} is still a mount point.")
self.mounted = False
[docs]
def copy_file(self, src, dst):
"""Copy a local file onto the mass storage device.
Args:
src: source file path on the test runner host.
dst: destination path relative to the mount point.
"""
if not self.mounted:
raise RuntimeError("Mass storage device is not mounted. Cannot copy file.")
if not os.path.exists(src):
raise FileNotFoundError(f"Source file {src} does not exist.")
full_dst = os.path.join(self._mount_dir(), dst.lstrip("/"))
dst_dir = os.path.dirname(full_dst)
self._remote_check(["mkdir", "-p", dst_dir])
if self._is_remote:
self._remote_put(src, full_dst)
else:
shutil.copy(src, full_dst)
self.logger.info(f"Copied {src} to {full_dst}")
[docs]
def update_files(self):
"""Batch-copy files listed in mass_storage.file_updates (local-only path mapping)."""
if not self.mounted:
raise RuntimeError("Mass storage device is not mounted. Cannot update files.")
for src, dst in self.mass_storage.file_updates.items():
self.copy_file(src, dst)