.claude/skills/asyncredux-streams-timers/SKILL.md
Manage Streams and Timers with AsyncRedux. Covers creating actions to start/stop streams, storing stream subscriptions in store props, dispatching actions from stream callbacks, and proper cleanup with disposeProps().
npx skillsauth add marcglasberg/async_redux asyncredux-streams-timersInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Two fundamental rules for working with streams and timers in AsyncRedux:
Don't send streams or timers down to widgets. Don't declare, subscribe, or unsubscribe to them inside widgets.
Don't put streams or timers in the Redux store state. They produce state changes, but they are not state themselves.
Instead, store streams and timers in the store's props - a key-value container that can hold any object type.
AsyncRedux provides methods for managing props in both Store and ReduxAction:
setProp(key, value)Stores an object (timer, stream subscription, etc.) in the store's props:
setProp('myTimer', Timer.periodic(Duration(seconds: 1), callback));
setProp('priceStream', priceStream.listen(onData));
prop<T>(key)Retrieves a property from the store:
var timer = prop<Timer>('myTimer');
var subscription = prop<StreamSubscription>('priceStream');
disposeProp(key)Disposes a single property by its key. Automatically cancels/closes timers, futures, and stream subscriptions:
disposeProp('myTimer'); // Cancels the timer and removes from props
disposeProps([predicate])Disposes multiple properties. Without a predicate, disposes all Timer, Future, and Stream-related props:
// Dispose all timers, futures, stream subscriptions
disposeProps();
// Dispose only timers
disposeProps(({Object? key, Object? value}) => value is Timer);
// Dispose props with specific keys
disposeProps(({Object? key, Object? value}) => key.toString().startsWith('temp_'));
Create an action that sets up a Timer.periodic and stores it in props:
class StartPollingAction extends ReduxAction<AppState> {
@override
AppState? reduce() {
// Store the timer in props
setProp('pollingTimer', Timer.periodic(
Duration(seconds: 5),
(timer) => dispatch(FetchDataAction()),
));
return null; // No state change from this action
}
}
Create an action to dispose the timer:
class StopPollingAction extends ReduxAction<AppState> {
@override
AppState? reduce() {
disposeProp('pollingTimer');
return null;
}
}
Access the timer's tick count in callbacks:
class StartTimerAction extends ReduxAction<AppState> {
@override
AppState? reduce() {
setProp('myTimer', Timer.periodic(
Duration(seconds: 1),
(timer) => dispatch(UpdateTickAction(timer.tick)),
));
return null;
}
}
class UpdateTickAction extends ReduxAction<AppState> {
final int tick;
UpdateTickAction(this.tick);
@override
AppState? reduce() => state.copy(tickCount: tick);
}
Create an action that subscribes to a stream and stores the subscription:
class StartListeningAction extends ReduxAction<AppState> {
@override
AppState? reduce() {
final subscription = myDataStream.listen(
(data) => dispatch(DataReceivedAction(data)),
onError: (error) => dispatch(StreamErrorAction(error)),
);
setProp('dataSubscription', subscription);
return null;
}
}
class StopListeningAction extends ReduxAction<AppState> {
@override
AppState? reduce() {
disposeProp('dataSubscription');
return null;
}
}
The stream callback dispatches an action with the data, which updates the state:
class DataReceivedAction extends ReduxAction<AppState> {
final MyData data;
DataReceivedAction(this.data);
@override
AppState? reduce() => state.copy(latestData: data);
}
Use StoreConnector's onInit and onDispose callbacks:
class PriceScreen extends StatelessWidget {
@override
Widget build(BuildContext context) {
return StoreConnector<AppState, _Vm>(
vm: () => _Factory(),
onInit: _onInit,
onDispose: _onDispose,
builder: (context, vm) => PriceWidget(price: vm.price),
);
}
void _onInit(Store<AppState> store) {
store.dispatch(StartPriceStreamAction());
}
void _onDispose(Store<AppState> store) {
store.dispatch(StopPriceStreamAction());
}
}
Start after store creation, stop when app closes:
void main() {
final store = Store<AppState>(initialState: AppState.initialState());
// Start app-wide streams/timers
store.dispatch(StartGlobalPollingAction());
runApp(StoreProvider<AppState>(
store: store,
child: MyApp(),
));
}
// In your app's dispose logic
store.dispatch(StopGlobalPollingAction());
store.disposeProps(); // Clean up all remaining props
store.shutdown();
Combine start/stop in one action:
class TogglePollingAction extends ReduxAction<AppState> {
final bool start;
TogglePollingAction(this.start);
@override
AppState? reduce() {
if (start) {
setProp('polling', Timer.periodic(
Duration(seconds: 5),
(_) => dispatch(RefreshDataAction()),
));
} else {
disposeProp('polling');
}
return null;
}
}
// State
class AppState {
final double price;
final bool isStreaming;
AppState({required this.price, required this.isStreaming});
static AppState initialState() => AppState(price: 0.0, isStreaming: false);
AppState copy({double? price, bool? isStreaming}) => AppState(
price: price ?? this.price,
isStreaming: isStreaming ?? this.isStreaming,
);
}
// Start streaming prices
class StartPriceStreamAction extends ReduxAction<AppState> {
@override
AppState? reduce() {
// Don't start if already streaming
if (state.isStreaming) return null;
final subscription = priceService.priceStream.listen(
(price) => dispatch(UpdatePriceAction(price)),
onError: (e) => dispatch(PriceStreamErrorAction(e)),
);
setProp('priceSubscription', subscription);
return state.copy(isStreaming: true);
}
}
// Stop streaming prices
class StopPriceStreamAction extends ReduxAction<AppState> {
@override
AppState? reduce() {
if (!state.isStreaming) return null;
disposeProp('priceSubscription');
return state.copy(isStreaming: false);
}
}
// Handle price updates
class UpdatePriceAction extends ReduxAction<AppState> {
final double price;
UpdatePriceAction(this.price);
@override
AppState? reduce() => state.copy(price: price);
}
// Handle stream errors
class PriceStreamErrorAction extends ReduxAction<AppState> {
final Object error;
PriceStreamErrorAction(this.error);
@override
AppState? reduce() {
// Stop streaming on error
disposeProp('priceSubscription');
return state.copy(isStreaming: false);
}
}
Use ConnectorTester to test lifecycle callbacks without full widget tests:
test('starts and stops polling on screen lifecycle', () async {
var store = Store<AppState>(initialState: AppState.initialState());
var connectorTester = store.getConnectorTester(PriceScreen());
// Simulate screen entering view
connectorTester.runOnInit();
var startAction = await store.waitAnyActionTypeFinishes([StartPriceStreamAction]);
expect(store.state.isStreaming, true);
// Simulate screen leaving view
connectorTester.runOnDispose();
var stopAction = await store.waitAnyActionTypeFinishes([StopPriceStreamAction]);
expect(store.state.isStreaming, false);
});
Call disposeProps() before shutting down the store to clean up all remaining timers and stream subscriptions:
// Clean up all Timer, Future, and Stream-related props
store.disposeProps();
// Shut down the store
store.shutdown();
The disposeProps() method automatically:
Timer objectsStreamSubscription objectsStreamController and StreamSink objectsFuture objects (to prevent unhandled errors)Regular (non-disposable) props are kept unless you provide a predicate that matches them.
URLs from the documentation:
data-ai
Show loading states and handle action failures in widgets. Covers `isWaiting(ActionType)` for spinners, `isFailed(ActionType)` for error states, `exceptionFor(ActionType)` for error messages, and `clearExceptionFor()` to reset failure states.
data-ai
Use `waitCondition()` inside actions to pause execution until state meets criteria. Covers waiting for price thresholds, coordinating between actions, and implementing conditional workflows.
testing
Handle user-facing errors with UserException. Covers throwing UserException from actions, setting up UserExceptionDialog, customizing error dialogs with `onShowUserExceptionDialog`, and using UserExceptionAction for non-interrupting error display.
tools
Implement undo/redo functionality using state observers. Covers recording state history with stateObserver, creating a RecoverStateAction, implementing undo for the full state or partial state, and managing history limits.