"""Driver to program Xilinx FPGAs via JTAG using xsdb.
Supports both local execution (test runner == exporter) and remote
execution (test runner ssh's to the exporter host to invoke xsdb).
When any sibling resource on the target is a NetworkResource (has a
`host` attribute), xsdb is run there via ssh and TCL scripts are
pushed via scp; otherwise xsdb runs locally.
"""
import os
import subprocess
import tempfile
import time
import attr
from labgrid.driver.common import Driver
from labgrid.driver.exception import ExecutionError
from labgrid.factory import target_factory
from labgrid.step import step
[docs]
@target_factory.reg_driver
@attr.s(eq=False)
class XilinxJTAGDriver(Driver):
"""Program Xilinx FPGAs via JTAG using xsdb.
Bindings:
xilinxdevicejtag: XilinxDeviceJTAG resource (JTAG target IDs + bitstream/kernel paths
as seen by the host that runs xsdb).
xilinxvivado: XilinxVivadoTool resource (vivado_path / xsdb_path).
"""
bindings = {
"xilinxdevicejtag": {"XilinxDeviceJTAG"},
"xilinxvivado": {"XilinxVivadoTool"},
}
def __attrs_post_init__(self):
super().__attrs_post_init__()
self.logger.info("XilinxJTAGDriver initialized")
self.logger.debug(f"xsdb path: {self.xilinxvivado.xsdb_path}")
def _remote_host(self):
"""Return exporter host when sibling resources come from a NetworkResource,
else None (local execution)."""
for r in getattr(self.target, "resources", ()):
host = getattr(r, "host", None)
if host:
return host
return None
def _run_xsdb(self, tcl_script: str):
"""Execute ``tcl_script`` through xsdb, locally or via ssh.
Returns (stdout, stderr, returncode) as strings.
"""
host = self._remote_host()
xsdb = self.xilinxvivado.xsdb_path
with tempfile.NamedTemporaryFile(mode="w", suffix=".tcl", delete=False) as f:
f.write(tcl_script)
local_tcl = f.name
try:
if host is None:
result = subprocess.run(
[xsdb, local_tcl], capture_output=True, text=True, timeout=300
)
return result.stdout, result.stderr, result.returncode
remote_tcl = f"/tmp/lg_xsdb_{os.getpid()}_{int(time.time() * 1000)}.tcl"
try:
subprocess.check_call(["scp", "-q", local_tcl, f"{host}:{remote_tcl}"], timeout=30)
result = subprocess.run(
["ssh", host, xsdb, remote_tcl],
capture_output=True,
text=True,
timeout=300,
)
return result.stdout, result.stderr, result.returncode
finally:
subprocess.call(["ssh", host, "rm", "-f", remote_tcl], timeout=10)
finally:
try:
os.unlink(local_tcl)
except FileNotFoundError:
pass
[docs]
@Driver.check_active
@step()
def connect_jtag(self):
"""Connect to JTAG interface."""
self.logger.info("Connecting to JTAG")
tcl_script = """
connect
after 1000
puts "JTAG connected"
"""
stdout, stderr, returncode = self._run_xsdb(tcl_script)
if returncode != 0:
raise ExecutionError(f"JTAG connection failed: {stderr}")
self.logger.debug(f"JTAG connection output: {stdout}")
[docs]
@Driver.check_active
@step()
def flash_bitstream(self):
"""Flash the FPGA bitstream via JTAG."""
if not self.xilinxdevicejtag.bitstream_path:
raise ExecutionError("Bitstream path not configured in XilinxDeviceJTAG resource")
self.logger.info(f"Flashing bitstream: {self.xilinxdevicejtag.bitstream_path}")
tcl_script = f"""
connect
after 1000
targets {self.xilinxdevicejtag.root_target}
after 1000
fpga -f {self.xilinxdevicejtag.bitstream_path}
after 2000
puts "Bitstream flashed successfully"
"""
stdout, stderr, returncode = self._run_xsdb(tcl_script)
if returncode != 0:
raise ExecutionError(f"Bitstream flash failed: {stderr}")
self.logger.info("Bitstream flashed successfully")
self.logger.debug(f"Flash output: {stdout}")
[docs]
@Driver.check_active
@step()
def download_kernel(self):
"""Download Linux kernel image to Microblaze processor."""
if not self.xilinxdevicejtag.kernel_path:
raise ExecutionError("Kernel path not configured in XilinxDeviceJTAG resource")
self.logger.info(f"Downloading kernel: {self.xilinxdevicejtag.kernel_path}")
tcl_script = f"""
connect
after 1000
targets {self.xilinxdevicejtag.microblaze_target}
after 1000
dow {self.xilinxdevicejtag.kernel_path}
after 1000
puts "Kernel downloaded successfully"
"""
stdout, stderr, returncode = self._run_xsdb(tcl_script)
if returncode != 0:
raise ExecutionError(f"Kernel download failed: {stderr}")
self.logger.info("Kernel downloaded successfully")
self.logger.debug(f"Download output: {stdout}")
[docs]
@Driver.check_active
@step()
def start_execution(self):
"""Start kernel execution on Microblaze processor."""
self.logger.info("Starting kernel execution")
tcl_script = f"""
connect
after 1000
targets {self.xilinxdevicejtag.microblaze_target}
after 1000
con
after 500
puts "Kernel execution started"
"""
stdout, stderr, returncode = self._run_xsdb(tcl_script)
if returncode != 0:
raise ExecutionError(f"Kernel execution failed: {stderr}")
self.logger.info("Kernel execution started")
self.logger.debug(f"Execution output: {stdout}")
[docs]
@Driver.check_active
@step()
def load_bitstream_and_kernel_and_start(self):
"""Load bitstream + kernel, then run the Microblaze."""
tcl_script = f"""
connect
after 1000
targets {self.xilinxdevicejtag.root_target}
after 1000
fpga -f {self.xilinxdevicejtag.bitstream_path}
after 2000
targets {self.xilinxdevicejtag.microblaze_target}
after 1000
dow {self.xilinxdevicejtag.kernel_path}
after 1000
con
after 500
puts "System started"
"""
self.logger.debug(f"System start TCL script:\n{tcl_script}")
stdout, stderr, returncode = self._run_xsdb(tcl_script)
if returncode != 0:
raise ExecutionError(f"System start failed: {stderr}")
self.logger.debug(f"System start output: {stdout}")
[docs]
@Driver.check_active
@step()
def disconnect_jtag(self):
"""Disconnect from JTAG interface."""
self.logger.info("Disconnecting from JTAG")
tcl_script = """
disconnect
puts "JTAG disconnected"
"""
stdout, stderr, returncode = self._run_xsdb(tcl_script)
if returncode != 0:
self.logger.warning(f"JTAG disconnect warning: {stderr}")
self.logger.debug(f"JTAG disconnect output: {stdout}")
[docs]
@Driver.check_active
@step()
def load_zynq_uboot(
self,
ps7_init_tcl: str,
uboot_elf: str,
a9_target_name: str = "*Cortex-A9 MPCore #0",
bitstream_path: str | None = None,
fsbl_elf: str | None = None,
) -> None:
"""JTAG-bootstrap U-Boot on a Zynq-7000 device.
The board can be in any boot state — xsdb will ``rst -system`` first
to clear residual DDR/PS state before sourcing the board-specific
``ps7_init.tcl``. Used for SD-card recovery when BootROM cannot load
FSBL from a corrupted card.
The ``a9_target_name`` filter is used instead of an integer target
index because Zynq-7000 xsdb target ordering shifts when the PL is
loaded; the name-pattern form matches Xilinx's generated wrappers
and is stable across Vivado versions.
"""
self.logger.info(f"JTAG-bootstrapping Zynq-7000 U-Boot from {uboot_elf}")
optional_lines = []
if bitstream_path:
optional_lines.append(f"fpga -f {bitstream_path}")
optional_lines.append("after 2000")
optional_lines.append(f"source {ps7_init_tcl}")
optional_lines.append("ps7_init")
optional_lines.append("ps7_post_config")
if fsbl_elf:
optional_lines.append(f"dow {fsbl_elf}")
optional_lines.append("con")
optional_lines.append("after 2000")
optional_lines.append("stop")
optional_lines.append(f"dow {uboot_elf}")
optional_lines.append("con")
optional_block = "\n ".join(optional_lines)
tcl_script = f"""
connect
after 1000
targets -set -filter {{name =~ "{a9_target_name}"}}
after 500
rst -system
after 2000
{optional_block}
puts "U-Boot started via JTAG"
"""
self.logger.debug(f"Zynq U-Boot bootstrap TCL:\n{tcl_script}")
stdout, stderr, returncode = self._run_xsdb(tcl_script)
if returncode != 0:
raise ExecutionError(f"Zynq U-Boot bootstrap failed: {stderr}")
self.logger.info("Zynq U-Boot bootstrap completed")
self.logger.debug(f"Bootstrap output: {stdout}")
[docs]
@Driver.check_active
@step()
def stop_zynq_cpu(self, a9_target_name: str = "*Cortex-A9 MPCore #0") -> None:
"""Halt the A9 #0 core — used between failed bootstrap attempts."""
self.logger.info(f"Stopping Zynq A9 CPU ({a9_target_name})")
tcl_script = f"""
connect
after 500
targets -set -filter {{name =~ "{a9_target_name}"}}
stop
puts "A9 CPU stopped"
"""
stdout, stderr, returncode = self._run_xsdb(tcl_script)
if returncode != 0:
self.logger.warning(f"Stop CPU warning: {stderr}")
self.logger.debug(f"Stop CPU output: {stdout}")