feathersdk.comms.comms_manager Module
- exception feathersdk.comms.comms_manager.CanNotEnabledError(*args: Any, **kwargs: Any)
Bases: Exception
Error raised when a CAN interface is not enabled.
- exception feathersdk.comms.comms_manager.CanOverloadError
Bases: Exception
Error raised when the CAN bus is overloaded.
- class feathersdk.comms.comms_manager.CommsManager(*args, **kwargs)
Bases: object
Handles sending and receiving messages over CAN and TCP.
Provides a single point of entry for sending and receiving messages over CAN and TCP. The class behaves as a singleton: constructing it again returns the same instance.
The manager also checks for message overload on the CAN bus and will raise an error if the message rate is too high.
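The shared-instance behavior described above can be illustrated with a minimal sketch. The class `SingletonSketch` and its `endpoints` attribute are hypothetical stand-ins, and overriding `__new__` is only one common way to get this behavior; the SDK's actual mechanism is not documented here.

```python
class SingletonSketch:
    """Hypothetical stand-in; this is not the SDK's CommsManager."""

    _instance = None

    def __new__(cls, *args, **kwargs):
        # Create the object only once; later calls return the cached one.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.endpoints = []  # assumed state, for illustration
        return cls._instance


first = SingletonSketch()
second = SingletonSketch()
first.endpoints.append("can0")  # visible through `second` as well
```

Because both names refer to one object, state added through one is seen through the other, which is what makes a single point of entry possible.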
- add_callback(callback: Callable[[SocketResult], None]) → None
Add a callback to be called for each message.
- add_endpoint(endpoint: str | List[str], enable_cans: bool = True, allow_no_enable_can: bool = False, skip_duplicates: bool = False) → None
Add an endpoint to the comms manager.
- Parameters:
endpoint – The endpoint to add. Can be a string or a list of strings.
enable_cans – If True, will attempt to enable any CAN interface that is not already enabled.
allow_no_enable_can – If True, will not raise an error if any of the CAN interfaces are not enabled.
skip_duplicates – If True, will not raise an error if the endpoint is already added.
- cansend(interface: str, extended: bool, can_id: int, data: bytes, motor_uid: str | None = None) → None
Send a CAN message.
- Parameters:
interface – The interface to send the message on.
extended – Whether the message is an extended (29-bit) CAN message, or a standard (11-bit) CAN message.
can_id – The CAN ID of the message. Must be in the range [0-0x1FFFFFFF] for extended messages, or [0-0x7FF] for standard messages.
data – The data to send in the message. Must be 8 bytes long.
motor_uid – The unique identifier of the motor that sent the message. Only needed in single message mode.
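The documented limits on `can_id` and `data` can be checked before calling `cansend`. The following is a minimal sketch of such a check; `validate_can_frame` and the two constants are hypothetical names, and whether the SDK performs the same validation internally, or which exception it raises, is an assumption.

```python
MAX_STANDARD_ID = 0x7FF        # largest 11-bit identifier
MAX_EXTENDED_ID = 0x1FFFFFFF   # largest 29-bit identifier


def validate_can_frame(extended: bool, can_id: int, data: bytes) -> None:
    """Raise ValueError if the arguments fall outside the documented limits."""
    max_id = MAX_EXTENDED_ID if extended else MAX_STANDARD_ID
    if not 0 <= can_id <= max_id:
        raise ValueError(f"can_id 0x{can_id:X} is out of range for this frame type")
    if len(data) != 8:
        raise ValueError(f"data must be 8 bytes long, got {len(data)}")


# A standard frame at the top of the 11-bit range passes the check.
validate_can_frame(extended=False, can_id=0x7FF, data=bytes(8))
```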
- clear_endpoints() → None
Clear all endpoints from the comms manager.
- is_running() → bool
Check if the background polling thread is currently running.
- on_abort() → None
Called when the comms manager is aborted.
- on_fork() → None
Called when the process forks.
- register_single_msg_checker(iface: str, func: Callable[[SocketResult], str | None]) → None
Register a function to check for expected responses in single message mode.
- Parameters:
func – A function that takes a SocketResult and returns a string unique to the motor that sent the message, or None if the message is not expected and should be ignored.
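A checker of the shape described above might look like the sketch below. The fields of `SocketResult` are not documented in this section, so `FakeSocketResult`, its `can_id` and `data` attributes, and the ID-to-motor table are all hypothetical stand-ins chosen for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeSocketResult:
    # Hypothetical stand-in: the real SocketResult fields may differ.
    can_id: int
    data: bytes


# Hypothetical table of response CAN IDs and the motor each one belongs to.
EXPECTED_MOTORS = {0x141: "left_knee", 0x142: "right_knee"}


def check_response(result: FakeSocketResult) -> Optional[str]:
    """Return the motor's unique string, or None to ignore the message."""
    return EXPECTED_MOTORS.get(result.can_id)
```

A function like `check_response` would then be passed as `func`, together with the interface name, to `register_single_msg_checker`.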
- remove_callback(callback: Callable[[SocketResult], None]) → None
Remove a callback from being called for each message.
- remove_endpoint(endpoint: str | List[str]) → None
Remove an endpoint from the comms manager.
- Parameters:
endpoint – The endpoint to remove. Can be a string or a list of strings.
- reset() → None
Reset the comms manager to its empty initial state.
- set_is_dry(is_dry: bool) → None
Enable/disable ‘dry run’ mode.
In dry run mode, the comms manager does not send any messages and instead only shows information about them.
- set_single_message_mode(single_message_mode: bool) → None
Set the comms manager to single message mode.
In single message mode, the comms manager will only send one message at a time to any one motor.
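The one-message-at-a-time rule can be pictured as a gate keyed by motor UID. The sketch below is an assumption about how such a gate could work, not the SDK's implementation: `SingleMessageGate` and its methods are hypothetical, and the exception class is a local stand-in that borrows the name of the SDK exception.

```python
class PendingReceiveMessageError(Exception):
    """Local stand-in for the SDK exception of the same name."""


class SingleMessageGate:
    """Hypothetical gate allowing one outstanding message per motor."""

    def __init__(self):
        self._pending = set()  # motor UIDs still awaiting a response

    def before_send(self, motor_uid: str) -> None:
        # Refuse to send while an earlier message to this motor is unanswered.
        if motor_uid in self._pending:
            raise PendingReceiveMessageError(f"still awaiting response from {motor_uid}")
        self._pending.add(motor_uid)

    def on_response(self, motor_uid: str) -> None:
        # A matching response clears the motor for the next message.
        self._pending.discard(motor_uid)


gate = SingleMessageGate()
gate.before_send("motor_a")
gate.before_send("motor_b")   # a different motor is not blocked
gate.on_response("motor_a")
gate.before_send("motor_a")   # allowed again after the response
```

Tracking pending state per motor rather than per bus is what lets traffic to other motors continue while one response is outstanding.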
- tcpsend_modbus(ip: str, tid: int, uid: int, fcode: int, reg_addr: int, reg_val: int) → None
Send a Modbus TCP message.
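The parameters of `tcpsend_modbus` line up with the fields of a standard Modbus TCP frame: a transaction ID, a unit ID, a function code, and a register address and value. The sketch below packs those fields in the standard layout; `build_modbus_tcp_frame` is a hypothetical helper, and it is an assumption that the SDK builds its frames the same way.

```python
import struct


def build_modbus_tcp_frame(tid: int, uid: int, fcode: int,
                           reg_addr: int, reg_val: int) -> bytes:
    """Pack an MBAP header followed by a function code and two 16-bit fields."""
    # PDU: function code (1 byte), register address (2), register value (2).
    pdu = struct.pack(">BHH", fcode, reg_addr, reg_val)
    # MBAP header: transaction ID, protocol ID (always 0 for Modbus),
    # length of the remaining bytes (unit ID + PDU), unit ID.
    mbap = struct.pack(">HHHB", tid, 0, len(pdu) + 1, uid)
    return mbap + pdu


# Function code 0x06 is "Write Single Register" in the Modbus specification.
frame = build_modbus_tcp_frame(tid=1, uid=1, fcode=0x06,
                               reg_addr=0x0010, reg_val=0x00FF)
```

All multi-byte fields are big-endian, which is why every format string starts with `>`.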
- exception feathersdk.comms.comms_manager.CommsManagerNotRunningError(*args: Any, **kwargs: Any)
Bases: Exception
Error raised when the comms manager is not running.
- class feathersdk.comms.comms_manager.LoadAndTime(load, last_message_time)
Bases: tuple
- last_message_time
Alias for field number 1
- load
Alias for field number 0
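The "alias for field number" entries are what a named tuple produces. The sketch below rebuilds an equivalent type locally with `collections.namedtuple` to show how the names and positions correspond; the sample values, and the reading of `load` as a message count, are assumptions.

```python
import time
from collections import namedtuple

# Local equivalent of the documented type: field 0 is load, field 1 is the time.
LoadAndTime = namedtuple("LoadAndTime", ["load", "last_message_time"])

entry = LoadAndTime(load=3, last_message_time=time.monotonic())

# Fields can be read by name, by index, or by unpacking.
load, last_seen = entry
same_load = entry[0]
same_time = entry.last_message_time
```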
- exception feathersdk.comms.comms_manager.MissingMotorUIDError(*args: Any, **kwargs: Any)
Bases: Exception
Error raised when a motor UID is required but not provided.
- exception feathersdk.comms.comms_manager.PendingReceiveMessageError(*args: Any, **kwargs: Any)
Bases: Exception
Error raised in single message mode when a message is about to be sent to a motor while a response from that motor is still awaited.
- class feathersdk.comms.comms_manager.SendCanMessage(*args: Any, **kwargs: Any)
Bases: object
Data class representing a CAN message that is about to be sent. Only used for WireHawk.
- can_id: int
- data: bytes
- extended: bool
- iface: str
- motor_uid: str | None = None
- class feathersdk.comms.comms_manager.SendTCPMessage(*args: Any, **kwargs: Any)
Bases: object
Data class representing a TCP message that is about to be sent. Only used for WireHawk.
- fcode: int
- ip: str
- reg_addr: int
- reg_val: int
- tid: int
- uid: int
- exception feathersdk.comms.comms_manager.SocketCANLibError
Bases: Exception
Error raised when the socketcan library returns a non-zero error code.
- exception feathersdk.comms.comms_manager.UnknownInterfaceError(*args: Any, **kwargs: Any)
Bases: Exception
Error raised when the given name is not a valid interface, or is not being tracked by CommsManager.