"""Kuiper Downloader Driver for Labgrid."""
import hashlib
import json
import logging
import lzma
import os
import pathlib
import shutil
import time
import zipfile
import attr
import requests
from labgrid.driver.common import Driver
from labgrid.factory import target_factory
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from tqdm import tqdm
from .imageextractor import IMGFileExtractor
[docs]
class Downloader:
"""Utility class for downloading and verifying Kuiper Linux releases.
This class handles downloading release archives from ADI's servers,
verifying MD5 checksums, and extracting compressed archives. It supports
both .xz and .zip compressed formats and displays progress bars using tqdm.
The Downloader is used internally by KuiperDLDriver but can also be used
standalone via the kuiperdl CLI tool.
Example:
>>> dl = Downloader()
>>> rel = dl.releases("2023_R2_P1")
>>> dl.download(rel["link"], rel["zipname"])
>>> dl.check(rel["zipname"], rel["zipmd5"])
>>> dl.extract(rel["zipname"], rel["imgname"])
"""
[docs]
def releases(self, release="2019_R1"):
rel = {}
valid_releases = ["2018_R2", "2019_R1", "2023_R2_P1"]
if release == "2018_R2":
rel["imgname"] = "2018_R2-2019_05_23.img"
rel["xzmd5"] = "c377ca95209f0f3d6901fd38ef2b4dfd"
rel["imgmd5"] = "59c2fe68118c3b635617e36632f5db0b"
elif release == "2019_R1":
rel["imgname"] = "2019_R1-2020_02_04.img"
rel["xzmd5"] = "49c121d5e7072ab84760fed78812999f"
rel["imgmd5"] = "40aa0cd80144a205fc018f479eff5fce"
elif release == "2023_R2_P1":
# https://swdownloads.analog.com/cse/kuiper/image_2025-03-18-ADI-Kuiper-full.zip
rel["imgname"] = "image_2025-03-18-ADI-Kuiper-full"
# rel["imgname"] = "2023_R2_P1-2025_03_18.img"
rel["zipmd5"] = "6c92259dd61520d08244012f6c92d7c6"
rel["imgmd5"] = "873b4977617e40725025aa4958f3ca7e"
else:
raise Exception(f"Unknown release version {release}. Valid releases: {valid_releases}")
if "xzmd5" in rel:
rel["link"] = "http://swdownloads.analog.com/cse/" + rel["imgname"] + ".xz"
rel["xzname"] = rel["imgname"] + ".xz"
elif "zipmd5" in rel:
rel["link"] = "https://swdownloads.analog.com/cse/kuiper/" + rel["imgname"] + ".zip"
rel["zipname"] = rel["imgname"] + ".zip"
return rel
[docs]
def retry_session(
self,
retries=3,
backoff_factor=0.3,
status_forcelist=(429, 500, 502, 504),
session=None,
):
session = session or requests.Session()
retry = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=backoff_factor,
status_forcelist=status_forcelist,
)
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
[docs]
def download(self, url, fname):
resp = self.retry_session().get(url, stream=True)
if not resp.ok:
raise Exception(os.path.basename(fname) + " - File not found!")
total = int(resp.headers.get("content-length", 0))
sha256_hash = hashlib.sha256()
with (
open(fname, "wb") as file,
tqdm(
desc=fname,
total=total,
unit="iB",
unit_scale=True,
unit_divisor=1024,
) as bar,
):
for data in resp.iter_content(chunk_size=1024):
size = file.write(data)
sha256_hash.update(data)
bar.update(size)
hash = sha256_hash.hexdigest()
with open(os.path.join(os.path.dirname(fname), "hashes.txt"), "a") as h:
h.write(f"{os.path.basename(fname)},{hash}\n")
[docs]
def check(self, fname, ref, find_img=False):
print("Checking " + fname + " against reference MD5: " + ref)
hash_md5 = hashlib.md5()
if find_img and not os.path.isfile(fname):
# Search for img file in same directory
dirpath = os.path.abspath(fname)
# dirpath = os.path.dirname(fname)
for file in os.listdir(dirpath):
if file.endswith(".img"):
fname = os.path.join(dirpath, file)
print(f"Found image file {fname} for MD5 check")
break
if not os.path.isfile(fname):
raise Exception("No image file found for MD5 check")
else:
print("Using file " + fname + " for MD5 check")
tlfile = pathlib.Path(fname)
total = os.path.getsize(tlfile)
with (
open(fname, "rb") as f,
tqdm(
desc="Hashing: " + fname,
total=total,
unit="iB",
unit_scale=True,
unit_divisor=1024,
) as bar,
):
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
size = len(chunk)
bar.update(size)
h = hash_md5.hexdigest()
if h == ref:
print("MD5 Check: PASSED")
else:
print("MD5 Check: FAILED")
raise Exception("MD5 hash check failed")
return fname
def _cache_lookup(cache_path: str, release_version: str) -> str | None:
"""Return the cached .img path for a release if recorded + on disk, else None."""
cache_file = os.path.join(cache_path, "cache_info.json")
if not os.path.exists(cache_file):
return None
with open(cache_file) as f:
cache_data = json.load(f)
entry = cache_data.get(release_version)
if entry and os.path.exists(entry["image_path"]):
return entry["image_path"]
return None
def download_release_image(release_version: str, cache_path: str, *, logger=None) -> str:
"""Download + verify + extract the Kuiper full image for ``release_version``
into ``cache_path`` (idempotent: returns the cached .img if already present).
Shared by ``KuiperDLDriver.download_release`` (resource-bound) and the CI
``kuiper_xsa`` helper (no target). Returns the cached ``.img`` path."""
log = logger or logging.getLogger(__name__)
cached = _cache_lookup(cache_path, release_version)
if cached is not None:
log.info("Kuiper release %s already cached at %s", release_version, cached)
return cached
os.makedirs(cache_path, exist_ok=True)
downloader = Downloader()
rel_info = downloader.releases(release_version)
log.info("Downloading Kuiper release %s from %s", release_version, rel_info["link"])
name_archive = rel_info["xzname"] if "xzname" in rel_info else rel_info["zipname"]
md5_archive = rel_info["xzmd5"] if "xzmd5" in rel_info else rel_info["zipmd5"]
tarball_path = os.path.join(cache_path, name_archive)
downloader.download(rel_info["link"], name_archive)
downloader.check(name_archive, md5_archive)
downloader.extract(name_archive, rel_info["imgname"])
img_file = downloader.check(rel_info["imgname"], rel_info["imgmd5"], find_img=True)
img_filename = os.path.basename(img_file)
target_path = os.path.join(cache_path, img_filename)
shutil.move(img_file, target_path)
if os.path.exists(tarball_path):
os.remove(tarball_path)
if os.path.isfile(name_archive):
os.remove(name_archive)
if os.path.isdir(rel_info["imgname"]):
os.rmdir(rel_info["imgname"])
cache_file = os.path.join(cache_path, "cache_info.json")
cache_data = {}
if os.path.exists(cache_file):
with open(cache_file) as f:
cache_data = json.load(f)
cache_data[release_version] = {
"image_path": target_path,
"download_time": time.ctime(),
"download_date": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
}
with open(cache_file, "w") as f:
json.dump(cache_data, f, indent=4)
log.info("Kuiper release %s cached at %s", release_version, target_path)
return target_path
[docs]
@target_factory.reg_driver
@attr.s(eq=False)
class KuiperDLDriver(Driver):
"""KuiperDLDriver - Driver to download and manage Kuiper releases and provide
files to the target device.
"""
bindings = {"kuiper_resource": {"KuiperRelease"}}
cache_datafile = "cache_info.json"
def __attrs_post_init__(self):
super().__attrs_post_init__()
self._boot_files = []
[docs]
def check_cached(self, release_version=None):
"""Check if the specified Kuiper release version is cached locally.
Args:
release_version (str): Version of the Kuiper release to check. If None, uses the version from kuiper_resource.
Returns:
bool: True if the release is cached, False otherwise.
"""
cache_path = self.kuiper_resource.cache_path
if not os.path.exists(cache_path):
os.makedirs(cache_path)
cache_file_path = os.path.join(cache_path, self.cache_datafile)
if not os.path.exists(cache_file_path):
return False
if release_version is None:
release_version = self.kuiper_resource.release_version
# Read cache file and check version
with open(cache_file_path) as f:
cache_data = json.load(f)
for release in cache_data:
if release == release_version:
# Verify that the tarball path exists
image_path = cache_data[release]["image_path"]
if os.path.exists(image_path):
return True
return False
[docs]
def download_release(self, release_version=None):
"""Download the specified Kuiper release version if not already cached."""
if release_version is None:
release_version = self.kuiper_resource.release_version
download_release_image(release_version, self.kuiper_resource.cache_path, logger=self.logger)
[docs]
def get_full_image_path(self, release_version=None):
"""Return the cached full SD image (.img) path for the configured release.
Downloads + extracts the release first if it isn't cached. Caller is
responsible for activating/deactivating the driver.
"""
if release_version is None:
release_version = self.kuiper_resource.release_version
if not self.check_cached(release_version):
self.download_release(release_version)
cache_file_path = os.path.join(self.kuiper_resource.cache_path, self.cache_datafile)
with open(cache_file_path) as f:
cache_data = json.load(f)
return cache_data[release_version]["image_path"]
[docs]
def get_boot_files_from_release(self, get_all_files=False):
if not self.check_cached():
self.download_release()
with open(os.path.join(self.kuiper_resource.cache_path, self.cache_datafile)) as f:
cache_data = json.load(f)
release_info = cache_data[self.kuiper_resource.release_version]
img = IMGFileExtractor(release_info["image_path"], logger=self.logger)
for i, part in enumerate(img.get_partitions()):
self.logger.debug(f" {i}: {part['description']} - Offset: {part['start']} bytes")
# List files in FAT partition
partitions_info = img.get_partitions()
fat_partition = None
for part in partitions_info:
if "FAT" in part["description"]:
fat_partition = part
break
if fat_partition is None:
raise Exception("No FAT partition found in Kuiper image")
fs = img.open_filesystem(fat_partition["start"])
files = img.list_files(fs, "/")
files_str = ""
for f in files:
files_str += f"{f['type']}: {f['path']} ({f['size']} bytes)\n"
if get_all_files:
return files
# Extract boot files
output_dir = os.path.join(self.kuiper_resource.cache_path, "boot_files")
if not os.path.exists(output_dir):
os.makedirs(output_dir)
def process_path(path, name):
"""Process a path that may be a release reference or file path."""
from_img = False
if path and "release:" in path:
path = path.replace("release:", "")
from_img = True
if path[0] != "/":
path = f"/{path}"
elif path and "release:" not in path:
if not os.path.isfile(path):
raise ValueError(f"Specified {name} path {path} does not exist")
return path, from_img
kernel, kernel_from_img = process_path(self.kuiper_resource.kernel_path, "kernel")
bootbin, bootbin_from_img = process_path(self.kuiper_resource.BOOTBIN_path, "BOOTBIN")
device_tree, device_tree_from_img = process_path(
self.kuiper_resource.device_tree_path, "device tree"
)
self.logger.debug(f"\nExtracting boot files to {output_dir}")
files_to_extract = [
# "/README.txt",
kernel if kernel_from_img else None,
bootbin if bootbin_from_img else None,
device_tree if device_tree_from_img else None,
]
copy_files = []
for file_path in files_to_extract:
if file_path is None:
continue
if not img.extract_file(
fs, file_path, os.path.join(output_dir, os.path.basename(file_path))
):
img.close()
raise Exception(f"Available files {files_str}\n\nFailed to extract {file_path}")
copy_files.append(os.path.join(output_dir, os.path.basename(file_path)))
img.close()
files_to_copy = [
None if kernel_from_img else kernel,
None if bootbin_from_img else bootbin,
None if device_tree_from_img else device_tree,
]
for file_path in files_to_copy:
if file_path:
target_path = os.path.join(output_dir, os.path.basename(file_path))
assert os.path.isfile(file_path), f"File {file_path} does not exist"
shutil.copyfile(file_path, target_path)
copy_files.append(target_path)
self.logger.info("Boot files extracted successfully:")
self._boot_files = copy_files
return self._boot_files
[docs]
def add_files_to_target(self, filename):
"""Add a file to the target device.
Args:
filename (str): Path to the file to add to the target.
"""
if not os.path.isfile(filename):
raise ValueError(f"File {filename} does not exist")
self._boot_files.append(filename)