'''Motors — serial abstraction for two Arduino-driven stepper motors.
Hardware requirements
---------------------
Arduino
Any Arduino with a USB serial port and I2C support (Uno, Mega, etc.).
The board must be flashed with the ``acam3`` firmware bundled at
``hardware/arduino/acam3/acam3.ino``.
Adafruit Motor Shield v2
The shield (I2C address ``0x60``) must be seated on the Arduino.
It drives two stepper motors via the ``Adafruit_MotorShield`` library.
Standard 200-step-per-revolution motors are assumed.
Arduino libraries
``acam3.ino`` requires the following Arduino libraries:
- ``Adafruit Motor Shield V2 Library``
- ``AccelStepper``
:mod:`~QPolargraph.FlashFirmware` installs them automatically before
compiling. To install manually via the Arduino IDE Library Manager
or ``arduino-cli``::
arduino-cli lib install "Adafruit Motor Shield V2 Library"
arduino-cli lib install "AccelStepper"
Flashing the firmware
---------------------
Use the :mod:`~QPolargraph.FlashFirmware` module to detect an attached
Arduino and upload ``acam3.ino`` without opening the Arduino IDE::
qpolargraph-flash
Or from Python:
.. code-block:: python
from QPolargraph.FlashFirmware import FlashDialog
FlashDialog().exec()
See :mod:`QPolargraph.FlashFirmware` for integration into a
``QMainWindow`` application via a menu action.
'''
from QInstrument.lib.QSerialInstrument import QSerialInstrument
from qtpy import QtCore
import numpy as np
from parse import parse
from pathlib import Path
from time import sleep
import logging
import re
logger = logging.getLogger(__name__)
def _firmware_version() -> str:
ino = Path(__file__).parent / 'arduino' / 'acam3' / 'acam3.ino'
pattern = re.compile(r'#define\s+VERSION\s+"acam(\S+)"')
with ino.open() as f:
for line in f:
if m := pattern.match(line.strip()):
return m.group(1)
raise RuntimeError('VERSION not found in acam3.ino')
[docs]
class Motors(QSerialInstrument):
'''Abstraction of a pair of stepper motors controlled by an Arduino.
Communicates with an Arduino running the acam3 firmware over USB
serial. The Arduino drives two stepper motors via an Adafruit Motor
Shield.
Properties
----------
indexes : numpy.ndarray
``(n1, n2, status)`` — step indexes of the two stepper motors
and a running flag (1 while moving, 0 when stopped).
Setting ``(n1, n2)`` redefines the step-count origin.
motor_speed : numpy.ndarray
``(v1, v2)`` — maximum stepper motor speed [steps/s].
acceleration : numpy.ndarray
``(a1, a2)`` — acceleration [steps/s²].
Methods
-------
goto(n1, n2)
Move to target step counts ``(n1, n2)``.
home()
Equivalent to ``goto(0, 0)``.
stop()
Halt motion immediately.
release()
Stop motors and de-energise the windings.
running()
Return ``True`` if the motors are currently moving.
'''
FIRMWARE_VERSION = _firmware_version()
comm = dict(baudRate=QSerialInstrument.BaudRate.Baud115200,
dataBits=QSerialInstrument.DataBits.Data8,
stopBits=QSerialInstrument.StopBits.OneStop,
parity=QSerialInstrument.Parity.NoParity,
flowControl=QSerialInstrument.FlowControl.NoFlowControl,
eol='\n')
def __init__(self, portName: str | None = None, **kwargs):
super().__init__(portName, **(self.comm | kwargs))
# Matches acam3.ino setup(): stepper1/2.setAcceleration(1000.0)
self._acceleration = np.array([1000., 1000.])
def _registerProperties(self) -> None:
super()._registerProperties()
def _registerMethods(self) -> None:
super()._registerMethods()
self.registerMethod('home', self.home)
self.registerMethod('stop', self.stop)
self.registerMethod('release', self.release)
[docs]
def identify(self) -> bool:
'''Return ``True`` if
(1) the port responds with the correct acam3 version string, and
(2) the Adafruit Motor Shield is detected.
Waits 2 s after opening for the Arduino to reset, then sends ``Q``
and checks that the response is ``acam{FIRMWARE_VERSION}:OK``.
A response of ``acam{FIRMWARE_VERSION}:NOSHIELD`` indicates that the
Adafruit Motor Shield was not detected at I2C address ``0x60``.
'''
logger.info(f' Trying {self._interface.portName()}...')
sleep(2)
res = self.handshake('Q')
logger.debug(f' Received: {res}')
if 'acam' not in res:
return False
result = parse('acam{:>}:{:>}', res)
if result is None:
return False
fw_version, shield_status = result
if fw_version != self.FIRMWARE_VERSION:
logger.error(f' Arduino is running acam3 version {fw_version}')
logger.error(f' Install version {self.FIRMWARE_VERSION}')
return False
if shield_status != 'OK':
logger.error(' Adafruit Motor Shield not detected'
' (I2C address 0x60)')
logger.error(' Check that the shield is fully seated'
' on the Arduino')
logger.error(' and re-flash the firmware'
' if the problem persists')
return False
logger.info(f' Arduino running acam {fw_version}, motor shield OK')
return True
[docs]
def process(self, data: str) -> None:
'''Log unsolicited serial data from the Arduino at DEBUG level.
Called by :class:`QSerialInstrument` whenever the Arduino sends
a line that is not a direct response to a command (e.g. boot
messages or status updates). Subclasses may override to act on
specific unsolicited messages.
Parameters
----------
data : str
Raw line received from the serial port.
'''
logger.debug(f' received: {data}')
[docs]
def goto(self, n1: int, n2: int) -> None:
'''Move to target step counts.
Parameters
----------
n1 : int
Target step index for motor 1.
n2 : int
Target step index for motor 2.
'''
logger.debug(f' goto {n1} {n2}')
ok = self.expect(f'G:{n1}:{n2}', 'G')
if not ok:
logger.error(f'Could not set target indexes: ({n1},{n2})')
[docs]
def home(self) -> None:
'''Move to home position (step index 0, 0).'''
self.goto(0, 0)
[docs]
def stop(self) -> None:
'''Halt motor motion immediately.'''
ok = self.expect('S', 'S')
if not ok:
logger.error('Error stopping motion')
[docs]
def release(self) -> None:
'''De-energise motor coils.'''
ok = self.expect('X', 'X')
if not ok:
logger.error('Error releasing stepper motors!')
[docs]
def running(self) -> bool:
'''Return ``True`` if the motors are currently moving.
Reads the running flag from the ``P`` (position) query, which
is the same source used by :attr:`~Polargraph.Polargraph.position`
during scan loops. The firmware's ``R`` command remains available
for direct hardware queries but is not used here.
'''
return bool(self.indexes[2])
@property
def indexes(self) -> np.ndarray:
'''Current step counts ``(n1, n2, status)`` for both motors.'''
if not self.isOpen():
return np.array([0, 0, 0])
try:
_, n1, n2, running = self.handshake('P').split(':')
indexes = [int(n1), int(n2), int(running)]
logger.debug(f'{indexes}')
except Exception as ex:
logger.warning(f'Did not read position: {ex}')
indexes = [0, 0, 0]
return np.array(indexes)
@indexes.setter
def indexes(self, n) -> None:
n1, n2 = n
self.expect(f'P:{n1}:{n2}', 'P')
@property
def motor_speed(self) -> np.ndarray:
'''Maximum motor speed ``(v1, v2)`` [steps/s].'''
try:
res = self.handshake('V')
_, v1, v2 = res.split(':') if 'V:' in res else (0, 0, 0)
except Exception as ex:
logger.warning(f'Could not read maximum speed: {ex}')
v1, v2 = 0., 0.
return np.array([float(v1), float(v2)])
@motor_speed.setter
def motor_speed(self, v) -> None:
v1, v2 = int(round(v[0])), int(round(v[1]))
ok = self.expect(f'V:{v1}:{v2}', 'V')
if not ok:
logger.warning(f'Could not set maximum speed: ({v1},{v2})')
@property
def acceleration(self) -> np.ndarray:
'''Motor acceleration ``(a1, a2)`` [steps/s²].
Returns the cached Python value last written via the setter.
The acam3 firmware has no read-back command for acceleration,
so this will not reflect any value set before the current
session. The firmware default is 1000 steps/s²; the Python
default matches.
'''
return self._acceleration
@acceleration.setter
def acceleration(self, a) -> None:
a1, a2 = a
self._acceleration = np.array([a1, a2])
res = self.handshake(f'A:{a1}:{a2}')
logger.debug(f'acceleration: {res} {a1} {a2}')
def main():
'''Dev-only smoke test: find motors (or fall back to fake) and make a move.'''
from qtpy.QtCore import QCoreApplication
import sys
print('Motor subsystem test')
app = QCoreApplication(sys.argv)
motors = Motors().find()
if not motors.isOpen():
print('No Motors found. Using FakeMotors.')
from QPolargraph.hardware.fake import FakeMotors
motors = FakeMotors()
print(f'Current position: {motors.indexes}')
motors.goto(100, 50)
if motors.running():
print('Running...')
while motors.running():
pass
print(f'Final position: {motors.indexes}')
motors.close()
app.quit()
if __name__ == '__main__':
main()