From 9d22cc37b8fb317f0834976f0eed43942f58a6a7 Mon Sep 17 00:00:00 2001 From: Alexander Capehart Date: Mon, 20 Jan 2025 11:39:47 -0700 Subject: [PATCH] musikr: report invalid songs in pipeline Avoids the bar getting "stuck" --- .../oxycblt/musikr/pipeline/EvaluateStep.kt | 7 ++-- .../oxycblt/musikr/pipeline/ExtractStep.kt | 40 ++++++++++++++----- 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/EvaluateStep.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/EvaluateStep.kt index 2ccc992be..df4f72cb1 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/EvaluateStep.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/EvaluateStep.kt @@ -23,6 +23,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.filterIsInstance import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.merge @@ -57,10 +58,10 @@ private class EvaluateStepImpl( ) : EvaluateStep { override suspend fun evaluate(extractedMusic: Flow): MutableLibrary { val filterFlow = - extractedMusic.divert { + extractedMusic.filterIsInstance().divert { when (it) { - is ExtractedMusic.Song -> Divert.Right(it.song) - is ExtractedMusic.Playlist -> Divert.Left(it.file) + is ExtractedMusic.Valid.Song -> Divert.Right(it.song) + is ExtractedMusic.Valid.Playlist -> Divert.Left(it.file) } } val rawSongs = filterFlow.right diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt index bcf1f9fd5..858421457 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt @@ -76,7 +76,7 @@ private class ExtractStepImpl( } } val audioNodes = filterFlow.right - val playlistNodes = filterFlow.left.map { ExtractedMusic.Playlist(it) } + val playlistNodes = filterFlow.left.map { ExtractedMusic.Valid.Playlist(it) } val readDistributedFlow = audioNodes.distribute(8) val cacheResults = @@ -96,7 +96,7 @@ private class ExtractStepImpl( is CacheResult.Miss -> Divert.Right(it.file) } } - val cachedSongs = cacheFlow.left.map { ExtractedMusic.Song(it) } + val cachedSongs = cacheFlow.left.map { ExtractedMusic.Valid.Song(it) } val uncachedSongs = cacheFlow.right val fds = @@ -118,7 +118,7 @@ private class ExtractStepImpl( wrap(fileWith.file) { _ -> metadataExtractor .extract(fileWith.file, fileWith.with) - ?.let { FileWith(fileWith.file, it) } + .let { FileWith(fileWith.file, it) } .also { withContext(Dispatchers.IO) { fileWith.with.close() } } } } @@ -129,22 +129,34 @@ private class ExtractStepImpl( val extractedSongs = metadata - .mapNotNull { fileWith -> - val tags = tagParser.parse(fileWith.file, fileWith.with) - val cover = fileWith.with.cover?.let { storedCovers.write(it) } - RawSong(fileWith.file, fileWith.with.properties, tags, cover, addingMs) + .map { fileWith -> + if (fileWith.with != null) { + val tags = tagParser.parse(fileWith.file, fileWith.with) + val cover = fileWith.with.cover?.let { storedCovers.write(it) } + RawSong(fileWith.file, fileWith.with.properties, tags, cover, addingMs) + } else { + null + } } .flowOn(Dispatchers.IO) .buffer(Channel.UNLIMITED) - val writeDistributedFlow = extractedSongs.distribute(8) + val extractedFilter = + extractedSongs.divert { + if (it != null) Divert.Left(it) else Divert.Right(ExtractedMusic.Invalid) + } + + val write = extractedFilter.left + val invalid = extractedFilter.right + + val writeDistributedFlow = write.distribute(8) val writtenSongs = writeDistributedFlow.flows .map { flow -> flow .map { wrap(it, cache::write) - ExtractedMusic.Song(it) + ExtractedMusic.Valid.Song(it) } .flowOn(Dispatchers.IO) .buffer(Channel.UNLIMITED) @@ -157,8 +169,10 @@ private class ExtractStepImpl( readDistributedFlow.manager, cacheFlow.manager, cachedSongs, + extractedFilter.manager, writeDistributedFlow.manager, writtenSongs, + invalid, playlistNodes) return merged.onCompletion { cache.finalize() } @@ -176,7 +190,11 @@ internal data class RawSong( ) internal sealed interface ExtractedMusic { - data class Song(val song: RawSong) : ExtractedMusic + sealed interface Valid : ExtractedMusic { + data class Song(val song: RawSong) : Valid - data class Playlist(val file: PlaylistFile) : ExtractedMusic + data class Playlist(val file: PlaylistFile) : Valid + } + + data object Invalid : ExtractedMusic }