skills/pupil-labs-neon-realtime/SKILL.md
Connect to Pupil Labs Neon eye-tracking glasses and collect real-time data streams (video, gaze, IMU, events). Use when working with Neon hardware via the Real-time API for live data visualization, data collection, or eye-tracking applications requiring device discovery, multi-threaded streaming, or hardware troubleshooting.
npx skillsauth add stanislavjiricek/neuroflow pupil-labs-neon-realtimeInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Connect to Pupil Labs Neon eye-tracking glasses and implement multi-threaded real-time data collection.
# Install dependencies
uv sync
# Run scripts
uv run python neon_video_viewer.py
uv run python neon_realtime_monitor.py
Required packages (Python 3.11+):
pupil-labs-realtime-api>=1.7.3 # Neon device communication
opencv-python>=4.12.0.88 # Video display
matplotlib>=3.10.7 # Plotting (use Agg backend)
numpy # Array operations
For manual pip install:
pip install pupil-labs-realtime-api opencv-python matplotlib numpy
Auto-discover device on local network (no manual IP needed):
from pupil_labs.realtime_api.simple import discover_one_device
device = discover_one_device(max_search_duration_seconds=10)
if device is None:
print("No device found.")
return
print(f"Connected: {device.serial_number_glasses}")
print(f"Phone: {device.phone_name} @ {device.phone_ip}")
Before running any script:
10-second timeout with no output: Check network connectivity and Companion app status.
Device found but data streams empty: Feature not enabled in Companion app (not API-configurable).
Intermittent disconnects: Use try/except TimeoutError on all receive methods.
Why threading is required: All receive_*() methods block until data arrives or timeout expires. Single-threaded code would miss data from other streams while waiting. Threading enables parallel collection from multiple streams without blocking.
Create dedicated thread per data stream with priority-based sleep strategy:
def data_collection_thread(self, data_type):
"""Separate thread per data type."""
while self.running:
try:
if data_type == 'video':
self.receive_video()
# NO sleep - maximum FPS (highest priority)
elif data_type == 'gaze':
self.receive_gaze()
time.sleep(0.01) # 10ms - yield CPU
elif data_type == 'events':
self.receive_events()
time.sleep(0.01) # 10ms
elif data_type == 'imu':
self.receive_imu()
time.sleep(0.005) # 5ms - higher frequency than gaze
except Exception as e:
if self.running:
print(f"Error in {data_type}: {e}")
time.sleep(0.001)
# Start as daemon threads (exit when main program exits)
thread = threading.Thread(target=self.data_collection_thread,
args=('video',), daemon=True)
thread.start()
Critical sleep strategy:
Always use locks when sharing data between collection and rendering threads:
self.data_lock = threading.Lock()
# Collection thread
with self.data_lock:
self.latest_frame = frame.copy()
# Rendering thread
with self.data_lock:
if self.latest_frame is not None:
display = self.latest_frame.copy()
Start video threads first with priority, allow initialization, then start data threads:
# Create threads
video_thread = threading.Thread(target=self.data_collection_thread,
args=('video',), daemon=True)
gaze_thread = threading.Thread(target=self.data_collection_thread,
args=('gaze',), daemon=True)
imu_thread = threading.Thread(target=self.data_collection_thread,
args=('imu',), daemon=True)
# Start video first (highest priority)
video_thread.start()
time.sleep(0.1) # Let video thread initialize
# Then start data collection threads
gaze_thread.start()
imu_thread.start()
Why this order matters: Video initialization can be slow; starting it first prevents other threads from overwhelming the system during startup.
frame_data = device.receive_scene_video_frame(timeout_seconds=0.0001)
if frame_data:
frame = frame_data.bgr_pixels # OpenCV-ready BGR numpy array
Returns different classes based on Companion app settings:
gaze = device.receive_gaze_datum(timeout_seconds=0.001)
# Always available
x, y = gaze.x, gaze.y # Pixel coordinates
worn = gaze.worn # Boolean
# With "Compute eye state" enabled
if hasattr(gaze, 'eyelid_aperture_left'):
# Aperture range: 0-10mm, normalize to 0-1
openness = min(1.0, gaze.eyelid_aperture_left / 10.0)
Requires "Compute fixations" enabled:
event = device.receive_eye_events(timeout_seconds=0.001)
if hasattr(event, 'event_type'):
if event.event_type == 4: # Blink
duration = (event.end_time_ns - event.start_time_ns) / 1e9
elif event.event_type == 0: # Saccade
amplitude = event.amplitude_angle_deg
elif event.event_type == 1: # Fixation
duration = (event.end_time_ns - event.start_time_ns) / 1e9
imu = device.receive_imu_datum(timeout_seconds=0.001)
# Accelerometer (linear motion) in g
accel_x = imu.accel_data.x
accel_y = imu.accel_data.y
accel_z = imu.accel_data.z
# Gyroscope (rotational motion) in rad/s
gyro_x = imu.gyro_data.x
gyro_y = imu.gyro_data.y
gyro_z = imu.gyro_data.z
# Orientation quaternion (w, x, y, z)
quaternion = imu.quaternion
Get temporally synchronized frame and gaze:
matched = device.receive_matched_scene_video_frame_and_gaze()
frame = matched.frame.bgr_pixels
gaze = matched.gaze
All receive methods accept timeout_seconds:
Always wrap in try/except:
try:
data = device.receive_gaze_datum(timeout_seconds=0.001)
except TimeoutError:
pass # No data available yet
self.running = False
time.sleep(0.5) # Let threads finish current operations
device.close()
hasattr() for optional attributes (eyelid, event details)Two complete working implementations provided:
scripts/neon_video_viewer.py - Minimal starter (100 lines)
scripts/neon_realtime_monitor.py - Full dashboard (750 lines)
Run directly or use as templates for custom applications.
testing
This skill should be used whenever the user mentions BIDS, Brain Imaging Data Structure, BIDS conversion, BIDS validation, BIDS compliance, organizing neuroimaging data, dataset_description.json, participants.tsv, bids-validator, pybids, MNE-BIDS, or asks how to structure EEG/MEG/fMRI/iEEG/PET/DWI data for sharing or preprocessing. Also invoke when the user asks how to name scan files, what sidecar JSON fields are needed, how to set up derivatives/, or how to run fMRIPrep/MRIQC on their dataset. Invoke proactively during /data, /data-preprocess, and /data-analyze phases whenever the dataset structure is relevant to the task at hand.
tools
Phase guidance for the /meeting command. Covers meeting file structure, recurring templates, attendee resolution from profiles, Google Calendar MCP integration, agenda preparation with project context, and action-item-to-task conversion at all three levels (project, flowie, hive).
data-ai
Worker-critic agentic loop protocol — orchestrator coordinates a worker agent and a critic agent across up to 3 revision cycles to produce a vetted output for any phase.
development
Knowledge base skill — Karpathy-style LLM-maintained wiki at three levels (personal/flowie, project, team/hive). Handles ingest, query, lint, schema, and project-tagging workflows. Invoked by /flowie --wiki-* (personal), /wiki (project), /hive --wiki-* (team).