"""Cloudsmith Downloader Driver for Labgrid.
Resolves and downloads boot artifacts (e.g. ``BOOT.BIN``) from a Cloudsmith
package repository (default ``adi/sdg-boot-partition``), mirroring the
KuiperDLDriver pattern. The downloaded file is exposed to the FPGA SoC boot
strategies through the same ``get_boot_files_from_release()`` /
``_boot_files`` contract used by KuiperDLDriver, so it drops into those
strategies in place of the Kuiper downloader.
The Cloudsmith API resolution logic is ported from a standalone reference
script; the streaming download + retry session is copied from
``kuiperdldriver.py`` (kept independent on purpose).
"""
import hashlib
import json
import logging
import os
import time
import attr
import requests
from labgrid.driver.common import Driver
from labgrid.factory import target_factory
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from tqdm import tqdm
logger = logging.getLogger(__name__)
CLOUDSMITH_API = "https://api.cloudsmith.io/packages/{owner}/{repo}/"
# Maps a Cloudsmith ``system`` segment prefix to the short FPGA carrier name
# used by the place-tag schema (carrier=zcu102, etc.).
CARRIER_PREFIXES = [
("zynqmp-zcu102-rev10-", "zcu102"),
("zynqmp-adrv9009-zu11eg-revb-", "zu11eg"),
("zynqmp-adrv9009-zu11eg-", "zu11eg"),
("zynqmp-jupiter-sdr", "jupiter-sdr"),
("zynq-zc706-adv7511-", "zc706"),
("zynq-zc702-adv7511-", "zc702"),
("zynq-zed-adv7511-", "zed"),
("zynq-coraz7s-", "coraz7s"),
("zynq-adrv9361-z7035-", "adrv9361-z7035"),
("zynq-adrv9364-z7020-", "adrv9364-z7020"),
("versal-vck190-reva-", "vck190"),
("versal-vpk180-reva-", "vpk180"),
("ad9081_fmca_ebz_vpk180", "vpk180"),
("ad9082_fmca_ebz_vpk180", "vpk180"),
]
# ``system`` segments that are not real carrier builds and must be skipped.
TO_IGNORE = [
"zynq-zed-otg",
"zynq-zed-adv7511",
"pulsar_adc_zed",
"pulsar_adc_zc706",
"pulsar_adc_zc702",
"zynq-zc702-adv7511",
"zynq-zc706-adv7511",
]
[docs]
class Downloader:
"""Streaming downloader with retry + sha256 verification.
Copied from KuiperDLDriver's Downloader (kept independent). Only the
pieces needed for individual-file downloads are retained: a retrying
session and a streaming ``download`` that returns the sha256 digest.
"""
[docs]
def retry_session(
self,
retries=3,
backoff_factor=0.3,
status_forcelist=(429, 500, 502, 504),
session=None,
):
session = session or requests.Session()
retry = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=backoff_factor,
status_forcelist=status_forcelist,
)
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
[docs]
def download(self, url, fname, headers=None):
"""Stream ``url`` to ``fname`` and return the sha256 hexdigest."""
resp = self.retry_session().get(url, stream=True, headers=headers)
if not resp.ok:
raise Exception(os.path.basename(fname) + " - File not found!")
total = int(resp.headers.get("content-length", 0))
sha256_hash = hashlib.sha256()
with (
open(fname, "wb") as file,
tqdm(
desc=fname,
total=total,
unit="iB",
unit_scale=True,
unit_divisor=1024,
) as bar,
):
for data in resp.iter_content(chunk_size=1024):
size = file.write(data)
sha256_hash.update(data)
bar.update(size)
return sha256_hash.hexdigest()
def verify_sha256(fname, expected):
"""Raise if the sha256 of ``fname`` does not match ``expected``."""
h = hashlib.sha256()
with open(fname, "rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
h.update(chunk)
actual = h.hexdigest()
if expected and actual != expected:
raise Exception(
f"sha256 mismatch for {os.path.basename(fname)}: expected {expected}, got {actual}"
)
return actual
def search_cloudsmith_packages(owner, repo, query, token, page_size=100):
"""Query the Cloudsmith packages API, paging through all results."""
url = CLOUDSMITH_API.format(owner=owner, repo=repo)
headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
}
all_packages = []
page = 1
while True:
params = {"query": query, "page": page, "page_size": page_size}
response = requests.get(url, headers=headers, params=params, timeout=30)
response.raise_for_status()
packages = response.json()
print(f"Cloudsmith API page {page}: {len(packages)} packages")
if not packages:
break
all_packages.extend(packages)
logger.debug("Fetched page %d with %d packages", page, len(packages))
page += 1
if len(packages) < page_size:
break
return all_packages
[docs]
def parse_version_info(version_str, tags, repo=None):
"""Parse a Cloudsmith version string into structured metadata.
Returns ``None`` (rather than raising) for unparseable strings,
ignored systems, or unknown carriers, so one unexpected package never
crashes resolution.
"""
parts = [p for p in version_str.strip("/").split("/") if p]
if len(parts) < 2:
return None
if repo == "sdg-hdl":
# hdl/releases/hdl_2022_r2/hdl_output/2024_02_15-06_47_17/adrv9371x_kcu105/
last = parts[-1]
pparts = last.split("_")
if len(pparts) < 2:
return None
project = pparts[0]
branch = parts[2]
else:
project = parts[0]
branch = parts[1]
# kuiper_partition releases have no timestamp segment.
if repo == "sdg-hdl":
timestamp = parts[4]
remaining = ModuleNotFoundError
else:
if branch.startswith("kuiper_partition"):
remaining = parts[2:]
timestamp = None
else:
if len(parts) < 3:
return None
timestamp = parts[2]
remaining = parts[3:]
# Skip a repeated "boot_partition" segment if present.
if repo == "sdg-hdl":
fpga_carrier = pparts[1]
# system = f"{fpga_carrier}-{project}"
system = None
variant = None
carrier = "Xilinx"
else:
if remaining and remaining[0] == "boot_partition":
remaining = remaining[1:]
# An "adi-" prefixed segment is the carrier family, not the system.
carrier = None
if remaining and remaining[0].startswith("adi-"):
carrier = remaining[0]
remaining = remaining[1:]
system = remaining[0] if remaining else None
remaining = remaining[1:] if remaining else []
variant = remaining[0] if remaining else None
if system:
for ignore in TO_IGNORE:
if system.startswith(ignore):
return None
fpga_carrier = None
if system:
for prefix, carrier_name in CARRIER_PREFIXES:
if system.startswith(prefix):
fpga_carrier = carrier_name
project = system[len(prefix) :] if len(system) > len(prefix) else system
break
else:
logger.debug("Unknown carrier in version string, skipping: %s", system)
return None
hdl_sha = None
linux_sha = None
if tags:
for tag in tags:
if "hdl" in tag:
hdl_sha = tag.split("-")[-1]
if "linux" in tag:
linux_sha = tag.split("-")[-1]
return {
"project": project,
"branch": branch,
"timestamp": timestamp,
"carrier": fpga_carrier,
"carrier_family": carrier,
"system": system,
"variant": variant,
"hdl_git_sha": hdl_sha,
"linux_git_sha": linux_sha,
}
def _as_list(value):
"""Normalize a filter field to a list of terms.
``None`` -> ``[]``, a bare ``str`` -> ``[str]``, any other iterable
(list/tuple) -> ``list(value)``. Lets ``vfilter``/``vnot`` accept a
single term or several.
"""
if value is None:
return []
if isinstance(value, str):
return [value]
return list(value)
[docs]
def get_latest_bootfiles(
owner,
repo,
fpga_carrier=None,
daughter_card=None,
vfilter=None,
vnot=None,
filename=None,
token=None,
pin=None,
):
"""Resolve the matching Cloudsmith package, returning the raw package dict.
Without ``pin`` the newest matching package (by ``uploaded_at``) is
returned; with ``pin`` the package whose ``version`` equals ``pin`` is
returned. The returned dict carries ``cdn_url``, ``checksums``
``version`` and ``uploaded_at`` plus a parsed ``_info`` entry.
"""
# query = f"filename:{filename} AND version:*{fpga_carrier}*{daughter_card}*"
# query = f"filename:{filename} AND version:*{daughter_card}*"
query = f"filename:{filename}"
if fpga_carrier:
query += f" AND version:*{fpga_carrier}*"
if daughter_card:
query += f" AND version:*{daughter_card}*"
for term in _as_list(vfilter):
query += f" AND version:*{term}*"
for term in _as_list(vnot):
query += f" AND version:~{term}"
packages = search_cloudsmith_packages(owner, repo, query, token, page_size=1000)
if not packages:
return None
print(f"Found {len(packages)} candidates")
candidates = []
for package in packages:
tags = package.get("tags", {})
if isinstance(tags, dict):
tags = tags.get("info", [])
info = parse_version_info(package.get("version", ""), tags, repo)
if info is None:
print(f"Skipping unparseable or ignored package version: {package.get('version')}")
continue
if fpga_carrier:
if fpga_carrier in info["carrier"]:
candidates.append({**package, "_info": info})
else:
candidates.append({**package, "_info": info})
if not candidates:
print("No candiates found anymore")
return None
if pin is not None:
for package in candidates:
if package.get("version") == pin:
return package
raise Exception(
f"Pinned version {pin!r} not found for carrier {fpga_carrier!r} / {daughter_card!r}"
)
candidates.sort(key=lambda p: p.get("uploaded_at", ""), reverse=True)
return candidates[0]
[docs]
@target_factory.reg_driver
@attr.s(eq=False)
class CloudsmithDLDriver(Driver):
"""Driver to resolve and download Cloudsmith boot artifacts.
Exposes the same ``get_boot_files_from_release()`` / ``_boot_files``
contract as KuiperDLDriver so it drops into the FPGA SoC boot strategies.
"""
bindings = {"cloudsmith_resource": {"CloudsmithRelease"}}
cache_datafile = "cache_info.json"
def __attrs_post_init__(self):
super().__attrs_post_init__()
self._boot_files = []
def _cache_path(self):
return os.path.expanduser(self.cloudsmith_resource.cache_path)
def _resolve(self):
"""Resolve the matching Cloudsmith package from the resource config."""
res = self.cloudsmith_resource
token = res.api_token
if not token:
raise Exception("No Cloudsmith API token (set CLOUDSMITH_API_TOKEN or api_token).")
package = get_latest_bootfiles(
res.owner,
res.repo,
res.fpga_carrier,
res.daughter_card,
res.vfilter,
res.vnot,
res.filename,
token,
pin=res.version,
)
if package is None:
raise Exception(
f"No {res.filename} found for carrier {res.fpga_carrier!r} / "
f"daughter card {res.daughter_card!r}"
)
return package
[docs]
def check_cached(self, version):
"""Return the cached boot-file path for ``version`` if present, else None."""
cache_path = self._cache_path()
cache_file_path = os.path.join(cache_path, self.cache_datafile)
if not os.path.exists(cache_file_path):
return None
with open(cache_file_path) as f:
cache_data = json.load(f)
entry = cache_data.get(version)
if entry and os.path.exists(entry["boot_file_path"]):
return entry["boot_file_path"]
return None
[docs]
def download_release(self, version=None):
"""Resolve, download, and cache the boot artifact; return its local path."""
res = self.cloudsmith_resource
package = self._resolve()
version = package.get("version")
cached = self.check_cached(version)
if cached:
self.logger.info(f"Cloudsmith artifact {version} is already cached.")
res.boot_file_path = cached
return cached
cache_path = self._cache_path()
# Use a filesystem-safe subdirectory derived from the version string.
safe_version = version.strip("/").replace("/", "_")
dest_dir = os.path.join(cache_path, safe_version)
os.makedirs(dest_dir, exist_ok=True)
boot_file_path = os.path.join(dest_dir, res.filename)
url = package.get("cdn_url")
self.logger.info(f"Downloading {res.filename} {version} from {url}")
headers = {"Authorization": f"Bearer {res.api_token}"}
Downloader().download(url, boot_file_path, headers=headers)
expected = (package.get("checksums") or {}).get("sha256")
verify_sha256(boot_file_path, expected)
cache_file_path = os.path.join(cache_path, self.cache_datafile)
cache_data = {}
if os.path.exists(cache_file_path):
with open(cache_file_path) as f:
cache_data = json.load(f)
cache_data[version] = {
"boot_file_path": boot_file_path,
"cdn_url": url,
"sha256": expected,
"download_time": time.ctime(),
"download_date": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
}
with open(cache_file_path, "w") as f:
json.dump(cache_data, f, indent=4)
self.logger.info(f"Cloudsmith artifact {version} cached successfully.")
res.boot_file_path = boot_file_path
return boot_file_path
[docs]
def get_boot_file_path(self, version=None):
"""Ensure the artifact is downloaded and return its local path."""
return self.download_release(version=version)
[docs]
def get_boot_files_from_release(self):
"""Strategy-facing contract: populate and return ``_boot_files``."""
self._boot_files = [self.get_boot_file_path()]
return self._boot_files