perf: improved task pause/resume

This commit is contained in:
Thibault Deckers 2021-02-02 19:54:28 +09:00
parent e4ed5ef751
commit e02593def3
4 changed files with 54 additions and 62 deletions

View file

@ -21,7 +21,7 @@ class ThumbnailProvider extends ImageProvider<ThumbnailProviderKey> {
ImageStreamCompleter load(ThumbnailProviderKey key, DecoderCallback decode) {
return MultiFrameImageStreamCompleter(
codec: _loadAsync(key, decode),
scale: key.scale,
scale: 1.0,
informationCollector: () sync* {
yield ErrorDescription('uri=${key.uri}, pageId=${key.pageId}, mimeType=${key.mimeType}, extent=${key.extent}');
},
@ -69,7 +69,7 @@ class ThumbnailProviderKey {
final int pageId, rotationDegrees;
final bool isFlipped;
final int dateModifiedSecs;
final double extent, scale;
final double extent;
const ThumbnailProviderKey({
@required this.uri,
@ -79,33 +79,27 @@ class ThumbnailProviderKey {
@required this.isFlipped,
@required this.dateModifiedSecs,
this.extent = 0,
this.scale = 1,
}) : assert(uri != null),
assert(mimeType != null),
assert(rotationDegrees != null),
assert(isFlipped != null),
assert(dateModifiedSecs != null),
assert(extent != null),
assert(scale != null);
assert(extent != null);
@override
bool operator ==(Object other) {
if (other.runtimeType != runtimeType) return false;
return other is ThumbnailProviderKey && other.uri == uri && other.mimeType == mimeType && other.pageId == pageId && other.rotationDegrees == rotationDegrees && other.isFlipped == isFlipped && other.dateModifiedSecs == dateModifiedSecs && other.extent == extent && other.scale == scale;
return other is ThumbnailProviderKey && other.uri == uri && other.pageId == pageId && other.dateModifiedSecs == dateModifiedSecs && other.extent == extent;
}
@override
int get hashCode => hashValues(
uri,
mimeType,
pageId,
rotationDegrees,
isFlipped,
dateModifiedSecs,
extent,
scale,
);
@override
String toString() => '$runtimeType#${shortHash(this)}{uri=$uri, mimeType=$mimeType, pageId=$pageId, rotationDegrees=$rotationDegrees, isFlipped=$isFlipped, dateModifiedSecs=$dateModifiedSecs, extent=$extent, scale=$scale}';
String toString() => '$runtimeType#${shortHash(this)}{uri=$uri, mimeType=$mimeType, pageId=$pageId, rotationDegrees=$rotationDegrees, isFlipped=$isFlipped, dateModifiedSecs=$dateModifiedSecs, extent=$extent}';
}

View file

@ -204,7 +204,6 @@ class ImageFileService {
}
return null;
},
// debugLabel: 'getThumbnail width=$width, height=$height entry=${entry.filenameWithoutExtension}',
priority: priority ?? (extent == 0 ? ServiceCallPriority.getFastThumbnail : ServiceCallPriority.getSizedThumbnail),
key: taskKey,
);

View file

@ -9,8 +9,8 @@ final ServicePolicy servicePolicy = ServicePolicy._private();
class ServicePolicy {
final StreamController<QueueState> _queueStreamController = StreamController<QueueState>.broadcast();
final Map<Object, Tuple2<int, _Task>> _paused = {};
final SplayTreeMap<int, Queue<_Task>> _queues = SplayTreeMap();
final Queue<_Task> _runningQueue = Queue();
final SplayTreeMap<int, LinkedHashMap<Object, _Task>> _queues = SplayTreeMap();
final LinkedHashMap<Object, _Task> _runningQueue = LinkedHashMap();
// magic number
static const concurrentTaskMax = 4;
@ -22,57 +22,59 @@ class ServicePolicy {
Future<T> call<T>(
Future<T> Function() platformCall, {
int priority = ServiceCallPriority.normal,
String debugLabel,
Object key,
}) {
Completer<T> completer;
_Task task;
key ??= platformCall.hashCode;
final priorityTask = _paused.remove(key);
if (priorityTask != null) {
debugPrint('resume task with key=$key');
priority = priorityTask.item1;
task = priorityTask.item2;
final toResume = _paused.remove(key);
if (toResume != null) {
priority = toResume.item1;
task = toResume.item2;
completer = task.completer;
} else {
completer = Completer<T>();
task = _Task(
() async {
try {
completer.complete(await platformCall());
} catch (error, stackTrace) {
completer.completeError(error, stackTrace);
}
_runningQueue.remove(key);
_pickNext();
},
completer,
);
}
var completer = task?.completer ?? Completer<T>();
task ??= _Task(
() async {
if (debugLabel != null) debugPrint('$runtimeType $debugLabel start');
try {
completer.complete(await platformCall());
} catch (error, stackTrace) {
completer.completeError(error, stackTrace);
}
if (debugLabel != null) debugPrint('$runtimeType $debugLabel completed');
_runningQueue.removeWhere((task) => task.key == key);
_pickNext();
},
completer,
key,
);
_getQueue(priority).addLast(task);
_getQueue(priority)[key] = task;
_pickNext();
return completer.future;
}
Future<T> resume<T>(Object key) {
final priorityTask = _paused.remove(key);
if (priorityTask == null) return null;
final priority = priorityTask.item1;
final task = priorityTask.item2;
_getQueue(priority).addLast(task);
_pickNext();
return task.completer.future;
final toResume = _paused.remove(key);
if (toResume != null) {
final priority = toResume.item1;
final task = toResume.item2;
_getQueue(priority)[key] = task;
_pickNext();
return task.completer.future;
} else {
return null;
}
}
Queue<_Task> _getQueue(int priority) => _queues.putIfAbsent(priority, () => Queue<_Task>());
LinkedHashMap<Object, _Task> _getQueue(int priority) => _queues.putIfAbsent(priority, () => LinkedHashMap());
void _pickNext() {
_notifyQueueState();
if (_runningQueue.length >= concurrentTaskMax) return;
final queue = _queues.entries.firstWhere((kv) => kv.value.isNotEmpty, orElse: () => null)?.value;
final task = queue?.removeFirst();
if (task != null) {
_runningQueue.addLast(task);
if (queue != null && queue.isNotEmpty) {
final key = queue.keys.first;
final task = queue.remove(key);
_runningQueue[key] = task;
task.callback();
}
}
@ -80,14 +82,11 @@ class ServicePolicy {
bool _takeOut(Object key, Iterable<int> priorities, void Function(int priority, _Task task) action) {
var out = false;
priorities.forEach((priority) {
final queue = _getQueue(priority);
final tasks = queue.where((task) => task.key == key).toList();
tasks.forEach((task) {
if (queue.remove(task)) {
out = true;
action(priority, task);
}
});
final task = _getQueue(priority).remove(key);
if (task != null) {
out = true;
action(priority, task);
}
});
return out;
}
@ -106,16 +105,15 @@ class ServicePolicy {
if (!_queueStreamController.hasListener) return;
final queueByPriority = Map.fromEntries(_queues.entries.map((kv) => MapEntry(kv.key, kv.value.length)));
_queueStreamController.add(QueueState(queueByPriority, _runningQueue.length));
_queueStreamController.add(QueueState(queueByPriority, _runningQueue.length, _paused.length));
}
}
class _Task {
final VoidCallback callback;
final Completer completer;
final Object key;
const _Task(this.callback, this.completer, this.key);
const _Task(this.callback, this.completer);
}
class CancelledException {}
@ -131,7 +129,7 @@ class ServiceCallPriority {
class QueueState {
final Map<int, int> queueByPriority;
final int runningQueue;
final int runningCount, pausedCount;
const QueueState(this.queueByPriority, this.runningQueue);
const QueueState(this.queueByPriority, this.runningCount, this.pausedCount);
}

View file

@ -24,7 +24,8 @@ class DebugTaskQueueOverlay extends StatelessWidget {
final queuedEntries = <MapEntry<dynamic, int>>[];
if (snapshot.hasData) {
final state = snapshot.data;
queuedEntries.add(MapEntry('run', state.runningQueue));
queuedEntries.add(MapEntry('run', state.runningCount));
queuedEntries.add(MapEntry('paused', state.pausedCount));
queuedEntries.addAll(state.queueByPriority.entries.map((kv) => MapEntry(kv.key.toString(), kv.value)));
}
queuedEntries.sort((a, b) => a.key.compareTo(b.key));