Source code for adi_lg_plugins.drivers.massstoragedriver

import os
import shutil
import subprocess
import time

import attr

# from labgrid.driver import Driver
from labgrid.driver.common import Driver

# from labgrid.driver.mixin.powerresetmixin import PowerResetMixin
# from labgrid.driver.protocol.powerprotocol import PowerProtocol
# from .powerdriver import PowerResetMixin
# from ..protocol import PowerProtocol
from labgrid.factory import target_factory

# import logging

# from pyvesync import VeSync


[docs] @target_factory.reg_driver @attr.s(eq=False) class MassStorageDriver(Driver): """MassStorageDriver - Driver that manipulates a USB mass storage device. This can be used to copy/remove files from/to the device.""" bindings = {"mass_storage": {"MassStorageDevice"}} def __attrs_post_init__(self): super().__attrs_post_init__() # Verify we have pmount and pumount available on the system for tool in ["pmount", "pumount"]: if not shutil.which(tool): raise RuntimeError( f"{tool} not found in system PATH. Please install it to use MassStorageDriver." ) self.mounted = False def __del__(self): try: self.unmount_partition() except Exception: pass
[docs] def mount_partition(self): """Mount the mass storage device partition to the specified mount point.""" if self.mounted: self.logger.debug("Mass storage device is already mounted, skipping mount.") return if os.path.ismount(os.path.join("/", "media", "lg_mass_storage")): self.logger.debug("Mount point already mounted, skipping mount.") self.mounted = True return device_path = self.mass_storage.path if not os.path.exists(device_path): raise RuntimeError(f"Mass storage device path {device_path} does not exist.") mount_cmd = ["pmount", device_path, "lg_mass_storage"] try: subprocess.run(mount_cmd, check=True) self.logger.debug(f"Mounted {device_path} to lg_mass_storage") except subprocess.CalledProcessError as e: self.logger.error(f"Failed to mount {device_path} to lg_mass_storage: {e}") raise time.sleep(2) # Check if mount was successful loc = os.path.join("/", "media", "lg_mass_storage") if not os.path.exists(loc): raise RuntimeError(f"Mounting {device_path} failed, mount point not found at {loc}.") self.mounted = True
[docs] def unmount_partition(self): """Unmount the mass storage device partition from the specified mount point.""" if not self.mounted: self.logger.debug("Mass storage device is not mounted, skipping unmount.") return if not os.path.exists(os.path.join("/", "media", "lg_mass_storage")): self.logger.debug("Mount point does not exist, skipping unmount.") self.mounted = False return subprocess.run(["sync"]) unmount_cmd = ["pumount", "lg_mass_storage"] try: subprocess.run(unmount_cmd, check=True) self.logger.debug("Unmounted lg_mass_storage successfully") except subprocess.CalledProcessError as e: self.logger.error(f"Failed to unmount lg_mass_storage: {e}") raise if os.path.exists(os.path.join("/", "media", "lg_mass_storage")): raise RuntimeError("Unmounting failed, mount point still exists.") self.mounted = False
[docs] def copy_file(self, src, dst): """Copy a single file to the mass storage device. Args: src: Source file path on the host system dst: Destination path relative to the mass storage mount point """ if not self.mounted: raise RuntimeError("Mass storage device is not mounted. Cannot copy file.") self.logger.info(f"Copying file {dst} on mass storage device from {src}") if not os.path.exists(src): self.logger.error(f"Source file {src} does not exist.") raise FileNotFoundError(f"Source file {src} does not exist.") full_dst_path = os.path.join("/", "media", "lg_mass_storage", dst.lstrip("/")) dst_dir = os.path.dirname(full_dst_path) if not os.path.exists(dst_dir): os.makedirs(dst_dir) shutil.copy(src, full_dst_path) self.logger.info(f"Copied {src} to {full_dst_path}")
[docs] def update_files(self): """Update files on the mass storage device as per file_updates dict.""" if not self.mounted: raise RuntimeError("Mass storage device is not mounted. Cannot update files.") for src, dst in self.mass_storage.file_updates.items(): self.copy_file(src, dst)
# @Driver.check_active # @step() # def on(self): # for outlet in self.outlets: # outlet.turn_on() # self.logger.debug("Powered ON via Vesync outlet") # @Driver.check_active # @step() # def off(self): # for outlet in self.outlets: # outlet.turn_off() # self.logger.debug("Powered OFF via Vesync outlet") # @Driver.check_active # @step() # def reset(self): # self.off() # self.logger.debug( # "Waiting %.1f seconds before powering ON", self.vesync_outlet.delay # ) # time.sleep(self.vesync_outlet.delay) # self.on() # @Driver.check_active # @step() # def cycle(self): # self.off() # time.sleep(self.vesync_outlet.delay) # self.on() # @Driver.check_active # @step() # def get(self): # return all(outlet.is_on for outlet in self.outlets)