Using Drivers

Drivers provide low-level hardware control and protocol implementations. They bind to resources and expose protocols that can be used by strategies or tests.

Overview

Drivers in adi-labgrid-plugins:

  • Control hardware devices via serial, network, or USB interfaces

  • Implement standardized protocols for interoperability (PowerProtocol, CommandProtocol, etc.)

  • Are activated/deactivated explicitly by strategies or tests

  • Provide high-level methods abstracting hardware complexity

The plugin provides six drivers for different hardware control scenarios:

  • Network Driver: Serve boot files to targets over TFTP

  • Power Drivers: Control device power via smart outlets and PDUs

  • Shell Driver: Execute commands and transfer files via serial console or SSH

  • Storage Drivers: Mount and manage SD card filesystems

  • Kuiper Driver: Download and extract ADI Kuiper Linux releases

Driver Lifecycle

Drivers follow an explicit activation pattern:

# Get driver reference from target
driver = target.get_driver("DriverName")

# Activate driver (calls on_activate(), initializes connections)
target.activate(driver)

# Use driver methods
driver.method()

# Deactivate when done (calls on_deactivate(), closes connections)
target.deactivate(driver)

Or within a strategy context (automatic activation):

# Strategies handle activation/deactivation automatically
strategy = target.get_driver("BootFPGASoC")
strategy.transition("shell")
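
Because deactivation is a separate call, an exception raised between target.activate() and target.deactivate() can leave a connection open. A minimal sketch of one way to guard against that is a context manager built on the three calls shown above. The activated helper and the RecordingTarget stand-in are hypothetical names used only for illustration; they are not part of the plugin.

```python
from contextlib import contextmanager


@contextmanager
def activated(target, driver_name):
    """Activate a driver for the duration of a with-block, then deactivate it."""
    driver = target.get_driver(driver_name)
    target.activate(driver)
    try:
        yield driver
    finally:
        # Runs even when the body raises, so the driver is always released.
        target.deactivate(driver)


class RecordingTarget:
    """Hypothetical stand-in for a target; it only records the calls it receives."""

    def __init__(self):
        self.calls = []

    def get_driver(self, name):
        self.calls.append(("get_driver", name))
        return name  # a real target returns a driver object

    def activate(self, driver):
        self.calls.append(("activate", driver))

    def deactivate(self, driver):
        self.calls.append(("deactivate", driver))


target = RecordingTarget()
try:
    with activated(target, "DriverName") as driver:
        raise RuntimeError("simulated failure while using the driver")
except RuntimeError:
    pass

print(target.calls)
```

With a real target, the with-block body would hold the driver method calls, and deactivation would still happen if one of them failed.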

Network Drivers

TFTPServerDriver

Purpose: Host a TFTP server that serves boot files (kernel images, device trees) to targets (e.g. U-Boot).

Required Resource: TFTPServerResource

Bindings: None

Configuration

resources:
  TFTPServerResource:
    address: '10.0.0.1'           # IP address of the host interface
    port: 3069                    # Port to bind (default: 3069)
    root: '/var/lib/tftpboot'     # Root directory for serving files

drivers:
  TFTPServerDriver: {}

Key Parameters

  • address (required): IP address of the interface on the host machine where the TFTP server will bind.

  • port (optional): UDP port to listen on. Defaults to 3069.

  • root (optional): Local directory to serve files from. Defaults to /var/lib/tftpboot.

Port Forwarding (Non-Root Usage)

Standard TFTP uses port 69. However, binding to ports below 1024 requires root privileges. The driver defaults to port 3069 to allow running as a standard user.

Since U-Boot often defaults to port 69, you must configure a port redirection rule on the host machine using iptables to forward traffic from port 69 to 3069.

Setup (Run once on Host):

# Redirect UDP port 69 to 3069
sudo iptables -t nat -A PREROUTING -p udp --dport 69 -j REDIRECT --to-port 3069

Cleanup (To remove the rule):

sudo iptables -t nat -D PREROUTING -p udp --dport 69 -j REDIRECT --to-port 3069
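
The privilege restriction described above can be checked from Python before a test run. The following is a minimal sketch that simply tries to bind a UDP socket; the can_bind_udp name is hypothetical, and the check says nothing about whether the iptables rule is in place.

```python
import socket


def can_bind_udp(address: str, port: int) -> bool:
    """Return True if the current user can bind a UDP socket to address:port."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.bind((address, port))
        return True
    except OSError:
        # Covers PermissionError (privileged port) and "address already in use".
        return False


if __name__ == "__main__":
    # As a standard user, port 69 is normally refused while 3069 is allowed.
    for port in (69, 3069):
        print(port, can_bind_udp("0.0.0.0", port))
```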

Methods

tftp = target.get_driver("TFTPServerDriver")
target.activate(tftp) # Starts the server thread

# The server runs in the background serving files from the configured 'root' directory.
# It handles RRQ (Read Request) operations compliant with RFC 1350.

target.deactivate(tftp) # Stops the server thread
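
For reference, an RFC 1350 read request is a 2-byte opcode (1) followed by the filename and the transfer mode, each terminated by a NUL byte. The sketch below builds and parses such a packet to show what the server receives from a client such as U-Boot. It is an illustration of the wire format only, not the driver's implementation, and the function names are hypothetical.

```python
import struct
from typing import Tuple

OPCODE_RRQ = 1  # RFC 1350: opcode 1 is a read request


def build_rrq(filename: str, mode: str = "octet") -> bytes:
    """Encode a TFTP read request: opcode, filename, NUL, mode, NUL."""
    return (
        struct.pack("!H", OPCODE_RRQ)
        + filename.encode("ascii") + b"\x00"
        + mode.encode("ascii") + b"\x00"
    )


def parse_rrq(packet: bytes) -> Tuple[str, str]:
    """Decode a read request into (filename, mode); raise ValueError otherwise."""
    if len(packet) < 4:
        raise ValueError("packet too short")
    (opcode,) = struct.unpack("!H", packet[:2])
    if opcode != OPCODE_RRQ:
        raise ValueError(f"not an RRQ packet (opcode {opcode})")
    fields = packet[2:].split(b"\x00")
    # A well-formed request ends with NUL, so the last split element is empty.
    if len(fields) < 3 or fields[-1] != b"":
        raise ValueError("malformed RRQ packet")
    return fields[0].decode("ascii"), fields[1].decode("ascii").lower()


packet = build_rrq("system.dtb")
print(packet)
print(parse_rrq(packet))
```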

Usage Example

Used within the BootFPGASoCTFTP strategy to serve boot files:

strategy = target.get_driver("BootFPGASoCTFTP")

# The strategy automatically:
# 1. Activates the TFTPServerDriver
# 2. Configures U-Boot environment (serverip, tftpport)
# 3. Copies necessary files (Image, system.dtb) to the TFTP root
# 4. Triggers the TFTP boot command on the target

strategy.transition("booted")

Troubleshooting

  • Permission Denied (Bind): Ensure you are not trying to bind to port 69 directly without root. Use port 3069 and iptables.

  • Timeout: Check firewall settings (ufw). Ensure the target can ping the host IP.

  • File Not Found: Verify the file exists in the root directory specified in configuration.

Power Control Drivers

VesyncPowerDriver

Purpose: Control devices via VeSync smart outlets (WiFi-based network power switches)

Required Resource: VesyncOutlet

Bindings: Implements PowerProtocol and PowerResetMixin

Configuration

targets:
  test_device:
    resources:
      VesyncOutlet:
        outlet_names: 'My Test Device,Lab Bench Power'
        username: 'your_email@example.com'
        password: 'your_password'
        delay: 5.0  # Seconds to wait between off/on during cycle

    drivers:
      VesyncPowerDriver: {}

Key Parameters

  • outlet_names (required): Comma-separated list of outlet device names as they appear in VeSync mobile app

  • username (required): VeSync account email address

  • password (required): VeSync account password

  • delay (default=5.0): Delay in seconds between power off and on during reset/cycle

Methods

power = target.get_driver("VesyncPowerDriver")
target.activate(power)

# Basic control
power.on()              # Turn on all configured outlets
power.off()             # Turn off all outlets
power.cycle()           # Power cycle: off → wait → on (uses delay parameter)
power.reset()           # Same as cycle()

# Query state
is_on = power.get()     # Returns True if all outlets are on
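
Since the outlets are reached over the network, the reported state may lag behind a command. A minimal sketch of a polling helper built on power.get() follows. The wait_for_power_state name, the timeout values, and the FakePower stand-in are assumptions made for illustration.

```python
import time


def wait_for_power_state(power, expected: bool, timeout: float = 10.0,
                         interval: float = 0.5) -> bool:
    """Poll power.get() until it equals `expected` or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        if power.get() == expected:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


class FakePower:
    """Hypothetical stand-in that reports 'on' from the third query onward."""

    def __init__(self):
        self.queries = 0

    def get(self):
        self.queries += 1
        return self.queries >= 3


fake = FakePower()
print(wait_for_power_state(fake, expected=True, timeout=2.0, interval=0.01))
```

With the real driver, the call would be wait_for_power_state(power, True) after power.on().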

Usage Examples

Simple power cycling:

import time

power = target.get_driver("VesyncPowerDriver")
target.activate(power)

power.off()
time.sleep(2)
power.on()

target.deactivate(power)

Multiple outlets:

# This config powers three devices together
# outlet_names: 'Device 1,Device 2,Device 3'

power.on()   # All three outlets turn on
power.off()  # All three outlets turn off

Troubleshooting

  • “Outlet not found”: Verify outlet names match exactly what appears in VeSync app (case-sensitive)

  • “Failed to login”: Check VeSync credentials are correct

  • “No outlets found”: Ensure outlets are added to VeSync account via mobile app first

Notes

  • Requires internet connection and VeSync account

  • Outlets must be preconfigured in VeSync mobile application

  • Supports multiple outlets controlled simultaneously

  • Delay should be tuned based on device power-up requirements

CyberPowerDriver

Purpose: Control devices via CyberPower PDU using SNMP protocol

Required Resource: CyberPowerOutlet

Bindings: Implements PowerProtocol and PowerResetMixin

Configuration

targets:
  lab_device:
    resources:
      CyberPowerOutlet:
        address: '192.168.1.100'    # PDU IP address or hostname
        outlet: 3                    # Outlet number (1-8 for most models)
        delay: 3.0                   # Seconds between off/on

    drivers:
      CyberPowerDriver: {}

Key Parameters

  • address (required): IP address or hostname of the PDU

  • outlet (required): Outlet number to control (typically 1-8, check your PDU model)

  • delay (default=5.0): Delay in seconds for power cycling
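
The three parameters can be validated before they reach the PDU. The sketch below is one hedged way to do that. OutletSettings is a hypothetical helper, not a class from the plugin, and the upper bound of 8 outlets is an assumption taken from the "typically 1-8" guidance above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OutletSettings:
    address: str
    outlet: int
    delay: float = 5.0     # documented default
    outlet_count: int = 8  # assumption: 8-outlet PDU, check your model

    def __post_init__(self):
        if not self.address:
            raise ValueError("address is required")
        if not 1 <= self.outlet <= self.outlet_count:
            raise ValueError(
                f"outlet {self.outlet} is outside 1-{self.outlet_count}"
            )
        if self.delay < 0:
            raise ValueError("delay must not be negative")


settings = OutletSettings(address="192.168.1.100", outlet=3, delay=3.0)
print(settings)
```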

Methods

power = target.get_driver("CyberPowerDriver")
target.activate(power)

power.on()      # Turn on outlet
power.off()     # Turn off outlet
power.cycle()   # Power cycle
power.reset()   # Same as cycle()

Usage Example

import time

power = target.get_driver("CyberPowerDriver")
target.activate(power)

print("Powering off device...")
power.off()
time.sleep(1)

print("Powering on device...")
power.on()
time.sleep(5)  # Wait for device to boot

target.deactivate(power)

Supported Models

  • PDU15SWHVIEC8FNET

  • Other CyberPower PDUs with SNMP support (may need adjustments)

Implementation Notes

  • Uses SNMP “private” community string

  • Compatible with both pysnmp < 7.0.0 (sync) and >= 7.0.0 (asyncio) APIs

  • Automatically detects and uses appropriate API version

  • Requires network access to PDU IP address

Troubleshooting

  • Timeout/No response: Check network connectivity to PDU, verify IP address

  • Access denied: Confirm SNMP community string is “private” (standard for CyberPower)

  • Outlet out of range: Verify outlet number (typically 1-8, check your PDU documentation)

Shell and File Transfer Driver

ADIShellDriver

Purpose: Execute commands and transfer files on target device via serial console with optional SSH

Bindings: Implements CommandProtocol, ConsoleProtocol, FileTransferProtocol

Configuration

drivers:
  ADIShellDriver:
    prompt: 'root@analog:.*#'              # Regex matching shell prompt
    login_prompt: 'login:'                 # Regex matching login prompt
    username: 'root'                       # Login username
    password: 'analog'                     # Login password
    login_timeout: 60                      # Seconds to wait for login
    console_ready: 'Press ENTER'           # Optional: marker before login
    await_login_timeout: 5                 # Seconds to detect login requirement
    keyfile: 'keys/id_rsa.pub'             # Optional: SSH key to inject

Key Parameters

  • prompt (required): Regex pattern matching the shell prompt after login

  • login_prompt (required): Regex pattern matching login prompt

  • username (required): Login username

  • password (required): Login password

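  • login_timeout (required): Maximum seconds to wait for login completion

  • await_login_timeout: Seconds to wait when detecting whether a login is required (5 in the example above)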

  • console_ready (optional): Marker string to wait for before attempting login

  • keyfile (optional): Path to SSH public key to inject into device

Methods

shell = target.get_driver("ADIShellDriver")
target.activate(shell)

# Command execution
output = shell.run("uname -a")                  # Execute command, return output
shell.run("mkdir -p /tmp/test")                 # Command without capture

# File transfer via XMODEM (binary safe)
shell.put("/local/file.bin", "/tmp/file.bin")  # Upload to device
shell.get("/tmp/output.log", "/local/log.txt")  # Download from device

# Advanced
shell.put_bytes(binary_data, "/tmp/data.bin")  # Upload binary data
data = shell.get_bytes("/tmp/data.bin")        # Download as binary

# Query device networking
ips = shell.get_ip_addresses()                 # Returns dict of interfaces

# Deactivate
target.deactivate(shell)

Usage Examples

Basic command execution:

shell = target.get_driver("ADIShellDriver")
target.activate(shell)

# Get kernel version
kernel = shell.run("uname -r").strip()
print(f"Kernel: {kernel}")

# List IIO devices
iio_devices = shell.run("iio_info -s")
print(f"IIO Devices:\n{iio_devices}")

target.deactivate(shell)

File upload and execution:

# Upload script
shell.put("/local/test_script.sh", "/tmp/test.sh")
shell.run("chmod +x /tmp/test.sh")

# Run script and capture output
result = shell.run("/tmp/test.sh")
print(result)
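
XMODEM sends data in fixed-size blocks and pads the last one, so it can be worth confirming that an uploaded file arrived byte for byte. The sketch below compares SHA-256 digests after shell.put(). It assumes that shell.run() returns the command output as a string, as in the examples above, and that sha256sum is available on the device. The helper names are hypothetical.

```python
import hashlib
import shlex


def local_sha256(path: str) -> str:
    """Hash a local file in chunks so large files do not fill memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()


def remote_sha256(shell, remote_path: str) -> str:
    """Run sha256sum on the device and return the hex digest it prints first."""
    output = shell.run(f"sha256sum {shlex.quote(remote_path)}")
    return output.split()[0]


def upload_verified(shell, local_path: str, remote_path: str) -> None:
    """Upload a file, then raise RuntimeError if the digests differ."""
    shell.put(local_path, remote_path)
    expected = local_sha256(local_path)
    actual = remote_sha256(shell, remote_path)
    if actual != expected:
        raise RuntimeError(
            f"checksum mismatch for {remote_path}: {actual} != {expected}"
        )
```

Usage would be upload_verified(shell, "/local/test_script.sh", "/tmp/test.sh") with an activated ADIShellDriver.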

SSH key injection for passwordless access:

# Config with SSH key
drivers:
  ADIShellDriver:
    prompt: 'root@.*#'
    login_prompt: 'login:'
    username: 'root'
    password: 'analog'
    keyfile: 'keys/id_rsa.pub'    # Public key to inject

# After driver activation with keyfile configured:
# 1. Logs in with password
# 2. Creates /root/.ssh directory
# 3. Copies your public key to /root/.ssh/authorized_keys
# 4. Sets correct permissions (700, 600)

# Afterwards, SSH access works without password
ssh root@device
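
The four steps listed above correspond roughly to the shell commands generated by the sketch below. This is an illustration of the idea under stated assumptions, not the commands the driver actually issues; key_injection_commands is a hypothetical helper.

```python
import shlex


def key_injection_commands(public_key: str, home: str = "/root") -> list:
    """Return shell commands that append a public key to authorized_keys."""
    ssh_dir = f"{home}/.ssh"
    authorized_keys = f"{ssh_dir}/authorized_keys"
    key = public_key.strip()
    return [
        f"mkdir -p {ssh_dir}",
        f"chmod 700 {ssh_dir}",
        # Quote the key so spaces and the comment field survive the shell.
        f"echo {shlex.quote(key)} >> {authorized_keys}",
        f"chmod 600 {authorized_keys}",
    ]


# The key text is a placeholder, not a usable key.
for command in key_injection_commands("ssh-ed25519 AAAA user@host\n"):
    print(command)
```

Each command could be passed to shell.run() on a device where the driver's keyfile option is not used.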

Features

  • Automatic login handling with regex prompt matching

  • XMODEM binary-safe file transfer protocol

  • SSH public key injection for passwordless access

  • Command execution with output capture

  • IP address detection for network interfaces

  • Console ready detection for handling boot prompts

Troubleshooting

  • Timeout during login: Increase login_timeout, check serial connection

  • Wrong prompt regex: Test regex against actual device prompt

  • File transfer hangs: Ensure XMODEM support on device

  • SSH key injection fails: Check file permissions and SSH directory

Notes

  • Requires active serial console connection

  • Uses XMODEM for file transfer, so the device must provide XMODEM send and receive utilities

  • Regex patterns are Python regex, test with actual device output

  • Login is automatic on driver activation
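
Prompt patterns can be tried against captured console text before they go into the configuration. The sketch below uses re.search; whether the driver matches in exactly this way is an assumption, and the sample lines are invented for illustration.

```python
import re

PROMPT = r"root@analog:.*#"  # value from the configuration example
LOGIN_PROMPT = r"login:"


def matches(pattern: str, console_text: str) -> bool:
    """Return True if the pattern is found anywhere in the console text."""
    return re.search(pattern, console_text) is not None


# Sample lines are assumptions; paste text captured from your own console.
samples = [
    "analog login: ",
    "root@analog:~# ",
    "root@analog:/tmp# ",
    "pi@raspberrypi:~$ ",
]

for line in samples:
    print(repr(line), matches(LOGIN_PROMPT, line), matches(PROMPT, line))
```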

Storage Management Drivers

MassStorageDriver

Purpose: Mount USB mass storage devices and manage file updates (typically SD cards via USB mux)

Required Resource: MassStorageDevice

Configuration

resources:
  MassStorageDevice:
    device: '/dev/sdb'              # Block device path
    partition: 1                     # Partition number to mount

drivers:
  MassStorageDriver: {}

Key Parameters

  • device (required): Block device path (e.g., /dev/sdb, /dev/sdc)

  • partition (required): Partition number to mount (typically 1 for boot partition)
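
On Linux, a partition node is named by appending the partition number to the disk name, with a "p" inserted when the disk name itself ends in a digit. The sketch below shows that rule for the two parameters above. Whether the driver builds the path this way is an assumption, and partition_path is a hypothetical helper.

```python
def partition_path(device: str, partition: int) -> str:
    """Combine a block device path and a partition number into a node path."""
    if partition < 1:
        raise ValueError("partition numbers start at 1")
    # Names such as mmcblk0 or nvme0n1 end in a digit and need a 'p' separator.
    separator = "p" if device[-1].isdigit() else ""
    return f"{device}{separator}{partition}"


print(partition_path("/dev/sdb", 1))
print(partition_path("/dev/mmcblk0", 1))
```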

Methods

storage = target.get_driver("MassStorageDriver")
target.activate(storage)

# Mount/unmount
storage.mount_partition()              # Mount the configured partition
storage.unmount_partition()            # Unmount partition

# File operations
storage.copy_file("/local/BOOT.BIN", "/BOOT.BIN")  # Copy to device
storage.update_files({
    "/local/BOOT.BIN": "/BOOT.BIN",
    "/local/image.ub": "/image.ub"
})                                      # Copy multiple files

target.deactivate(storage)

Usage Example

Updating boot files on SD card:

# Typically used with USBSDMuxDriver to switch SD to host
storage = target.get_driver("MassStorageDriver")
target.activate(storage)

# Mount SD card partition
storage.mount_partition()

# Copy boot files
storage.copy_file("/local/BOOT.BIN", "/BOOT.BIN")
storage.copy_file("/local/image.ub", "/image.ub")

# Unmount before switching back to device
storage.unmount_partition()

target.deactivate(storage)
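
If a copy fails, the partition should still be unmounted before the card is switched back to the device. A minimal sketch of a context manager that guarantees this follows. The mounted helper and the RecordingStorage stand-in are hypothetical and only mirror the method names shown above.

```python
from contextlib import contextmanager


@contextmanager
def mounted(storage):
    """Mount the configured partition and always unmount it afterwards."""
    storage.mount_partition()
    try:
        yield storage
    finally:
        # Runs even if a copy raises, so the card is never left mounted.
        storage.unmount_partition()


class RecordingStorage:
    """Hypothetical stand-in that records calls instead of touching a disk."""

    def __init__(self):
        self.calls = []

    def mount_partition(self):
        self.calls.append("mount")

    def unmount_partition(self):
        self.calls.append("unmount")

    def copy_file(self, source, destination):
        self.calls.append(("copy", source, destination))


storage = RecordingStorage()
with mounted(storage) as card:
    card.copy_file("/local/BOOT.BIN", "/BOOT.BIN")

print(storage.calls)
```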

Common Workflow

Typically used within BootFPGASoC strategy:

strategy = target.get_driver("BootFPGASoC")

# Strategy handles SD mux switching
strategy.transition("sd_mux_to_host")

# Now storage driver is activated, can mount and copy files
storage = target.get_driver("MassStorageDriver")
storage.mount_partition()
storage.copy_file("/new/BOOT.BIN", "/BOOT.BIN")
storage.unmount_partition()

# Switch back and boot
strategy.transition("sd_mux_to_dut")
strategy.transition("booted")

Important Notes

  • Requires USB SD card mux (usually USBSDMuxDriver) to switch card between host and device

  • Device path may change based on USB enumeration order

  • Consider using udev rules for stable device names

  • Partition number depends on SD card layout (typically 1 for first partition)

Troubleshooting

  • Device not found: Verify device path with lsblk, may need to use different /dev entry

  • Permission denied: Mounting usually requires root privileges; run with sudo or as root

  • Mount fails: Check if partition is already mounted elsewhere
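
To check whether a partition is already mounted elsewhere, the kernel's mount table in /proc/mounts can be parsed. Each line there starts with the device followed by the mount point. The sketch below is a hedged example; the function names and the sample table are assumptions for illustration.

```python
def mount_points(partition: str, mounts_text: str) -> list:
    """Return every mount point listed for the given partition node."""
    points = []
    for line in mounts_text.splitlines():
        fields = line.split()
        # Format: device, mount point, filesystem type, options, dump, pass.
        # Spaces inside a mount point appear escaped as \040.
        if len(fields) >= 2 and fields[0] == partition:
            points.append(fields[1])
    return points


def read_mount_table(path: str = "/proc/mounts") -> str:
    """Read the current mount table (Linux only)."""
    with open(path) as handle:
        return handle.read()


SAMPLE = """\
/dev/sda2 / ext4 rw,relatime 0 0
/dev/sdb1 /media/user/BOOT vfat rw,nosuid,nodev 0 0
"""

print(mount_points("/dev/sdb1", SAMPLE))
```

On the host, mount_points("/dev/sdb1", read_mount_table()) returns an empty list when the partition is free to mount.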

Kuiper Linux Driver

KuiperDLDriver

Purpose: Download ADI Kuiper Linux releases and extract boot files from disk images

Required Resource: KuiperRelease

Configuration

resources:
  KuiperRelease:
    release: '2023_R2_P1'               # Release version
    cache_dir: '/var/cache/kuiper'    # Download cache directory

drivers:
  KuiperDLDriver: {}

Key Parameters

  • release (required): Release version identifier

  • cache_dir (required): Directory for caching downloaded files

Supported Releases

  • ‘2018_R2’

  • ‘2019_R1’

  • ‘2023_R2_P1’

Methods

kuiper = target.get_driver("KuiperDLDriver")
target.activate(kuiper)

# Download and extract boot files
files = kuiper.get_boot_files_from_release()

# Files returned: list of paths to extracted boot files
# Typically: ['/path/to/BOOT.BIN', '/path/to/image.ub', ...]

# Access boot files directly (_boot_files is a private attribute and may change)
boot_bin = kuiper._boot_files.get('BOOT.BIN')

target.deactivate(kuiper)

Usage Example

Downloading Kuiper release:

import os

kuiper = target.get_driver("KuiperDLDriver")
target.activate(kuiper)

print("Downloading Kuiper release...")
boot_files = kuiper.get_boot_files_from_release()

print("Boot files available:")
for file in boot_files:
    print(f"  - {file}")

# Use with MassStorageDriver to copy files
storage = target.get_driver("MassStorageDriver")
target.activate(storage)
storage.mount_partition()

for boot_file in boot_files:
    filename = os.path.basename(boot_file)
    storage.copy_file(boot_file, f"/{filename}")

storage.unmount_partition()
target.deactivate(storage)
target.deactivate(kuiper)

Features

  • Automatic download with progress reporting (via tqdm)

  • MD5 checksum verification

  • Automatic extraction of .xz and .zip archives

  • File extraction from disk image without mounting (via pytsk3)

  • Caching to avoid re-downloading

  • Supports multiple Kuiper releases
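
MD5 verification of a large download is usually done in chunks so the whole image never has to sit in memory. The following sketch shows the general technique with hashlib. It is not the driver's code, and the function names are hypothetical.

```python
import hashlib


def md5_of_file(path: str, chunk_size: int = 1 << 20) -> str:
    """Return the hex MD5 digest of a file, reading it in 1 MiB chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_md5(path: str, expected_hex: str) -> bool:
    """Compare a file's digest with a published value, ignoring case."""
    return md5_of_file(path) == expected_hex.strip().lower()
```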

Implementation Details

  • Extracts files from .img disk images without requiring mount (uses pytsk3 forensic toolkit)

  • Caches downloaded releases locally to avoid re-downloading

  • Verifies checksums against ADI provided values

  • Automatically handles .xz or .zip compression
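
As an illustration of the .xz handling mentioned above, the standard library can stream-decompress an archive without loading it into memory. This is a minimal sketch using lzma and shutil; decompress_xz is a hypothetical helper, and the driver may do this differently.

```python
import lzma
import shutil


def decompress_xz(source: str, destination: str) -> str:
    """Stream-decompress an .xz file to `destination` and return that path."""
    with lzma.open(source, "rb") as compressed, open(destination, "wb") as out:
        # copyfileobj moves data in blocks, so image size does not matter.
        shutil.copyfileobj(compressed, out)
    return destination
```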

Troubleshooting

  • Download failed: Check network connectivity and storage space in cache_dir

  • Checksum mismatch: Download may be corrupted, try clearing cache and re-downloading

  • Extraction fails: Ensure pytsk3 is installed with filesystem support

Notes

  • First run downloads the full release (can be large, 100+ MB)

  • Subsequent runs use cached version (fast)

  • Boot files extracted to cache_dir automatically

  • Used by BootFPGASoC strategy for automatic Kuiper boot

See Also