analysis: chunked work requests

This commit is contained in:
Thibault Deckers 2023-05-14 21:52:57 +02:00
parent 00346ebb31
commit 123e7c67cb
5 changed files with 45 additions and 13 deletions

View file

@ -63,6 +63,8 @@ class AnalysisWorker(context: Context, parameters: WorkerParameters) : Coroutine
"start", hashMapOf(
"entryIds" to inputData.getIntArray(KEY_ENTRY_IDS)?.toList(),
"force" to inputData.getBoolean(KEY_FORCE, false),
"progressTotal" to inputData.getInt(KEY_PROGRESS_TOTAL, 0),
"progressOffset" to inputData.getInt(KEY_PROGRESS_OFFSET, 0),
)
)
}
@ -169,5 +171,7 @@ class AnalysisWorker(context: Context, parameters: WorkerParameters) : Coroutine
const val KEY_ENTRY_IDS = "entry_ids"
const val KEY_FORCE = "force"
const val KEY_PROGRESS_TOTAL = "progress_total"
const val KEY_PROGRESS_OFFSET = "progress_offset"
}
}

View file

@ -3,6 +3,7 @@ package deckers.thibault.aves.channel.calls
import android.content.Context
import androidx.core.app.ComponentActivity
import androidx.work.ExistingWorkPolicy
import androidx.work.OneTimeWorkRequest
import androidx.work.OneTimeWorkRequestBuilder
import androidx.work.WorkInfo
import androidx.work.WorkManager
@ -51,20 +52,35 @@ class AnalysisHandler(private val activity: ComponentActivity, private val onAna
}
// can be null or empty
val entryIds = call.argument<List<Int>>("entryIds")
val allEntryIds = call.argument<List<Int>>("entryIds")
val progressTotal = allEntryIds?.size ?: 0
var progressOffset = 0
WorkManager.getInstance(activity).enqueueUniqueWork(
// work `Data` cannot occupy more than 10240 bytes when serialized
// so we split it when we have a long list of entry IDs
val chunked = allEntryIds?.chunked(WORK_DATA_CHUNK_SIZE) ?: listOf(null)
fun buildRequest(entryIds: List<Int>?, progressOffset: Int): OneTimeWorkRequest {
val workData = workDataOf(
AnalysisWorker.KEY_ENTRY_IDS to entryIds?.toIntArray(),
AnalysisWorker.KEY_FORCE to force,
AnalysisWorker.KEY_PROGRESS_TOTAL to progressTotal,
AnalysisWorker.KEY_PROGRESS_OFFSET to progressOffset,
)
return OneTimeWorkRequestBuilder<AnalysisWorker>().apply { setInputData(workData) }.build()
}
var work = WorkManager.getInstance(activity).beginUniqueWork(
ANALYSIS_WORK_NAME,
ExistingWorkPolicy.KEEP,
OneTimeWorkRequestBuilder<AnalysisWorker>().apply {
setInputData(
workDataOf(
AnalysisWorker.KEY_ENTRY_IDS to entryIds?.toIntArray(),
AnalysisWorker.KEY_FORCE to force,
)
)
}.build()
buildRequest(chunked.first(), progressOffset),
)
chunked.drop(1).forEach { entryIds ->
progressOffset += WORK_DATA_CHUNK_SIZE
work = work.then(buildRequest(entryIds, progressOffset))
}
work.enqueue()
attachToActivity()
result.success(null)
}
@ -89,5 +105,6 @@ class AnalysisHandler(private val activity: ComponentActivity, private val onAna
companion object {
const val CHANNEL = "deckers.thibault/aves/analysis"
private const val ANALYSIS_WORK_NAME = "analysis_work"
private const val WORK_DATA_CHUNK_SIZE = 1000
}
}

View file

@ -2,6 +2,7 @@ import 'package:flutter/foundation.dart';
class AnalysisController {
final bool canStartService, force;
final int progressTotal, progressOffset;
final List<int>? entryIds;
final ValueNotifier<bool> stopSignal;
@ -9,6 +10,8 @@ class AnalysisController {
this.canStartService = true,
this.entryIds,
this.force = false,
this.progressTotal = 0,
this.progressOffset = 0,
ValueNotifier<bool>? stopSignal,
}) : stopSignal = stopSignal ?? ValueNotifier(false);

View file

@ -34,8 +34,11 @@ mixin TagMixin on SourceBase {
if (todo.isEmpty) return;
state = SourceState.cataloguing;
var progressDone = 0;
final progressTotal = todo.length;
var progressDone = controller.progressOffset;
var progressTotal = controller.progressTotal;
if (progressTotal == 0) {
progressTotal = todo.length;
}
setProgress(done: progressDone, total: progressTotal);
var stopCheckCount = 0;

View file

@ -108,15 +108,20 @@ class Analyzer {
Future<void> start(dynamic args) async {
List<int>? entryIds;
var force = false;
var progressTotal = 0, progressOffset = 0;
if (args is Map) {
entryIds = (args['entryIds'] as List?)?.cast<int>();
force = args['force'] ?? false;
progressTotal = args['progressTotal'];
progressOffset = args['progressOffset'];
}
debugPrint('$runtimeType start for ${entryIds?.length ?? 'all'} entries');
debugPrint('$runtimeType start for ${entryIds?.length ?? 'all'} entries, at $progressOffset/$progressTotal');
_controller = AnalysisController(
canStartService: false,
entryIds: entryIds,
force: force,
progressTotal: progressTotal,
progressOffset: progressOffset,
stopSignal: ValueNotifier(false),
);