From 4a6622de4963f9a1c29bc9f33f67b20394c21d7c Mon Sep 17 00:00:00 2001 From: Thibault Deckers Date: Thu, 17 Dec 2020 20:52:05 +0900 Subject: [PATCH] concurrent service tasks --- lib/services/service_policy.dart | 22 +++++++++++++++------- lib/widgets/debug/overlay.dart | 7 ++++++- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/lib/services/service_policy.dart b/lib/services/service_policy.dart index 6794a75e5..c4f8fa043 100644 --- a/lib/services/service_policy.dart +++ b/lib/services/service_policy.dart @@ -10,7 +10,10 @@ class ServicePolicy { final StreamController _queueStreamController = StreamController.broadcast(); final Map> _paused = {}; final SplayTreeMap> _queues = SplayTreeMap(); - _Task _running; + final Queue<_Task> _runningQueue = Queue(); + + // magic number + static const concurrentTaskMax = 4; Stream get queueStream => _queueStreamController.stream; @@ -23,6 +26,7 @@ class ServicePolicy { Object key, }) { _Task task; + key ??= platformCall.hashCode; final priorityTask = _paused.remove(key); if (priorityTask != null) { debugPrint('resume task with key=$key'); @@ -39,7 +43,7 @@ class ServicePolicy { completer.completeError(error, stackTrace); } if (debugLabel != null) debugPrint('$runtimeType $debugLabel completed'); - _running = null; + _runningQueue.removeWhere((task) => task.key == key); _pickNext(); }, completer, @@ -64,10 +68,13 @@ class ServicePolicy { void _pickNext() { _notifyQueueState(); - if (_running != null) return; + if (_runningQueue.length >= concurrentTaskMax) return; final queue = _queues.entries.firstWhere((kv) => kv.value.isNotEmpty, orElse: () => null)?.value; - _running = queue?.removeFirst(); - _running?.callback?.call(); + final task = queue?.removeFirst(); + if (task != null) { + _runningQueue.addLast(task); + task.callback(); + } } bool _takeOut(Object key, Iterable priorities, void Function(int priority, _Task task) action) { @@ -99,7 +106,7 @@ class ServicePolicy { if (!_queueStreamController.hasListener) return; final queueByPriority = Map.fromEntries(_queues.entries.map((kv) => MapEntry(kv.key, kv.value.length))); - _queueStreamController.add(QueueState(queueByPriority)); + _queueStreamController.add(QueueState(queueByPriority, _runningQueue.length)); } } @@ -124,6 +131,7 @@ class ServiceCallPriority { class QueueState { final Map queueByPriority; + final int runningQueue; - const QueueState(this.queueByPriority); + const QueueState(this.queueByPriority, this.runningQueue); } diff --git a/lib/widgets/debug/overlay.dart b/lib/widgets/debug/overlay.dart index 6d90d9038..9c0081c0e 100644 --- a/lib/widgets/debug/overlay.dart +++ b/lib/widgets/debug/overlay.dart @@ -20,7 +20,12 @@ class DebugTaskQueueOverlay extends StatelessWidget { stream: servicePolicy.queueStream, builder: (context, snapshot) { if (snapshot.hasError) return SizedBox.shrink(); - final queuedEntries = (snapshot.hasData ? snapshot.data.queueByPriority.entries.toList() : []); + final queuedEntries = >[]; + if (snapshot.hasData) { + final state = snapshot.data; + queuedEntries.add(MapEntry('run', state.runningQueue)); + queuedEntries.addAll(state.queueByPriority.entries.map((kv) => MapEntry(kv.key.toString(), kv.value))); + } queuedEntries.sort((a, b) => a.key.compareTo(b.key)); return Column( mainAxisSize: MainAxisSize.min,