Adapting to a Different Respiratory Sensor¶
respyra ships with support for the Vernier Go Direct Respiration Belt (GDX-RB). This belt was chosen because it is inexpensive, well-documented, and available worldwide through educational science suppliers. It provides wireless (BLE) respiratory force measurement out of the box, making it accessible to researchers and teaching labs without specialized biomedical equipment budgets.
However, some applications may require more sophisticated respiratory monitoring — for example, dual-band respiratory inductance plethysmography (RIP) or spirometry. respyra’s architecture separates the belt I/O from the experiment logic, making it straightforward to substitute a different sensor. This guide explains the interface contract and shows how to implement a custom sensor class.
The BreathBelt interface contract¶
The experiment scripts interact with the belt through these methods and properties:
start()¶
Open the device, configure sensors, and begin streaming in a background thread. Must be called on the main thread (required for BLE on Windows).
get_latest() → tuple[float, float] | None¶
Return the most recent (timestamp, force_value) sample, discarding older ones. Returns None if no new data is available. This is non-blocking and safe to call every frame.
get_all() → list[tuple[float, float]]¶
Drain and return all queued samples since the last call. Returns a list of (timestamp, force_value) tuples in chronological order. May be empty.
stop()¶
Signal the background thread to stop, join it, and close the device. Safe to call multiple times.
is_running (property)¶
True if the background thread is alive and no error has occurred.
Context manager¶
BreathBelt supports with statements for automatic cleanup:
with BreathBelt() as belt:
sample = belt.get_latest()
Thread + queue architecture¶
The current implementation uses a background thread that calls the sensor’s blocking read() in a loop and pushes (timestamp, force) tuples into a queue.Queue. The main thread (running PsychoPy’s frame loop) drains the queue via get_latest() or get_all() without blocking.
┌──────────────┐ ┌──────────────┐
│ Reader thread │ queue │ Main thread │
│ │ ───────► │ (PsychoPy) │
│ sensor.read()│ │ get_latest() │
└──────────────┘ └──────────────┘
This pattern is necessary because:
The sensor read call blocks for the full sampling period (e.g., 100 ms)
PsychoPy’s frame loop must run at the monitor refresh rate (typically 60 Hz ≈ 16.7 ms)
A blocking read inside the frame loop would cause frame drops
Implementing a custom sensor¶
To substitute a different sensor, create a class that matches the interface above. Here is a skeleton:
import queue
import threading
import time
class CustomSensor:
"""Drop-in replacement for BreathBelt using a hypothetical sensor."""
def __init__(self, port: str = "/dev/ttyUSB0", sample_rate_hz: float = 10.0):
self._port = port
self._period = 1.0 / sample_rate_hz
self._queue: queue.Queue[tuple[float, float]] = queue.Queue()
self._stop_event = threading.Event()
self._thread = None
self._started = False
def start(self) -> None:
"""Open the device and launch the reader thread."""
self._device = self._open_device(self._port)
self._stop_event.clear()
self._thread = threading.Thread(
target=self._reader_loop, daemon=True,
)
self._started = True
self._thread.start()
def get_latest(self):
"""Return the most recent (timestamp, value) or None."""
latest = None
try:
while True:
latest = self._queue.get_nowait()
except queue.Empty:
pass
return latest
def get_all(self):
"""Drain and return all queued samples."""
samples = []
try:
while True:
samples.append(self._queue.get_nowait())
except queue.Empty:
pass
return samples
def stop(self) -> None:
"""Stop the reader thread and close the device."""
if not self._started:
return
self._stop_event.set()
if self._thread and self._thread.is_alive():
self._thread.join(timeout=2.0)
self._close_device()
self._started = False
@property
def is_running(self) -> bool:
return self._started and self._thread.is_alive()
def __enter__(self):
self.start()
return self
def __exit__(self, *args):
self.stop()
# -- Internal methods (implement for your hardware) --
def _open_device(self, port):
"""Open a connection to your sensor hardware."""
raise NotImplementedError
def _close_device(self):
"""Close the hardware connection."""
raise NotImplementedError
def _read_sample(self):
"""Read one sample from the device (blocking)."""
raise NotImplementedError
def _reader_loop(self):
"""Background loop: read and enqueue samples."""
while not self._stop_event.is_set():
value = self._read_sample()
self._queue.put((time.time(), value))
Key constraints¶
Non-blocking main thread —
get_latest()andget_all()must never block.Tuple format — samples are
(timestamp, force)where timestamp istime.time()and force is in Newtons (or your chosen unit — update config accordingly).Main-thread start for BLE — if your sensor uses BLE on Windows,
start()must run on the main thread before importing PsychoPy.Clean shutdown —
stop()must reliably terminate the background thread and release hardware resources.