"""Strategy to boot logic-only Xilinx FPGAs (Virtex/Artix/Kintex) with Microblaze."""
import enum
import os
import time
import attr
from labgrid.factory import target_factory
from labgrid.step import step
from labgrid.strategy import Strategy, StrategyError, never_retry
[docs]
class Status(enum.Enum):
"""Boot strategy state machine states for Virtex/Artix FPGA boot.
Attributes:
unknown: Initial state before any operations.
powered_off: FPGA is powered off.
powered_on: FPGA is powered on but not configured.
bitstream_flashed: FPGA bitstream has been flashed via JTAG.
kernel_downloaded: Linux kernel has been downloaded to Microblaze.
booting: Kernel execution has started.
booted: Kernel has booted and is ready for interaction.
shell: Interactive shell session available on Microblaze.
soft_off: Device being shut down gracefully.
"""
unknown = 0
powered_off = 1
powered_on = 2
flash_fpga = 3
booted = 4
fixup_networking = 5
shell = 6
soft_off = 7
[docs]
@target_factory.reg_driver
@attr.s(eq=False)
class BootFabric(Strategy):
"""BootFabric - Strategy to boot logic-only Xilinx FPGAs with Microblaze.
This strategy manages the boot process of logic-only FPGA devices
(Virtex, Artix, Kintex) using JTAG. It handles powering on the device,
flashing the bitstream via JTAG, downloading the Linux kernel, and
providing shell access via serial console.
Boot Sequence:
1. Power on FPGA
2. Flash bitstream via JTAG (configures FPGA fabric and Microblaze processor)
3. Download Linux kernel image via JTAG
4. Start kernel execution
5. Wait for boot completion and verify shell access
6. Fixup networking
7. Provide interactive shell access
Bindings:
power: PowerProtocol (optional) - Power control (on/off)
jtag: XilinxJTAGDriver - JTAG programming driver
shell: ADIShellDriver (optional) - Serial console access
ssh: SSHDriver (optional) - SSH access
Resources:
XilinxDeviceJTAG: JTAG target IDs and file paths
XilinxVivadoTool: Vivado installation path for xsdb
Attributes:
reached_boot_marker (str): String to expect in console when boot complete.
Default: "login:" (standard Linux login prompt).
wait_for_boot_timeout (int): Timeout in seconds to wait for boot marker.
Default: 120 seconds.
verify_iio_device (str, optional): IIO device name to verify after boot.
If specified, driver checks if device is available after booting.
Example: "axi-ad9081-rx-hpc" for AD9081 transceiver.
"""
bindings = {
"power": {"PowerProtocol", None},
"jtag": "XilinxJTAGDriver",
"shell": {"ADIShellDriver", None}, # Optional serial console
"ssh": {"SSHDriver", None}, # Optional SSH access
}
status = attr.ib(default=Status.unknown)
reached_boot_marker = attr.ib(
default="login:",
validator=attr.validators.instance_of(str),
)
wait_for_boot_timeout = attr.ib(
default=120,
validator=attr.validators.instance_of(int),
)
verify_iio_device = attr.ib(
default=None,
validator=attr.validators.optional(attr.validators.instance_of(str)),
)
def _write_uart_log(self, content: bytes) -> str:
"""Write UART output to a local log file and return its absolute path."""
uart_log_filename = os.path.abspath(f"uart_log_{int(time.time())}.txt")
with open(uart_log_filename, "wb") as f:
f.write(content)
self.uart_log_path = uart_log_filename
self.logger.info(f"Wrote log file to {uart_log_filename}")
return uart_log_filename
trigger_dhcp_reset = attr.ib(
default=False,
validator=attr.validators.instance_of(bool),
)
power_off_delay = attr.ib(
default=2,
validator=attr.validators.instance_of(int),
)
boot_log = attr.ib(default="", init=False)
debug_write_boot_log = attr.ib(default=False)
[docs]
def __attrs_post_init__(self):
"""Initialize strategy."""
super().__attrs_post_init__()
self.boot_log = ""
self.uart_log_path = ""
self.logger.info("BootFabric strategy initialized")
[docs]
@never_retry
@step()
def transition(self, status, *, step):
"""Transition the strategy to a new state.
This method manages state transitions for Virtex/Artix FPGA boot via JTAG.
It handles power control, bitstream flashing, kernel downloading, and boot
verification.
Args:
status (Status or str): Target state to transition to. Can be a Status enum
value or its string representation (e.g., "shell", "booted").
step: Labgrid step decorator context (injected automatically).
Raises:
StrategyError: If the transition is invalid or fails.
Example:
>>> strategy.transition("shell") # Boot FPGA and get shell
>>> strategy.transition("soft_off") # Power off FPGA
Note:
State transitions are sequential. Requesting a state that requires
intermediate states will automatically transition through them.
"""
if not isinstance(status, Status):
status = Status[status]
self.logger.info(f"Transitioning to {status} (Current: {self.status})")
if status == Status.unknown:
raise StrategyError(f"Cannot transition to {status}")
elif status == self.status:
step.skip("nothing to do")
return
elif status == Status.powered_off:
# Deactivate shell if active
if self.shell:
self.target.deactivate(self.shell)
# Power off FPGA
if self.power:
self.target.activate(self.power)
self.power.off()
self.logger.info("FPGA power-off command sent")
else:
self.logger.info("Skipping power off (no power resource configured)")
elif status == Status.powered_on:
self.transition(Status.powered_off)
if self.power:
self.target.activate(self.power)
self.logger.info(f"Waiting for power to stabilize ({self.power_off_delay}s)...")
time.sleep(self.power_off_delay)
self.power.on()
self.logger.info("FPGA powered on, waiting for initialization (5s)...")
time.sleep(5) # Wait for power stabilization
else:
self.logger.info("Skipping power on (no power resource configured)")
elif status == Status.flash_fpga:
self.transition(Status.powered_on)
self.logger.info(
"Initializing JTAG and flashing bitstream/kernel (this may take several minutes)..."
)
self.target.activate(self.jtag)
self.jtag.load_bitstream_and_kernel_and_start()
self.logger.info("Bitstream flashed and kernel started via JTAG successfully")
# elif status == Status.bitstream_flashed:
# self.transition(Status.powered_on)
# self.target.activate(self.jtag)
# self.jtag.flash_bitstream()
# self.logger.info("Bitstream flashed via JTAG")
# elif status == Status.kernel_downloaded:
# self.transition(Status.bitstream_flashed)
# self.jtag.download_kernel()
# self.logger.info("Kernel downloaded to Microblaze")
# elif status == Status.booting:
# self.transition(Status.kernel_downloaded)
# self.jtag.start_execution()
# self.jtag.disconnect_jtag()
# self.target.deactivate(self.jtag)
# self.logger.info("Kernel execution started")
elif status == Status.booted:
# self.transition(Status.booting)
self.transition(Status.flash_fpga)
self.boot_log = "" # Reset boot log for this boot
self.uart_log_path = ""
if self.shell:
self.logger.info(
f"Waiting for Linux boot and '{self.reached_boot_marker}' prompt..."
)
self.shell.bypass_login = True
self.target.activate(self.shell)
# Wait for Linux kernel boot
try:
_, before, _, _ = self.shell.console.expect("Linux", timeout=30)
if before:
self.boot_log += before.decode("utf-8", errors="replace")
# Wait for login prompt or marker
_, before, _, _ = self.shell.console.expect(
self.reached_boot_marker, timeout=self.wait_for_boot_timeout
)
if before:
self.boot_log += before.decode("utf-8", errors="replace")
self.shell.bypass_login = False
self.target.deactivate(self.shell)
self._write_uart_log(self.boot_log.encode("utf-8", errors="replace"))
self.logger.info("Microblaze kernel booted successfully")
except Exception as e:
if self.debug_write_boot_log:
partial = self.boot_log.encode("utf-8", errors="replace")
partial += getattr(self.shell.console._expect, "before", b"")
self._write_uart_log(partial)
raise e
else:
raise StrategyError(
"BootFabric cannot verify boot completion without a shell driver"
)
elif status == Status.fixup_networking:
self.transition(Status.booted)
if self.shell:
if self.trigger_dhcp_reset:
self.logger.info("Fixing up networking (triggering DHCP reset)...")
self.target.activate(self.shell)
self.shell.run("ifconfig eth0 down")
self.shell.run("ifconfig eth0 up")
self.shell.run("udhcpc -i eth0")
time.sleep(2)
# Update the IP address in the target configuration
addresses = self.shell.get_ip_addresses()
if addresses:
ip_address = str(addresses[0].ip)
if self.ssh:
self.ssh.networkservice.address = ip_address
self.logger.info(f"Networking fixed up, IP: {ip_address}")
else:
self.logger.warning("Could not obtain IP address")
# Shell intentionally left active so Status.shell inherits _status=1
# and skips _await_login(), avoiding a 60s timeout from post-DHCP
# UART output flooding the console.
else:
raise StrategyError("Networking fixup requested but no shell driver configured")
elif status == Status.shell:
self.transition(Status.fixup_networking)
if self.shell:
self.logger.info("Preparing interactive shell...")
self.target.activate(self.shell)
# Optional: Verify IIO device if configured
if self.verify_iio_device:
self._verify_iio_device()
self.logger.info("Shell access ready")
else:
raise StrategyError("Shell access requested but no shell driver configured")
elif status == Status.soft_off:
# Graceful shutdown if shell available
if self.shell:
try:
self.target.activate(self.shell)
self.shell.run("poweroff")
self.shell.console.expect("Power down", timeout=30)
self.target.deactivate(self.shell)
time.sleep(10)
except Exception as e:
self.logger.warning(f"Graceful shutdown failed: {e}")
self.target.deactivate(self.shell)
# Hard power off
self.transition(Status.powered_off)
if self.power:
self.logger.info("FPGA shut down")
else:
raise StrategyError(f"No transition found from {self.status} to {status}")
self.status = status
def _verify_iio_device(self):
"""Verify IIO device is available after boot.
Polls the IIO device for up to 30 seconds, checking if the device
becomes available after boot.
Raises:
StrategyError: If device is not found within timeout.
"""
self.logger.info(f"Verifying IIO device: {self.verify_iio_device}")
for _attempt in range(30):
stdout, stderr, returncode = self.shell.run(
f"iio_attr -d {self.verify_iio_device}",
timeout=5,
)
if returncode == 0 and "could not find" not in stdout:
self.logger.info(f"IIO device {self.verify_iio_device} found and ready")
return
time.sleep(1)
raise StrategyError(f"IIO device {self.verify_iio_device} not found after boot")