Using Drivers¶
Drivers provide low-level hardware control and protocol implementations. They bind to resources and expose protocols that can be used by strategies or tests.
Overview¶
Drivers in adi-labgrid-plugins:
Control hardware devices via serial, network, or USB interfaces
Implement standardized protocols for interoperability (PowerProtocol, CommandProtocol, etc.)
Are activated/deactivated explicitly by strategies or tests
Provide high-level methods abstracting hardware complexity
The plugin provides six drivers for different hardware control scenarios:
Network Driver: Serve boot files to targets over TFTP
Power Drivers: Control device power via smart outlets and PDUs
Shell Driver: Execute commands and transfer files via serial console or SSH
Storage Driver: Mount and manage SD card filesystems
Kuiper Driver: Download and extract ADI Kuiper Linux releases
Driver Lifecycle¶
Drivers follow an explicit activation pattern:
# Get driver reference from target
driver = target.get_driver("DriverName")
# Activate driver (calls on_activate(), initializes connections)
target.activate(driver)
# Use driver methods
driver.method()
# Deactivate when done (calls on_deactivate(), closes connections)
target.deactivate(driver)
Or within a strategy context (automatic activation):
# Strategies handle activation/deactivation automatically
strategy = target.get_driver("BootFPGASoC")
strategy.transition("shell")
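When a driver is activated by hand, an exception between activate and deactivate can leave the connection open. A minimal sketch of a wrapper that guarantees deactivation, assuming only the get_driver, activate, and deactivate calls shown above; activated is a hypothetical helper, not part of the plugin:

```python
from contextlib import contextmanager


@contextmanager
def activated(target, driver_name):
    """Yield an activated driver and always deactivate it afterwards."""
    driver = target.get_driver(driver_name)
    target.activate(driver)
    try:
        yield driver
    finally:
        # Runs even if the body raises, so connections are not left open.
        target.deactivate(driver)
```

It would be used as `with activated(target, "ADIShellDriver") as shell:` followed by the driver calls in the indented body.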
Network Drivers¶
TFTPServerDriver¶
Purpose: Host a TFTP server that serves boot files (kernel images, device trees) to targets, for example to U-Boot.
Required Resource: TFTPServerResource
Bindings: None
Configuration
resources:
  TFTPServerResource:
    address: '10.0.0.1'        # IP address of the host interface
    port: 3069                 # Port to bind (default: 3069)
    root: '/var/lib/tftpboot'  # Root directory for serving files
drivers:
  TFTPServerDriver: {}
Key Parameters
address (required): IP address of the interface on the host machine where the TFTP server will bind.
port (optional): UDP port to listen on. Defaults to 3069.
root (optional): Local directory to serve files from. Defaults to /var/lib/tftpboot.
Port Forwarding (Non-Root Usage)
Standard TFTP uses port 69. However, binding to ports below 1024 requires root privileges. The driver defaults to port 3069 to allow running as a standard user.
Since U-Boot often defaults to port 69, you must configure a port redirection rule on the host machine using iptables to forward traffic from port 69 to 3069.
Setup (Run once on Host):
# Redirect UDP port 69 to 3069
sudo iptables -t nat -A PREROUTING -p udp --dport 69 -j REDIRECT --to-port 3069
Cleanup (To remove the rule):
sudo iptables -t nat -D PREROUTING -p udp --dport 69 -j REDIRECT --to-port 3069
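Before adding the redirect rule, it can help to confirm that the current user can actually bind the configured address and port. A minimal sketch using only the standard library; can_bind_udp is a hypothetical helper and is not part of the driver:

```python
import socket


def can_bind_udp(address: str, port: int) -> bool:
    """Return True if the current user can bind a UDP socket to address:port."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind((address, port))
        return True
    except OSError:
        # Covers PermissionError (privileged port) and "address already in use".
        return False
    finally:
        sock.close()
```

For example, `can_bind_udp('10.0.0.1', 3069)` returning False for a standard user usually means the address is not assigned to the host or another process already holds the port.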
Methods
tftp = target.get_driver("TFTPServerDriver")
target.activate(tftp) # Starts the server thread
# The server runs in the background serving files from the configured 'root' directory.
# It handles RRQ (Read Request) operations compliant with RFC 1350.
target.deactivate(tftp) # Stops the server thread
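When debugging with a packet capture, it helps to know what a read request looks like on the wire. RFC 1350 defines an RRQ as a 2-byte opcode (1), the filename, a NUL byte, the transfer mode, and a final NUL byte. The sketch below only builds such a packet for reference; build_rrq is a hypothetical name and this is not the driver's own code:

```python
import struct

RRQ_OPCODE = 1  # RFC 1350: opcode 1 is a read request


def build_rrq(filename: str, mode: str = "octet") -> bytes:
    """Build a TFTP read request: opcode, filename, NUL, mode, NUL."""
    return (
        struct.pack("!H", RRQ_OPCODE)  # 16-bit opcode in network byte order
        + filename.encode("ascii") + b"\x00"
        + mode.encode("ascii") + b"\x00"
    )
```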
Usage Example
Used within the BootFPGASoCTFTP strategy to serve boot files:
strategy = target.get_driver("BootFPGASoCTFTP")
# The strategy automatically:
# 1. Activates the TFTPServerDriver
# 2. Configures U-Boot environment (serverip, tftpport)
# 3. Copies necessary files (Image, system.dtb) to the TFTP root
# 4. Triggers the TFTP boot command on the target
strategy.transition("booted")
Troubleshooting
Permission Denied (Bind): Ensure you are not trying to bind to port 69 directly without root. Use port 3069 and iptables.
Timeout: Check firewall settings (ufw). Ensure the target can ping the host IP.
File Not Found: Verify the file exists in the root directory specified in configuration.
Power Control Drivers¶
VesyncPowerDriver¶
Purpose: Control devices via VeSync smart outlets (WiFi-based network power switches)
Required Resource: VesyncOutlet
Bindings: Implements PowerProtocol and PowerResetMixin
Configuration
targets:
  test_device:
    resources:
      VesyncOutlet:
        outlet_names: 'My Test Device,Lab Bench Power'
        username: 'your_email@example.com'
        password: 'your_password'
        delay: 5.0  # Seconds to wait between off/on during cycle
    drivers:
      VesyncPowerDriver: {}
Key Parameters
outlet_names (required): Comma-separated list of outlet device names as they appear in VeSync mobile app
username (required): VeSync account email address
password (required): VeSync account password
delay (default=5.0): Delay in seconds between power off and on during reset/cycle
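Because outlet_names is a single comma-separated string and names are matched case-sensitively, a quick check against the names shown in the app can catch typos early. A sketch under the assumption that surrounding whitespace is not significant; both function names are hypothetical and the driver's own parsing may differ:

```python
def parse_outlet_names(outlet_names: str) -> list[str]:
    """Split a comma-separated outlet_names value into trimmed names."""
    return [name.strip() for name in outlet_names.split(",") if name.strip()]


def missing_outlets(outlet_names: str, available: list[str]) -> list[str]:
    """Return configured names that have no exact, case-sensitive match."""
    return [n for n in parse_outlet_names(outlet_names) if n not in available]
```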
Methods
power = target.get_driver("VesyncPowerDriver")
target.activate(power)
# Basic control
power.on() # Turn on all configured outlets
power.off() # Turn off all outlets
power.cycle() # Power cycle: off → wait → on (uses delay parameter)
power.reset() # Same as cycle()
# Query state
is_on = power.get() # Returns True if all outlets are on
Usage Examples
Simple power cycling:
import time
power = target.get_driver("VesyncPowerDriver")
target.activate(power)
power.off()
time.sleep(2)
power.on()
target.deactivate(power)
Multiple outlets:
# This config powers three devices together
# outlet_names: 'Device 1,Device 2,Device 3'
power.on() # All three outlets turn on
power.off() # All three outlets turn off
Troubleshooting
“Outlet not found”: Verify outlet names match exactly what appears in VeSync app (case-sensitive)
“Failed to login”: Check VeSync credentials are correct
“No outlets found”: Ensure outlets are added to VeSync account via mobile app first
Notes
Requires internet connection and VeSync account
Outlets must be preconfigured in VeSync mobile application
Supports multiple outlets controlled simultaneously
Delay should be tuned based on device power-up requirements
CyberPowerDriver¶
Purpose: Control devices via a CyberPower PDU using SNMP
Required Resource: CyberPowerOutlet
Bindings: Implements PowerProtocol and PowerResetMixin
Configuration
targets:
  lab_device:
    resources:
      CyberPowerOutlet:
        address: '192.168.1.100'  # PDU IP address or hostname
        outlet: 3                 # Outlet number (1-8 for most models)
        delay: 3.0                # Seconds between off/on
    drivers:
      CyberPowerDriver: {}
Key Parameters
address (required): IP address or hostname of the PDU
outlet (required): Outlet number to control (typically 1-8, check your PDU model)
delay (default=5.0): Delay in seconds for power cycling
Methods
power = target.get_driver("CyberPowerDriver")
target.activate(power)
power.on() # Turn on outlet
power.off() # Turn off outlet
power.cycle() # Power cycle
power.reset() # Same as cycle()
Usage Example
import time
power = target.get_driver("CyberPowerDriver")
target.activate(power)
print("Powering off device...")
power.off()
time.sleep(1)
print("Powering on device...")
power.on()
time.sleep(5) # Wait for device to boot
target.deactivate(power)
Supported Models
PDU15SWHVIEC8FNET
Other CyberPower PDUs with SNMP support (may need adjustments)
Implementation Notes
Uses SNMP “private” community string
Compatible with both pysnmp < 7.0.0 (async) and >= 7.0.0 (sync) APIs
Automatically detects and uses appropriate API version
Requires network access to PDU IP address
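To see which pysnmp API generation applies in a given environment, the installed major version can be read from package metadata. This is only a sketch of one way to check; it assumes the distribution is installed under the name pysnmp, and it is not necessarily how the driver performs its own detection:

```python
from importlib import metadata


def major_version(version: str) -> int:
    """Return the leading integer of a version string such as '7.1.4'."""
    return int(version.split(".", 1)[0])


def installed_pysnmp_major():
    """Return the installed pysnmp major version, or None if it is not installed."""
    try:
        return major_version(metadata.version("pysnmp"))
    except metadata.PackageNotFoundError:
        return None
```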
Troubleshooting
Timeout/No response: Check network connectivity to PDU, verify IP address
Access denied: Confirm SNMP community string is “private” (standard for CyberPower)
Outlet out of range: Verify outlet number (typically 1-8, check your PDU documentation)
Shell and File Transfer Driver¶
ADIShellDriver¶
Purpose: Execute commands and transfer files on target device via serial console with optional SSH
Bindings: Implements CommandProtocol, ConsoleProtocol, FileTransferProtocol
Configuration
drivers:
  ADIShellDriver:
    prompt: 'root@analog:.*#'     # Regex matching shell prompt
    login_prompt: 'login:'        # Regex matching login prompt
    username: 'root'              # Login username
    password: 'analog'            # Login password
    login_timeout: 60             # Seconds to wait for login
    console_ready: 'Press ENTER'  # Optional: marker before login
    await_login_timeout: 5        # Seconds to detect login requirement
    keyfile: 'keys/id_rsa.pub'    # Optional: SSH key to inject
Key Parameters
prompt (required): Regex pattern matching the shell prompt after login
login_prompt (required): Regex pattern matching login prompt
username (required): Login username
password (required): Login password
login_timeout (required): Maximum seconds to wait for login completion
console_ready (optional): Marker string to wait for before attempting login
keyfile (optional): Path to SSH public key to inject into device
Methods
shell = target.get_driver("ADIShellDriver")
target.activate(shell)
# Command execution
output = shell.run("uname -a") # Execute command, return output
shell.run("mkdir -p /tmp/test") # Command without capture
# File transfer via XMODEM (binary safe)
shell.put("/local/file.bin", "/tmp/file.bin") # Upload to device
shell.get("/tmp/output.log", "/local/log.txt") # Download from device
# Advanced
shell.put_bytes(binary_data, "/tmp/data.bin") # Upload binary data
data = shell.get_bytes("/tmp/data.bin") # Download as binary
# Query device networking
ips = shell.get_ip_addresses() # Returns dict of interfaces
# Deactivate
target.deactivate(shell)
Usage Examples
Basic command execution:
shell = target.get_driver("ADIShellDriver")
target.activate(shell)
# Get kernel version
kernel = shell.run("uname -r").strip()
print(f"Kernel: {kernel}")
# List IIO devices
iio_devices = shell.run("iio_info -s")
print(f"IIO Devices:\n{iio_devices}")
target.deactivate(shell)
File upload and execution:
# Upload script
shell.put("/local/test_script.sh", "/tmp/test.sh")
shell.run("chmod +x /tmp/test.sh")
# Run script and capture output
result = shell.run("/tmp/test.sh")
print(result)
SSH key injection for passwordless access:
# Config with SSH key
drivers:
  ADIShellDriver:
    prompt: 'root@.*#'
    login_prompt: 'login:'
    username: 'root'
    password: 'analog'
    keyfile: 'keys/id_rsa.pub'  # Public key to inject
# After driver activation with keyfile configured:
# 1. Logs in with password
# 2. Creates /root/.ssh directory
# 3. Copies your public key to /root/.ssh/authorized_keys
# 4. Sets correct permissions (700, 600)
# Afterwards, SSH access works without password
ssh root@device
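The four steps listed above correspond roughly to the shell commands below. This is an illustrative sketch only: key_injection_commands is a hypothetical helper, appending with `>>` is an assumption, and the commands the driver actually issues may differ.

```python
import shlex


def key_injection_commands(public_key: str, home: str = "/root") -> list[str]:
    """Return shell commands that install public_key for passwordless SSH."""
    ssh_dir = f"{home}/.ssh"
    auth_keys = f"{ssh_dir}/authorized_keys"
    return [
        f"mkdir -p {ssh_dir}",
        # shlex.quote keeps the spaces and comment field of the key intact.
        f"echo {shlex.quote(public_key.strip())} >> {auth_keys}",
        f"chmod 700 {ssh_dir}",
        f"chmod 600 {auth_keys}",
    ]
```

Each returned string could be passed to `shell.run()` if the same setup has to be repeated manually.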
Features
Automatic login handling with regex prompt matching
XMODEM binary-safe file transfer protocol
SSH public key injection for passwordless access
Command execution with output capture
IP address detection for network interfaces
Console ready detection for handling boot prompts
Troubleshooting
Timeout during login: Increase login_timeout, check serial connection
Wrong prompt regex: Test regex against actual device prompt
File transfer hangs: Ensure XMODEM support on device
SSH key injection fails: Check file permissions and SSH directory
Notes
Requires active serial console connection
Uses XMODEM for file transfer (add binary protocol support if needed)
Regex patterns are Python regex, test with actual device output
Login is automatic on driver activation
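Prompt patterns can be checked offline against text captured from the console before they go into the configuration. A minimal sketch with the standard re module; prompt_matches is a hypothetical helper, and the use of a search anywhere in the output (rather than a match at the start) is an assumption about how the pattern is applied:

```python
import re


def prompt_matches(pattern: str, console_output: str) -> bool:
    """Return True if the prompt regex matches anywhere in the captured output."""
    return re.search(pattern, console_output) is not None
```

For example, `prompt_matches(r"root@analog:.*#", "root@analog:~# ")` is true, while the same pattern does not match a line that only shows `analog login: `.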
Storage Management Drivers¶
MassStorageDriver¶
Purpose: Mount USB mass storage devices and manage file updates (typically SD cards via USB mux)
Required Resource: MassStorageDevice
Configuration
resources:
  MassStorageDevice:
    device: '/dev/sdb'  # Block device path
    partition: 1        # Partition number to mount
drivers:
  MassStorageDriver: {}
Key Parameters
device (required): Block device path (e.g., /dev/sdb, /dev/sdc)
partition (required): Partition number to mount (typically 1 for boot partition)
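The device and partition values together identify one partition node. On Linux the node name is the device path followed by the partition number, with a "p" separator when the device name itself ends in a digit. A sketch of that naming rule; partition_path is a hypothetical helper, and how the driver combines the two values internally is not documented here:

```python
def partition_path(device: str, partition: int) -> str:
    """Build the partition node for a block device using Linux naming rules."""
    # Devices whose name ends in a digit (mmcblk0, nvme0n1) use a 'p' separator.
    separator = "p" if device[-1].isdigit() else ""
    return f"{device}{separator}{partition}"
```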
Methods
storage = target.get_driver("MassStorageDriver")
target.activate(storage)
# Mount/unmount
storage.mount_partition() # Mount the configured partition
storage.unmount_partition() # Unmount partition
# File operations
storage.copy_file("/local/BOOT.BIN", "/BOOT/") # Copy to device
storage.update_files({
    "/local/BOOT.BIN": "/BOOT.BIN",
    "/local/image.ub": "/image.ub"
}) # Copy multiple files
target.deactivate(storage)
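Checking that every local source file exists before mounting avoids leaving the card half-updated. A small sketch that validates a mapping of the shape passed to update_files above; missing_sources is a hypothetical helper, not a driver method:

```python
import os


def missing_sources(files: dict[str, str]) -> list[str]:
    """Return local paths from a {local: destination} mapping that do not exist."""
    return [local for local in files if not os.path.isfile(local)]
```

If the returned list is non-empty, the copy can be skipped before `mount_partition()` is ever called.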
Usage Example
Updating boot files on SD card:
# Typically used with USBSDMuxDriver to switch SD to host
storage = target.get_driver("MassStorageDriver")
target.activate(storage)
# Mount SD card partition
storage.mount_partition()
# Copy boot files
storage.copy_file("/local/BOOT.BIN", "/BOOT.BIN")
storage.copy_file("/local/image.ub", "/image.ub")
# Unmount before switching back to device
storage.unmount_partition()
target.deactivate(storage)
Common Workflow
Typically used within BootFPGASoC strategy:
strategy = target.get_driver("BootFPGASoC")
# Strategy handles SD mux switching
strategy.transition("sd_mux_to_host")
# Now storage driver is activated, can mount and copy files
storage = target.get_driver("MassStorageDriver")
storage.mount_partition()
storage.copy_file("/new/BOOT.BIN", "/BOOT.BIN")
storage.unmount_partition()
# Switch back and boot
strategy.transition("sd_mux_to_dut")
strategy.transition("booted")
Important Notes
Requires USB SD card mux (usually USBSDMuxDriver) to switch card between host and device
Device path may change based on USB enumeration order
Consider using udev rules for stable device names
Partition number depends on SD card layout (typically 1 for first partition)
Troubleshooting
Device not found: Verify device path with lsblk, may need to use different /dev entry
Permission denied: Usually requires sudo or running as root
Mount fails: Check if partition is already mounted elsewhere
Kuiper Linux Driver¶
KuiperDLDriver¶
Purpose: Download ADI Kuiper Linux releases and extract boot files from disk images
Required Resource: KuiperRelease
Configuration
resources:
  KuiperRelease:
    release: '2023_R2_P1'           # Release version
    cache_dir: '/var/cache/kuiper'  # Download cache directory
drivers:
  KuiperDLDriver: {}
Key Parameters
release (required): Release version identifier
cache_dir (required): Directory for caching downloaded files
Supported Releases
‘2018_R2’
‘2019_R1’
‘2023_R2_P1’
Methods
kuiper = target.get_driver("KuiperDLDriver")
target.activate(kuiper)
# Download and extract boot files
files = kuiper.get_boot_files_from_release()
# Files returned: list of paths to extracted boot files
# Typically: ['/path/to/BOOT.BIN', '/path/to/image.ub', ...]
# Access boot files directly (_boot_files is a private attribute and may change)
boot_bin = kuiper._boot_files.get('BOOT.BIN')
target.deactivate(kuiper)
Usage Example
Downloading Kuiper release:
import os
kuiper = target.get_driver("KuiperDLDriver")
target.activate(kuiper)
print("Downloading Kuiper release...")
boot_files = kuiper.get_boot_files_from_release()
print("Boot files available:")
for file in boot_files:
    print(f" - {file}")
# Use with MassStorageDriver to copy files
storage = target.get_driver("MassStorageDriver")
target.activate(storage)
storage.mount_partition()
for boot_file in boot_files:
    filename = os.path.basename(boot_file)
    storage.copy_file(boot_file, f"/{filename}")
storage.unmount_partition()
target.deactivate(storage)
target.deactivate(kuiper)
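The copy loop above places each file at the partition root under its own name. The same placement can be expressed as a mapping of the shape accepted by `storage.update_files()`; root_destinations is a hypothetical helper shown only as a sketch:

```python
import os


def root_destinations(boot_files: list[str]) -> dict[str, str]:
    """Map each extracted file to a same-named path at the partition root."""
    return {path: "/" + os.path.basename(path) for path in boot_files}
```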
Features
Automatic download with progress reporting (via tqdm)
MD5 checksum verification
Automatic extraction of .xz and .zip archives
File extraction from disk image without mounting (via pytsk3)
Caching to avoid re-downloading
Supports multiple Kuiper releases
Implementation Details
Extracts files from .img disk images without requiring mount (uses pytsk3 forensic toolkit)
Caches downloaded releases locally to avoid re-downloading
Verifies checksums against ADI provided values
Automatically handles .xz or .zip compression
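When a checksum mismatch needs to be investigated by hand, the digest of a cached download can be recomputed with the standard library. A sketch assuming the expected MD5 value is obtained separately; md5_of_file is a hypothetical helper and not the driver's own verification code:

```python
import hashlib


def md5_of_file(path: str, chunk_size: int = 1 << 20) -> str:
    """Return the hex MD5 digest of a file, reading it in chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as handle:
        # Read in 1 MiB chunks so large images are not loaded into memory at once.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```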
Troubleshooting
Download failed: Check network connectivity and storage space in cache_dir
Checksum mismatch: Download may be corrupted, try clearing cache and re-downloading
Extraction fails: Ensure pytsk3 is installed with filesystem support
Notes
First run downloads the full release (can be large, 100+MB)
Subsequent runs use cached version (fast)
Boot files extracted to cache_dir automatically
Used by BootFPGASoC strategy for automatic Kuiper boot
See Also¶
Drivers API - Complete driver API reference
Configuring Resources - Resource configuration guide
Working with Strategies - Strategy usage guide
Quick Start - Quick start examples