musikr: report invalid songs in pipeline

Avoids the bar getting "stuck"
This commit is contained in:
Alexander Capehart 2025-01-20 11:39:47 -07:00
parent d49286981c
commit 9d22cc37b8
No known key found for this signature in database
GPG key ID: 37DBE3621FE9AD47
2 changed files with 33 additions and 14 deletions

View file

@ -23,6 +23,7 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.merge
@ -57,10 +58,10 @@ private class EvaluateStepImpl(
) : EvaluateStep { ) : EvaluateStep {
override suspend fun evaluate(extractedMusic: Flow<ExtractedMusic>): MutableLibrary { override suspend fun evaluate(extractedMusic: Flow<ExtractedMusic>): MutableLibrary {
val filterFlow = val filterFlow =
extractedMusic.divert { extractedMusic.filterIsInstance<ExtractedMusic.Valid>().divert {
when (it) { when (it) {
is ExtractedMusic.Song -> Divert.Right(it.song) is ExtractedMusic.Valid.Song -> Divert.Right(it.song)
is ExtractedMusic.Playlist -> Divert.Left(it.file) is ExtractedMusic.Valid.Playlist -> Divert.Left(it.file)
} }
} }
val rawSongs = filterFlow.right val rawSongs = filterFlow.right

View file

@ -76,7 +76,7 @@ private class ExtractStepImpl(
} }
} }
val audioNodes = filterFlow.right val audioNodes = filterFlow.right
val playlistNodes = filterFlow.left.map { ExtractedMusic.Playlist(it) } val playlistNodes = filterFlow.left.map { ExtractedMusic.Valid.Playlist(it) }
val readDistributedFlow = audioNodes.distribute(8) val readDistributedFlow = audioNodes.distribute(8)
val cacheResults = val cacheResults =
@ -96,7 +96,7 @@ private class ExtractStepImpl(
is CacheResult.Miss -> Divert.Right(it.file) is CacheResult.Miss -> Divert.Right(it.file)
} }
} }
val cachedSongs = cacheFlow.left.map { ExtractedMusic.Song(it) } val cachedSongs = cacheFlow.left.map { ExtractedMusic.Valid.Song(it) }
val uncachedSongs = cacheFlow.right val uncachedSongs = cacheFlow.right
val fds = val fds =
@ -118,7 +118,7 @@ private class ExtractStepImpl(
wrap(fileWith.file) { _ -> wrap(fileWith.file) { _ ->
metadataExtractor metadataExtractor
.extract(fileWith.file, fileWith.with) .extract(fileWith.file, fileWith.with)
?.let { FileWith(fileWith.file, it) } .let { FileWith(fileWith.file, it) }
.also { withContext(Dispatchers.IO) { fileWith.with.close() } } .also { withContext(Dispatchers.IO) { fileWith.with.close() } }
} }
} }
@ -129,22 +129,34 @@ private class ExtractStepImpl(
val extractedSongs = val extractedSongs =
metadata metadata
.mapNotNull { fileWith -> .map { fileWith ->
val tags = tagParser.parse(fileWith.file, fileWith.with) if (fileWith.with != null) {
val cover = fileWith.with.cover?.let { storedCovers.write(it) } val tags = tagParser.parse(fileWith.file, fileWith.with)
RawSong(fileWith.file, fileWith.with.properties, tags, cover, addingMs) val cover = fileWith.with.cover?.let { storedCovers.write(it) }
RawSong(fileWith.file, fileWith.with.properties, tags, cover, addingMs)
} else {
null
}
} }
.flowOn(Dispatchers.IO) .flowOn(Dispatchers.IO)
.buffer(Channel.UNLIMITED) .buffer(Channel.UNLIMITED)
val writeDistributedFlow = extractedSongs.distribute(8) val extractedFilter =
extractedSongs.divert {
if (it != null) Divert.Left(it) else Divert.Right(ExtractedMusic.Invalid)
}
val write = extractedFilter.left
val invalid = extractedFilter.right
val writeDistributedFlow = write.distribute(8)
val writtenSongs = val writtenSongs =
writeDistributedFlow.flows writeDistributedFlow.flows
.map { flow -> .map { flow ->
flow flow
.map { .map {
wrap(it, cache::write) wrap(it, cache::write)
ExtractedMusic.Song(it) ExtractedMusic.Valid.Song(it)
} }
.flowOn(Dispatchers.IO) .flowOn(Dispatchers.IO)
.buffer(Channel.UNLIMITED) .buffer(Channel.UNLIMITED)
@ -157,8 +169,10 @@ private class ExtractStepImpl(
readDistributedFlow.manager, readDistributedFlow.manager,
cacheFlow.manager, cacheFlow.manager,
cachedSongs, cachedSongs,
extractedFilter.manager,
writeDistributedFlow.manager, writeDistributedFlow.manager,
writtenSongs, writtenSongs,
invalid,
playlistNodes) playlistNodes)
return merged.onCompletion { cache.finalize() } return merged.onCompletion { cache.finalize() }
@ -176,7 +190,11 @@ internal data class RawSong(
) )
internal sealed interface ExtractedMusic { internal sealed interface ExtractedMusic {
data class Song(val song: RawSong) : ExtractedMusic sealed interface Valid : ExtractedMusic {
data class Song(val song: RawSong) : Valid
data class Playlist(val file: PlaylistFile) : ExtractedMusic data class Playlist(val file: PlaylistFile) : Valid
}
data object Invalid : ExtractedMusic
} }