feathersdk.comms.comms_manager Module

exception feathersdk.comms.comms_manager.CanNotEnabledError(*args: Any, **kwargs: Any)

Bases: Exception

Error raised when a CAN interface is not enabled.

exception feathersdk.comms.comms_manager.CanOverloadError

Bases: Exception

Error raised when the CAN bus is overloaded.

class feathersdk.comms.comms_manager.CommsManager(*args, **kwargs)

Bases: object

Handles sending and receiving messages over CAN and TCP.

Provides a single point of entry for sending and receiving messages over CAN and TCP. Any new instances of this class will point to the same instance.

The manager also checks for message overload on the CAN bus and will raise an error if the message rate is too high.

You can

add_callback(callback: Callable[[SocketResult], None]) None

Add a callback to be called for each message.

add_endpoint(endpoint: str | List[str], enable_cans: bool = True, allow_no_enable_can: bool = False, skip_duplicates: bool = False) None

Add an endpoint to the comms manager.

Parameters:
  • endpoint – The endpoint to add. Can be a string or a list of strings.

  • enable_cans – If True, will attempt to enable any CAN interface that is not already enabled.

  • allow_no_enable_can – If True, will not raise an error if any of the CAN interfaces are not enabled.

  • skip_duplicates – If True, will not raise an error if the endpoint is already added.

cansend(interface: str, extended: bool, can_id: int, data: bytes, motor_uid: str | None = None) None

Send a CAN message.

Parameters:
  • interface – The interface to send the message on.

  • extended – Whether the message is an extended (29-bit) CAN message, or a standard (11-bit) CAN message.

  • can_id – The CAN ID of the message. Must be in the range [0-0x1FFFFFFF] for extended messages, or [0-0x7FF] for standard messages.

  • data – The data to send in the message. Must be 8 bytes long.

  • motor_uid – The unique identifier of the motor that sent the message. Only needed in single message mode.

clear_endpoints() None

Clear all endpoints from the comms manager.

is_running() bool

Check if the background polling thread is currently running.

on_abort() None

Called when the comms manager is aborted.

on_fork() None

Called when the process forks.

register_single_msg_checker(iface: str, func: Callable[[SocketResult], str | None]) None

Register a function to check for expected responses in single message mode.

Parameters:

func – A function that takes a SocketResult and returns a string unique to the motor that sent the message, or None if the message is not expected and should be ignored.

remove_callback(callback: Callable[[SocketResult], None]) None

Remove a callback from being called for each message.

remove_endpoint(endpoint: str | List[str]) None

Remove an endpoint from the comms manager.

Parameters:

endpoint – The endpoint to remove. Can be a string or a list of strings.

reset() None

Reset the comms manager to empty initial state.

set_is_dry(is_dry: bool) None

Enable/disable ‘dry run’ mode.

In dry run mode, the comms manager will not send any messages, and instead just show info about them

set_single_message_mode(single_message_mode: bool) None

Set the comms manager to single message mode.

In single message mode, the comms manager will only send one message at a time to any one motor.

tcpsend_modbus(ip: str, tid: int, uid: int, fcode: int, reg_addr: int, reg_val: int) None

Send a Modbus TCP message.

exception feathersdk.comms.comms_manager.CommsManagerNotRunningError(*args: Any, **kwargs: Any)

Bases: Exception

Error raised when the comms manager is not running.

class feathersdk.comms.comms_manager.LoadAndTime(load, last_message_time)

Bases: tuple

last_message_time

Alias for field number 1

load

Alias for field number 0

exception feathersdk.comms.comms_manager.MissingMotorUIDError(*args: Any, **kwargs: Any)

Bases: Exception

Error raised when a motor UID is required but not provided.

exception feathersdk.comms.comms_manager.PendingReceiveMessageError(*args: Any, **kwargs: Any)

Bases: Exception

Error raised when a message is to be sent to a motor when we are awaiting a response in single message mode.

class feathersdk.comms.comms_manager.SendCanMessage(*args: Any, **kwargs: Any)

Bases: object

Data class representing a CAN message that is about to be sent. Only used for WireHawk

can_id: int
data: bytes
extended: bool
iface: str
motor_uid: str | None = None
class feathersdk.comms.comms_manager.SendTCPMessage(*args: Any, **kwargs: Any)

Bases: object

Data class representing a TCP message that is about to be sent. Only used for WireHawk

fcode: int
ip: str
reg_addr: int
reg_val: int
tid: int
uid: int
exception feathersdk.comms.comms_manager.SocketCANLibError

Bases: Exception

Error raised when the socketcan library returns a non-zero error code.

exception feathersdk.comms.comms_manager.UnknownInterfaceError(*args: Any, **kwargs: Any)

Bases: Exception

Error raised when the interface is not an interface, or is not being tracked by CommsManager.