"""Strategy to boot SelMap based dual FPGA design."""
import enum
import os
import time
import attr
from labgrid.factory import target_factory
from labgrid.step import step
from labgrid.strategy import Strategy, StrategyError, never_retry
@enum.unique
class Status(enum.Enum):
    """Boot strategy state machine states for dual FPGA SelMap boot.

    The numeric values follow the order in which the states are normally
    visited during a full boot; ``enum.unique`` guards against two states
    accidentally sharing a value (which would silently alias them).

    Attributes:
        unknown: Initial state before any operations.
        powered_off: Both FPGAs are powered off.
        booting_zynq: Primary Zynq FPGA is booting.
        booted_zynq: Zynq FPGA has booted Linux successfully.
        update_zynq_boot_files: Updating Zynq boot files before booting.
        update_virtex_boot_files: Updating Virtex bitstream files.
        trigger_selmap_boot: Triggering SelMap boot of secondary Virtex FPGA.
        wait_for_virtex_boot: Waiting for Virtex FPGA boot to complete.
        booted_virtex: Secondary Virtex FPGA has booted successfully.
        shell: Interactive shell session available on Zynq.
        soft_off: Device being shut down gracefully.
    """

    unknown = 0
    powered_off = 1
    booting_zynq = 2
    booted_zynq = 3
    update_zynq_boot_files = 4
    update_virtex_boot_files = 5
    trigger_selmap_boot = 6
    wait_for_virtex_boot = 7
    booted_virtex = 8
    shell = 9
    soft_off = 10
@target_factory.reg_driver
@attr.s(eq=False)
class BootSelMap(Strategy):
    """BootSelMap - Strategy to boot SelMap based dual FPGA design.

    This strategy does not replace the kernel. It focuses on booting the secondary
    FPGA via the SelMap interface after the primary FPGA has booted Linux.

    Attributes:
        status: Current state of the strategy (a ``Status`` member).
        reached_linux_marker: Console text waited for, after "Linux" has been
            seen, to decide that the Zynq finished booting.
        ethernet_interface: Interface passed to the shell driver's
            ``get_ip_addresses`` when looking up the target IP.
        iio_jesd_driver_name: IIO device name passed to ``iio_attr -d``.
        iio_jesd_data_mode: Link mode value every JESD link must report.
        iio_jesd_link_mode_attr: IIO attribute holding the JESD link mode.
        pre_boot_boot_files: Optional mapping ``local path -> remote path`` of
            files uploaded in ``update_zynq_boot_files``; the Zynq is power
            cycled afterwards.
        post_boot_boot_files: Optional mapping ``local path -> remote path`` of
            files uploaded in ``update_virtex_boot_files``.
        boot_log: Console output captured while waiting for the Zynq to boot.
    """

    bindings = {
        "power": "PowerProtocol",
        "shell": "ADIShellDriver",
        "ssh": "SSHDriver",
        # "sdmux": "USBSDMuxDriver",
        # 'mass_storage': 'MassStorageDriver',
    }

    status = attr.ib(default=Status.unknown)
    reached_linux_marker = attr.ib(default="analog")
    ethernet_interface = attr.ib(default=None)
    iio_jesd_driver_name = attr.ib(default="axi-ad9081-rx-hpc")
    iio_jesd_data_mode = attr.ib(default="DATA")
    iio_jesd_link_mode_attr = attr.ib(default="jesd204_link_mode")
    pre_boot_boot_files = attr.ib(default=None)
    post_boot_boot_files = attr.ib(default=None)
    boot_log = attr.ib(default="", init=False)

    # Number of one-second polls while waiting for the IIO device / JESD FSM.
    _DEVICE_POLL_ATTEMPTS = 30
    _JESD_POLL_ATTEMPTS = 120

    def __attrs_post_init__(self):
        super().__attrs_post_init__()
        # Boot files are uploaded at most once per strategy instance.
        self._copied_pre_boot_files = False
        self._copied_post_boot_files = False

    @never_retry
    @step()
    def transition(self, status, *, step):
        """Transition the strategy to a new state.

        This method manages state transitions for dual FPGA SelMap boot. It handles
        booting the primary Zynq FPGA, updating boot files for both FPGAs, and
        triggering the SelMap boot of the secondary Virtex FPGA. Every state
        first transitions to its predecessor, so requesting a late state runs
        the whole chain.

        Args:
            status (Status or str): Target state to transition to. Can be a Status enum
                value or its string representation (e.g., "shell", "booted_virtex").
            step: Labgrid step decorator context (injected automatically).

        Raises:
            StrategyError: If the transition is invalid or fails.
            KeyError: If ``status`` is a string that is not a ``Status`` member name.

        Example:
            >>> strategy.transition("booted_zynq")  # Boot primary Zynq FPGA
            >>> strategy.transition("trigger_selmap_boot")  # Boot secondary Virtex FPGA
            >>> strategy.transition("shell")  # Get shell access

        Note:
            This strategy manages a complex dual-FPGA system where the primary
            Zynq FPGA boots Linux and then triggers the secondary Virtex FPGA
            boot via the SelMap interface.
        """
        if not isinstance(status, Status):
            status = Status[status]
        self.logger.info(f"Transitioning to {status} (Existing status: {self.status})")

        if status == Status.unknown:
            raise StrategyError(f"can not transition to {status}")
        if status == self.status:
            step.skip("nothing to do")
            return  # nothing to do

        if status == Status.powered_off:
            self.target.deactivate(self.shell)
            self.target.activate(self.power)
            self.power.off()
            self.logger.info("System powered off")
        elif status == Status.booting_zynq:
            self.transition(Status.powered_off)
            self.target.activate(self.power)
            self.logger.info("Powering on Zynq (primary FPGA)...")
            time.sleep(5)
            self.power.on()
            self.logger.info("Zynq powered on, booting Linux...")
        elif status == Status.booted_zynq:
            self.transition(Status.booting_zynq)
            self._wait_for_zynq_linux()
        elif status == Status.update_zynq_boot_files:
            self.transition(Status.booted_zynq)
            self._update_zynq_boot_files()
        elif status == Status.update_virtex_boot_files:
            self.transition(Status.update_zynq_boot_files)
            self._update_virtex_boot_files()
        elif status == Status.trigger_selmap_boot:
            self.transition(Status.update_virtex_boot_files)
            self.logger.info("Triggering SelMap boot for secondary Virtex FPGA...")
            self.target.activate(self.ssh)
            self.ssh.run("cd /boot/ci && ./selmap_dtbo.sh -d vu11p.dtbo -b vu11p.bin")
            self.target.deactivate(self.ssh)
            self.logger.info("SelMap boot trigger script executed")
        elif status == Status.wait_for_virtex_boot:
            self.transition(Status.trigger_selmap_boot)
            self._wait_for_virtex_boot()
        elif status == Status.booted_virtex:
            # Pure marker state: all the work (and the success log message)
            # happens while waiting for the Virtex boot to complete.
            self.transition(Status.wait_for_virtex_boot)
        elif status == Status.shell:
            # Going through booted_virtex means a strategy already sitting in
            # that state is not power cycled again just to obtain a shell.
            self.transition(Status.booted_virtex)
            self.logger.info("Preparing interactive shell...")
            self.target.activate(self.shell)
            # Post boot stuff...
            self.logger.info("Shell access ready")
        elif status == Status.soft_off:
            self.transition(Status.shell)
            self._soft_off()
        else:
            raise StrategyError(f"no transition found from {self.status} to {status}")
        self.status = status

    @staticmethod
    def _as_text(output) -> str:
        """Return command output as a single string.

        labgrid's ``CommandProtocol.run`` is documented to return stdout as a
        list of lines, while the rest of this class treats it as one string.
        The return type of ``ADIShellDriver.run`` is not visible from this
        file, so both shapes are accepted.
        """
        if isinstance(output, str):
            return output
        if isinstance(output, (list, tuple)):
            return "\n".join(str(line) for line in output)
        return str(output)

    def _capture_console_until(self, pattern, timeout=30):
        """Wait for ``pattern`` on the console and append what preceded it to ``boot_log``."""
        _, before, _, _ = self.shell.console.expect(pattern, timeout=timeout)
        if before:
            self.boot_log += before.decode("utf-8", errors="replace")

    def _wait_for_zynq_linux(self):
        """Watch the console of the freshly powered Zynq until Linux is up."""
        self.boot_log = ""  # Reset boot log for this boot
        # The console is only observed here; no login is attempted while booting.
        self.shell.bypass_login = True
        self.target.activate(self.shell)
        self.logger.info(f"Waiting for Linux boot and '{self.reached_linux_marker}' prompt...")
        # Check kernel start
        self._capture_console_until("Linux")
        # Check device prompt
        self._capture_console_until(self.reached_linux_marker)
        self.shell.bypass_login = False
        self.target.deactivate(self.shell)
        time.sleep(5)
        self.logger.info("Zynq (primary FPGA) booted successfully")

    def _sync_ssh_address(self):
        """Point the SSH driver at the IP address the target currently reports.

        Raises:
            StrategyError: If the shell reports no address for
                ``ethernet_interface``.
        """
        self.target.activate(self.shell)
        addresses = self.shell.get_ip_addresses(self.ethernet_interface)
        if not addresses:
            raise StrategyError(f"No IP address found on {self.ethernet_interface}")
        ip = str(addresses[0].ip)
        self.target.deactivate(self.shell)
        # Only touch the network service when the address actually changed.
        if self.ssh.networkservice.address != ip:
            self.logger.info(f"Syncing SSHDriver IP to {ip}")
            self.ssh.networkservice.address = ip

    def _upload_boot_files(self, boot_files, label):
        """Upload ``boot_files`` (local path -> remote path) to the target via SSH.

        Args:
            boot_files: Mapping of local file path to remote destination path.
            label: FPGA name used in the log messages ("Zynq" or "Virtex").

        Raises:
            StrategyError: If a local file does not exist.
        """
        self.target.activate(self.ssh)
        for local_path, remote_path in boot_files.items():
            if not os.path.isfile(local_path):
                raise StrategyError(f"Local boot file {local_path} does not exist")
            # Everything before the last "/" is the remote directory; create
            # it unless it is empty or the /boot directory itself.
            remote_dir = remote_path.rpartition("/")[0]
            if remote_dir and remote_dir != "/boot":
                self.ssh.run(f"mkdir -p {remote_dir}")
            self.logger.info(f"Uploading {label} boot file {local_path} to {remote_path}...")
            self.ssh.put(local_path, remote_path)
        self.target.deactivate(self.ssh)

    def _update_zynq_boot_files(self):
        """Upload the pre-boot files once and power cycle the Zynq to apply them."""
        self.logger.info("Updating Zynq boot files via SSH...")
        self._sync_ssh_address()
        if not self._copied_pre_boot_files and self.pre_boot_boot_files:
            self._upload_boot_files(self.pre_boot_boot_files, "Zynq")
            self._copied_pre_boot_files = True
            # Restart to apply new boot files. The explicit power-off is
            # required: the strategy is currently in booted_zynq, so asking
            # for that state directly would be skipped as "nothing to do".
            self.logger.info("Restarting Zynq to apply new boot files...")
            self.transition(Status.powered_off)
            self.transition(Status.booting_zynq)
            self.transition(Status.booted_zynq)
        self.logger.info("Zynq boot files updated successfully")

    def _update_virtex_boot_files(self):
        """Upload the post-boot (Virtex bitstream) files once."""
        self.logger.info("Updating Virtex (secondary FPGA) bitstream files...")
        self._sync_ssh_address()
        if not self._copied_post_boot_files and self.post_boot_boot_files:
            self._upload_boot_files(self.post_boot_boot_files, "Virtex")
            self._copied_post_boot_files = True
        self.logger.info("Virtex boot files updated successfully")

    def _wait_for_iio_device(self):
        """Poll ``iio_attr`` until the JESD IIO device is registered.

        Raises:
            StrategyError: If the device does not appear within the poll budget.
        """
        total = self._DEVICE_POLL_ATTEMPTS
        self.logger.info(
            f"Waiting for IIO JESD device ({self.iio_jesd_driver_name}) to appear..."
        )
        for attempt in range(total):
            stdout, _, _ = self.shell.run(
                f"iio_attr -d {self.iio_jesd_driver_name} jesd204_fsm_state", timeout=4
            )
            state = self._as_text(stdout)
            if "could not find device" not in state:
                self.logger.info(f"IIO JESD device found: {state.strip()}")
                return
            self.logger.info(f"Still waiting for IIO JESD device... ({attempt + 1}/{total})")
            time.sleep(1)
        raise StrategyError(
            "Virtex did not boot successfully within timeout (device not found)"
        )

    def _wait_for_jesd_data_mode(self):
        """Poll until the JESD FSM finished and all links report the data mode.

        Raises:
            StrategyError: If the FSM never reaches ``opt_post_running_stage``,
                or if it does but the links never report the data mode.
        """
        total = self._JESD_POLL_ATTEMPTS
        self.logger.info("Waiting for JESD FSM to reach post_running_stage...")
        # Tracked separately so the two failure modes get distinct messages.
        fsm_finished = False
        data_mode_ready = False
        for attempt in range(total):
            stdout, _, _ = self.shell.run(
                f"iio_attr -d {self.iio_jesd_driver_name} jesd204_fsm_state", timeout=4
            )
            state = self._as_text(stdout)
            if "opt_post_running_stage" in state:
                fsm_finished = True
                if self.check_jesd_links_data_mode():
                    data_mode_ready = True
                    self.logger.info(
                        "JESD FSM reached post_running_stage and links are in DATA mode"
                    )
                    break
                self.logger.warning(
                    "JESD FSM reached post_running_stage but links are not DATA yet (%d/%d)",
                    attempt + 1,
                    total,
                )
            elif attempt % 10 == 0:
                # Only report every tenth poll to keep the log readable.
                self.logger.info(f"JESD FSM state: {state.strip()} ({attempt + 1}/{total})")
            time.sleep(1)
        if not fsm_finished:
            raise StrategyError("Virtex JESD did not finish successfully within timeout")
        if not data_mode_ready:
            raise StrategyError("Virtex JESD links are not in DATA mode")

    def _wait_for_virtex_boot(self):
        """Wait for the Virtex side to come up, then restart IIOD."""
        self.shell.bypass_login = True
        self.target.activate(self.shell)
        # Check for device to register
        self._wait_for_iio_device()
        self._wait_for_jesd_data_mode()
        # Restart IIOD
        self.logger.info("Restarting IIOD service...")
        self.shell.run("systemctl restart iiod.service")
        self.shell.bypass_login = False
        self.target.deactivate(self.shell)
        self.logger.info("Virtex (secondary FPGA) booted successfully")

    def _soft_off(self):
        """Ask Linux to power off, then cut power regardless of the outcome."""
        try:
            self.shell.run("poweroff")
            self.shell.console.expect("Power down", timeout=30)
            self.target.deactivate(self.shell)
            time.sleep(10)
        except Exception as e:
            # Deliberately best effort: a failed graceful shutdown is only
            # logged because power is removed below in either case.
            self.logger.debug(f"DEBUG Soft off failed: {e}")
            time.sleep(5)
        self.target.deactivate(self.shell)
        self.target.activate(self.power)
        self.power.off()
        self.logger.debug("DEBUG Soft powered off")

    def _parse_jesd_link_modes(self, output: str) -> list[str]:
        """Parse JESD link mode output from iio_attr.

        Each non-empty line contributes the text after its first ":" (or,
        when the line has no ":", after its first "="); lines with neither
        separator are used whole. That text is then split on commas and
        whitespace.

        Args:
            output: Raw text printed by ``iio_attr``.

        Returns:
            All mode tokens found, in order of appearance.
        """
        modes: list[str] = []
        for raw_line in output.splitlines():
            value = raw_line.strip()
            if not value:
                continue
            for sep in (":", "="):
                if sep in value:
                    value = value.split(sep, 1)[1]
                    break
            # str.split() without arguments already drops empty tokens.
            modes.extend(value.replace(",", " ").split())
        return modes

    def check_jesd_links_data_mode(self, timeout: int = 4) -> bool:
        """Check JESD link modes and return True when all link modes are DATA.

        The comparison against ``iio_jesd_data_mode`` is case-insensitive.
        This method calls the shell driver directly and does not activate it.

        Args:
            timeout: Timeout in seconds passed to the shell command.

        Returns:
            True when at least one link mode was reported and every one equals
            ``iio_jesd_data_mode``. False when the command fails, when no
            modes are found, or when any mode differs.
        """
        stdout, _, return_code = self.shell.run(
            f"iio_attr -d {self.iio_jesd_driver_name} {self.iio_jesd_link_mode_attr}",
            timeout=timeout,
        )
        if return_code != 0:
            return False
        link_modes = self._parse_jesd_link_modes(self._as_text(stdout))
        if not link_modes:
            self.logger.warning("No JESD link modes detected for %s", self.iio_jesd_driver_name)
            return False
        self.logger.info("JESD link modes: %s", ", ".join(link_modes))
        expected = self.iio_jesd_data_mode.upper()
        return all(mode.upper() == expected for mode in link_modes)