concurrent service tasks

This commit is contained in:
Thibault Deckers 2020-12-17 20:52:05 +09:00
parent 431cf0652c
commit 4a6622de49
2 changed files with 21 additions and 8 deletions

View file

@ -10,7 +10,10 @@ class ServicePolicy {
final StreamController<QueueState> _queueStreamController = StreamController<QueueState>.broadcast();
final Map<Object, Tuple2<int, _Task>> _paused = {};
final SplayTreeMap<int, Queue<_Task>> _queues = SplayTreeMap();
_Task _running;
final Queue<_Task> _runningQueue = Queue();
// magic number
static const concurrentTaskMax = 4;
Stream<QueueState> get queueStream => _queueStreamController.stream;
@ -23,6 +26,7 @@ class ServicePolicy {
Object key,
}) {
_Task task;
key ??= platformCall.hashCode;
final priorityTask = _paused.remove(key);
if (priorityTask != null) {
debugPrint('resume task with key=$key');
@ -39,7 +43,7 @@ class ServicePolicy {
completer.completeError(error, stackTrace);
}
if (debugLabel != null) debugPrint('$runtimeType $debugLabel completed');
_running = null;
_runningQueue.removeWhere((task) => task.key == key);
_pickNext();
},
completer,
@ -64,10 +68,13 @@ class ServicePolicy {
void _pickNext() {
_notifyQueueState();
if (_running != null) return;
if (_runningQueue.length >= concurrentTaskMax) return;
final queue = _queues.entries.firstWhere((kv) => kv.value.isNotEmpty, orElse: () => null)?.value;
_running = queue?.removeFirst();
_running?.callback?.call();
final task = queue?.removeFirst();
if (task != null) {
_runningQueue.addLast(task);
task.callback();
}
}
bool _takeOut(Object key, Iterable<int> priorities, void Function(int priority, _Task task) action) {
@ -99,7 +106,7 @@ class ServicePolicy {
if (!_queueStreamController.hasListener) return;
final queueByPriority = Map.fromEntries(_queues.entries.map((kv) => MapEntry(kv.key, kv.value.length)));
_queueStreamController.add(QueueState(queueByPriority));
_queueStreamController.add(QueueState(queueByPriority, _runningQueue.length));
}
}
@ -124,6 +131,7 @@ class ServiceCallPriority {
class QueueState {
final Map<int, int> queueByPriority;
final int runningQueue;
const QueueState(this.queueByPriority);
const QueueState(this.queueByPriority, this.runningQueue);
}

View file

@ -20,7 +20,12 @@ class DebugTaskQueueOverlay extends StatelessWidget {
stream: servicePolicy.queueStream,
builder: (context, snapshot) {
if (snapshot.hasError) return SizedBox.shrink();
final queuedEntries = (snapshot.hasData ? snapshot.data.queueByPriority.entries.toList() : []);
final queuedEntries = <MapEntry<dynamic, int>>[];
if (snapshot.hasData) {
final state = snapshot.data;
queuedEntries.add(MapEntry('run', state.runningQueue));
queuedEntries.addAll(state.queueByPriority.entries.map((kv) => MapEntry(kv.key.toString(), kv.value)));
}
queuedEntries.sort((a, b) => a.key.compareTo(b.key));
return Column(
mainAxisSize: MainAxisSize.min,