Source code for adi_lg_plugins.drivers.cyberpowerdriver

"""
Driver to control power via a CyberPower PDU using SNMP.
"""

import asyncio
import time

import attr
from labgrid.driver.common import Driver
from labgrid.driver.powerdriver import PowerResetMixin
from labgrid.factory import target_factory
from labgrid.protocol import PowerProtocol
from labgrid.step import step
from packaging.version import Version

try:
    from pysnmp import __version__ as __pysnmp_version__

    # Ensure we have a string version (not a mock object)
    if not isinstance(__pysnmp_version__, str):
        __pysnmp_version__ = "7.0.0"
except (ImportError, AttributeError):
    # During Sphinx doc building or when pysnmp is not available,
    # default to the newer API
    __pysnmp_version__ = "7.0.0"

if Version(__pysnmp_version__) < Version("7.0.0"):
    from pysnmp.hlapi import (
        CommunityData,
        ContextData,
        Integer32,
        ObjectIdentity,
        ObjectType,
        SnmpEngine,
        UdpTransportTarget,
        setCmd,
    )
else:
    from pysnmp.hlapi.v1arch.asyncio import (
        CommunityData,
        SnmpDispatcher,
        UdpTransportTarget,
        set_cmd,
    )
    from pysnmp.proto.api.v2c import Integer32
    from pysnmp.smi.rfc1902 import ObjectIdentity, ObjectType


class CyberPowerPduException(Exception):
    pass


[docs] class CyberPowerPdu: """ Class to query & control a CyberPower PDU via SNMP. Tested on the PDU15SWHVIEC8FNET. I don't understand SNMP well enough to have any idea if this would be expected to work on other models. This class is basically just a piece of copy-pasted pysnmp code and a depository for comments. :param host: IP address or hostname of the PDU on the network :type host: str """ outlet_state_oids = { "immediateOn": 1, "immediateOff": 2, "immediateReboot": 3, "delayedOn": 4, "delayedOff": 5, "delayedReboot": 6, "cancelPendingCommand": 7, "outletIdentify": 8, }
[docs] def __init__(self, host): self.host = host
[docs] async def async_set_outlet_on(self, outlet, on): """ Set an outlet on or off (async version for pysnmp >= 7.0.0) :param outlet: Which outlet to set the power for (for my model this is in the range 1 through 8) :param on: INVALID ATM True means turn it on, False means turn it off """ oid = ObjectIdentity(f"1.3.6.1.4.1.3808.1.1.3.3.3.1.1.4.{outlet}") if isinstance(on, bool): target_state = "immediateOn" if on else "immediateOff" else: target_state = on # Create transport target asynchronously ut = await UdpTransportTarget.create((self.host, 161)) # Use set_cmd and await it (v1arch API for pysnmp >= 7.0.0) errorIndication, errorStatus, errorIndex, varBinds = await set_cmd( SnmpDispatcher(), CommunityData("private"), ut, ObjectType(oid, Integer32(self.outlet_state_oids[target_state])), ) if errorIndication: raise CyberPowerPduException(errorIndication) elif errorStatus: raise CyberPowerPduException( "{} at {}".format( errorStatus.prettyPrint(), errorIndex and varBinds[int(errorIndex) - 1][0] or "?", ) )
[docs] def set_outlet_on(self, outlet, on): """ Set an outlet on or off (synchronous wrapper) :param outlet: Which outlet to set the power for (for my model this is in the range 1 through 8) :param on: INVALID ATM True means turn it on, False means turn it off """ if Version(__pysnmp_version__) >= Version("7.0.0"): # For pysnmp >= 7.0.0, use async version return asyncio.run(self.async_set_outlet_on(outlet, on)) else: # For pysnmp < 7.0.0, use synchronous version oid = ObjectIdentity(f"1.3.6.1.4.1.3808.1.1.3.3.3.1.1.4.{outlet}") if isinstance(on, bool): target_state = "immediateOn" if on else "immediateOff" else: target_state = on errorIndication, errorStatus, errorIndex, varBinds = next( setCmd( SnmpEngine(), CommunityData("private"), UdpTransportTarget((self.host, 161)), ContextData(), ObjectType(oid, Integer32(self.outlet_state_oids[target_state])), ) ) if errorIndication: raise CyberPowerPduException(errorIndication) elif errorStatus: raise CyberPowerPduException( "{} at {}".format( errorStatus.prettyPrint(), errorIndex and varBinds[int(errorIndex) - 1][0] or "?", ) )
[docs] @target_factory.reg_driver @attr.s(eq=False) class CyberPowerDriver(Driver, PowerResetMixin, PowerProtocol): """CyberPowerDriver - Driver using a CyberPower PDU to control a target's power """ bindings = {"cyberpower_outlet": {"CyberPowerOutlet"}} def __attrs_post_init__(self): super().__attrs_post_init__() self.pdu_dev = CyberPowerPdu(self.cyberpower_outlet.address) self.outlet = self.cyberpower_outlet.outlet
[docs] @Driver.check_active @step() def on(self): """Turn on the configured CyberPower PDU outlet. Uses SNMP to send an 'immediateOn' command to the outlet specified in the CyberPowerOutlet resource configuration. Raises: CyberPowerPduException: If SNMP communication fails. """ self.pdu_dev.set_outlet_on(self.outlet, True) self.logger.debug(f"Powered ON via CyberPower outlet {self.outlet}")
[docs] @Driver.check_active @step() def off(self): """Turn off the configured CyberPower PDU outlet. Uses SNMP to send an 'immediateOff' command to the outlet specified in the CyberPowerOutlet resource configuration. Raises: CyberPowerPduException: If SNMP communication fails. """ self.pdu_dev.set_outlet_on(self.outlet, False) self.logger.debug(f"Powered OFF via CyberPower outlet {self.outlet}")
[docs] @Driver.check_active @step() def reset(self): """Perform a power reset cycle on the outlet. This method turns off the outlet, waits for the configured delay period, then turns it back on. Useful for hard-resetting hardware. The delay duration is configured in the CyberPowerOutlet resource. Raises: CyberPowerPduException: If SNMP communication fails. """ self.off() self.logger.debug("Waiting %.1f seconds before powering ON", self.cyberpower_outlet.delay) time.sleep(self.cyberpower_outlet.delay) self.on()
[docs] @Driver.check_active @step() def cycle(self): """Power cycle the outlet (same as reset). Alias for reset(). Turns off the outlet, waits for the configured delay, then turns it back on. Raises: CyberPowerPduException: If SNMP communication fails. """ self.off() time.sleep(self.cyberpower_outlet.delay) self.on()
# @Driver.check_active # @step() # def get(self): # return all(outlet.is_on for outlet in self.outlets)