from __future__ import annotations
from enum import auto, Enum
from typing import TYPE_CHECKING
from qtpy import QtCore
from qtpy.QtCore import QCoreApplication
import numpy as np
import time
import logging
if TYPE_CHECKING:
from QPolargraph.hardware.Polargraph import Polargraph
logger = logging.getLogger(__name__)
class ScanState(Enum):
'''State of the scan pattern motion controller.
IDLE : No motion in progress.
MOVING : Moving without collecting data (positioning or homing).
SCANNING : Moving and collecting data.
PAUSED : Motion suspended; trajectory is saved for resumption.
'''
IDLE = auto()
MOVING = auto()
SCANNING = auto()
PAUSED = auto()
class _MoveResult(Enum):
COMPLETE = auto()
PAUSED = auto()
ABANDONED = auto()
# TODO: Should the role of QScanPattern be played by a subclass
# of Polargraph? The subclass would be a Polargraph with a built-in
# scan pattern. This might simplify application design
# by eliminating the need to connect signals and slots between
# the scan pattern and the hardware interface.
[docs]
class QScanPattern(QtCore.QObject):
'''Base class for polargraph scan-trajectory patterns.
Manages the scan geometry and drives the polargraph through a
sequence of waypoints. Subclasses override :meth:`vertices` and
:meth:`trajectory` to define different scan patterns.
The motion controller is always in one of four states
(:class:`ScanState`):
* **IDLE** — no motion.
* **MOVING** — positioning to the scan start or returning home;
data are not collected.
* **SCANNING** — actively scanning; :meth:`dataReady` is emitted
at each position poll.
* **PAUSED** — motion suspended; the remaining trajectory is saved
and the polargraph holds its current position.
Properties
----------
width : float
Horizontal extent of the scan area [m]. Default: 0.6.
height : float
Vertical extent of the scan area [m]. Default: 0.6.
dx : float
Horizontal offset of the scan area center from the polargraph
centerline [m]. Default: 0.
dy : float
Vertical offset of the scan area top edge below the home
position [m]. Default: 0.1.
step : float
Spacing between scan lines [mm]. Default: 5.
Signals
-------
dataReady(numpy.ndarray)
Emitted with ``(x, y, t)`` — Cartesian position [m] and a
:func:`time.monotonic` timestamp [s] — at every position poll
during motion (MOVING and SCANNING states). ``t`` allows
post-processing correlation with independently sampled
instruments. Connect to belt animation and, gated on
:meth:`scanning`, to instrument data collection. (Callers that
only need ``x, y`` may ignore ``data[2]``.)
stateChanged(ScanState)
Emitted on every state-machine transition. Subsumes the former
``moveFinished`` and ``scanFinished`` signals.
closeRequested()
Emitted when the state returns to IDLE after
:meth:`interruptAndClose` was called. Connect to the
application window's ``close()`` slot to shut down cleanly
after all motion stops.
'''
dataReady = QtCore.Signal(np.ndarray)
stateChanged = QtCore.Signal(object)
closeRequested = QtCore.Signal()
def __init__(self, *args,
width: float = 0.6,
height: float = 0.6,
dx: float = 0.,
dy: float = 0.1,
step: float = 5,
polargraph: Polargraph,
**kwargs):
super().__init__(**kwargs)
self._width = width
self._height = height
self._dx = dx
self._dy = dy
self._step = step
self.polargraph = polargraph
self._state = ScanState.IDLE
self._paused = False
self._abandon = False
self._closing = False
self._paused_vertices = None
self._pre_pause_state = None
self._continuation = None
@property
def width(self) -> float:
'''Horizontal extent of the scan area [m].'''
return self._width
@width.setter
def width(self, value: float) -> None:
self._width = float(value)
@property
def height(self) -> float:
'''Vertical extent of the scan area [m].'''
return self._height
@height.setter
def height(self, value: float) -> None:
self._height = float(value)
@property
def dx(self) -> float:
'''Horizontal offset of scan center from polargraph centerline [m].'''
return self._dx
@dx.setter
def dx(self, value: float) -> None:
self._dx = float(value)
@property
def dy(self) -> float:
'''Vertical offset of scan top edge below home position [m].'''
return self._dy
@dy.setter
def dy(self, value: float) -> None:
self._dy = float(value)
@property
def step(self) -> float:
'''Spacing between scan lines [mm].'''
return self._step
@step.setter
def step(self, value: float) -> None:
self._step = float(value)
[docs]
def isOpen(self) -> bool:
'''Return ``True`` — scan patterns are always available.'''
return True
@property
def rect(self) -> list:
'''Bounding rectangle ``[x1, y1, x2, y2]`` of the scan area [m].'''
x1 = self.dx - self.width / 2.
y1 = self.polargraph.y0 + self.dy
x2 = x1 + self.width
y2 = y1 + self.height
return [x1, y1, x2, y2]
[docs]
def vertices(self) -> np.ndarray:
'''Vertices of the scan trajectory.
Returns
-------
numpy.ndarray
``(nvertices, 2)`` array of ``(x, y)`` waypoints [m].
'''
x1, y1, x2, y2 = self.rect
return np.array([[x1, y1], [x2, y1], [x2, y2], [x1, y2], [x1, y1]])
[docs]
def trajectory(self) -> np.ndarray:
'''Coordinates along the scan path for display.
Returns
-------
numpy.ndarray
``(2, npts)`` array of ``(x, y)`` coordinates [m].
'''
return self.vertices().T
[docs]
def scanning(self) -> bool:
'''Return ``True`` if the scanner is actively collecting data.'''
return self._state == ScanState.SCANNING
[docs]
def moving(self) -> bool:
'''Return ``True`` if the scanner is in motion (MOVING or SCANNING).'''
return self._state in (ScanState.MOVING, ScanState.SCANNING)
[docs]
def active(self) -> bool:
'''Return ``True`` if the scanner is in any non-IDLE state.'''
return self._state != ScanState.IDLE
def _setState(self, state: ScanState) -> None:
if state != self._state:
self._state = state
self.stateChanged.emit(state)
def _setIdle(self) -> None:
self._setState(ScanState.IDLE)
if self._closing:
self._closing = False
self.closeRequested.emit()
def _resetTrajectory(self) -> None:
self._paused = False
self._abandon = False
self._paused_vertices = None
self._pre_pause_state = None
self._continuation = None
def _moveTo(self, vertices) -> _MoveResult:
'''Move through a sequence of waypoints.
Parameters
----------
vertices : list of array-like
Sequence of ``(x, y)`` target positions [m].
Returns
-------
_MoveResult
``COMPLETE`` or ``PAUSED``.
'''
for i, vertex in enumerate(vertices):
self.polargraph.moveTo(*vertex)
while True:
QCoreApplication.processEvents()
if self._abandon:
self.polargraph.stop()
self.polargraph.release()
self._abandon = False
return _MoveResult.ABANDONED
if self._paused:
self.polargraph.stop()
self._paused_vertices = [vertices[j]
for j in range(i, len(vertices))]
return _MoveResult.PAUSED
x, y, moving = self.polargraph.position
t = time.monotonic()
self._onMeasure(t, x, y)
self.dataReady.emit(np.array([x, y, t]))
if not moving:
break
self.polargraph.release()
return _MoveResult.COMPLETE
def _onMeasure(self, t: float, x: float, y: float) -> None:
'''Called at each position poll, before :attr:`dataReady` is emitted.
Runs in the polargraph device thread. Override in subclasses to
trigger a synchronous instrument read that should be associated
with position ``(x, y)`` at time ``t``. The default
implementation is a no-op.
Parameters
----------
t : float
Timestamp from :func:`time.monotonic` [s].
x : float
Current horizontal coordinate [m].
y : float
Current vertical coordinate [m].
'''
[docs]
@QtCore.Slot()
def home(self) -> None:
'''Move payload to the home position.'''
if self._state in (ScanState.MOVING, ScanState.SCANNING):
return
self._resetTrajectory()
self._setState(ScanState.MOVING)
self._moveTo([[0., self.polargraph.y0]])
self._setIdle()
[docs]
@QtCore.Slot()
def center(self) -> None:
'''Move payload to the center of the scan area.'''
if self._state in (ScanState.MOVING, ScanState.SCANNING):
return
self._resetTrajectory()
self._setState(ScanState.MOVING)
y = self.polargraph.y0 + self.dy + self.height / 2.
self._moveTo([[self.dx, y]])
self._setIdle()
[docs]
@QtCore.Slot()
def scan(self) -> None:
'''Execute a full scan, then return home.
State transitions during a normal scan:
``IDLE → MOVING`` (positioning to start)
``→ SCANNING`` (collecting data)
``→ MOVING`` (returning home)
``→ IDLE``
The scan may be paused at any point via :meth:`pause` and
resumed via :meth:`resume`. Calling :meth:`home`,
:meth:`center`, or :meth:`abandon` while paused discards the
saved trajectory.
'''
if self._state != ScanState.IDLE:
return
vertices = list(self.vertices())
self._setState(ScanState.MOVING)
result = self._moveTo([vertices[0]])
if result == _MoveResult.COMPLETE:
self._continueScan(vertices[1:])
elif result == _MoveResult.PAUSED:
self._pre_pause_state = ScanState.MOVING
self._continuation = lambda: self._continueScan(vertices[1:])
self._setState(ScanState.PAUSED)
else:
self._setIdle()
def _continueScan(self, remaining: list) -> None:
self._setState(ScanState.SCANNING)
result = self._moveTo(remaining)
if result == _MoveResult.COMPLETE:
self._homeAfterScan()
elif result == _MoveResult.PAUSED:
self._pre_pause_state = ScanState.SCANNING
self._continuation = self._homeAfterScan
self._setState(ScanState.PAUSED)
else:
self._setIdle()
def _homeAfterScan(self) -> None:
self._setState(ScanState.MOVING)
result = self._moveTo([[0., self.polargraph.y0]])
if result == _MoveResult.COMPLETE:
self._setIdle()
elif result == _MoveResult.PAUSED:
self._pre_pause_state = ScanState.MOVING
self._continuation = self._setIdle
self._setState(ScanState.PAUSED)
else:
self._setIdle()
[docs]
@QtCore.Slot()
def pause(self) -> None:
'''Pause motion at the end of the current move.
The polargraph stops at its current target vertex and holds
position. Call :meth:`resume` to continue or :meth:`abandon`
to discard the remaining trajectory.
'''
if self._state in (ScanState.MOVING, ScanState.SCANNING):
self._paused = True
[docs]
@QtCore.Slot()
def resume(self) -> None:
'''Resume a paused trajectory.
Re-issues the last target vertex and continues with the
remaining waypoints.
'''
if self._state != ScanState.PAUSED:
return
saved_vertices = self._paused_vertices or []
saved_state = self._pre_pause_state or ScanState.MOVING
continuation = self._continuation
self._paused = False
self._paused_vertices = None
self._pre_pause_state = None
self._continuation = None
self._setState(saved_state)
result = self._moveTo(saved_vertices)
if result == _MoveResult.COMPLETE:
if continuation:
continuation()
else:
self._setIdle()
elif result == _MoveResult.PAUSED:
self._pre_pause_state = saved_state
self._continuation = continuation
self._setState(ScanState.PAUSED)
else:
self._setIdle()
[docs]
@QtCore.Slot()
def toggle(self) -> None:
'''Start, pause, or resume based on current state.
Calling this slot from any thread is safe: it is the preferred
way for the GUI to drive the scan state machine. Connects to
the Scan/Pause/Resume button.
'''
if self._state == ScanState.IDLE:
self.scan()
elif self._state == ScanState.PAUSED:
self.resume()
else:
self.pause()
[docs]
@QtCore.Slot()
def abandon(self) -> None:
'''Abandon the current trajectory and return to IDLE.
If the scanner is moving, the motors are stopped immediately.
If paused, the saved trajectory is discarded. The polargraph
remains at its current position; call :meth:`home` to return
to the home position.
'''
if self._state == ScanState.PAUSED:
self._resetTrajectory()
self._setIdle()
elif self._state != ScanState.IDLE:
self._abandon = True
[docs]
@QtCore.Slot()
def interruptAndClose(self) -> None:
'''Abandon motion and emit :attr:`closeRequested` on reaching IDLE.
Used by the application window's ``closeEvent`` to ensure the
polargraph stops cleanly before the application exits.
'''
self._closing = True
if self._state == ScanState.PAUSED:
self._resetTrajectory()
self._setIdle()
elif self._state != ScanState.IDLE:
self._abandon = True
else:
self._closing = False
self.closeRequested.emit()