musikr: streamline loading pipeline

My hope is that overall this is more efficient and also easier to
understand long-term.
This commit is contained in:
Alexander Capehart 2025-03-15 22:25:44 -06:00
parent b3c66d9b55
commit c2dcbd61f8
No known key found for this signature in database
GPG key ID: 37DBE3621FE9AD47
3 changed files with 123 additions and 55 deletions

View file

@ -21,12 +21,12 @@ package org.oxycblt.musikr.pipeline
import android.content.Context import android.content.Context
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flattenMerge
import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
@ -35,6 +35,7 @@ import org.oxycblt.musikr.Interpretation
import org.oxycblt.musikr.Storage import org.oxycblt.musikr.Storage
import org.oxycblt.musikr.cache.Cache import org.oxycblt.musikr.cache.Cache
import org.oxycblt.musikr.cache.CacheResult import org.oxycblt.musikr.cache.CacheResult
import org.oxycblt.musikr.cache.CachedSong
import org.oxycblt.musikr.covers.Cover import org.oxycblt.musikr.covers.Cover
import org.oxycblt.musikr.covers.CoverResult import org.oxycblt.musikr.covers.CoverResult
import org.oxycblt.musikr.covers.Covers import org.oxycblt.musikr.covers.Covers
@ -71,38 +72,54 @@ private class ExploreStepImpl(
locations.asFlow(), locations.asFlow(),
) )
.filter { it.mimeType.startsWith("audio/") || it.mimeType == M3U.MIME_TYPE } .filter { it.mimeType.startsWith("audio/") || it.mimeType == M3U.MIME_TYPE }
.distribute(8) .distributedMap(n = 8, on = Dispatchers.IO, buffer = Channel.UNLIMITED) { file ->
.distributedMap { file ->
val cachedSong =
when (val cacheResult = cache.read(file)) { when (val cacheResult = cache.read(file)) {
is CacheResult.Hit -> cacheResult.song is CacheResult.Hit -> NeedsCover(cacheResult.song)
is CacheResult.Stale -> is CacheResult.Stale ->
return@distributedMap NewSong(cacheResult.file, cacheResult.addedMs) Finalized(NewSong(cacheResult.file, cacheResult.addedMs))
is CacheResult.Miss -> is CacheResult.Miss -> Finalized(NewSong(cacheResult.file, addingMs))
return@distributedMap NewSong(cacheResult.file, addingMs)
}
val cover =
cachedSong.coverId?.let { coverId ->
when (val coverResult = covers.obtain(coverId)) {
is CoverResult.Hit -> coverResult.cover
else ->
return@distributedMap NewSong(
cachedSong.file, cachedSong.addedMs)
} }
} }
RawSong(
cachedSong.file,
cachedSong.properties,
cachedSong.tags,
cover,
cachedSong.addedMs)
}
.flattenMerge()
.flowOn(Dispatchers.IO) .flowOn(Dispatchers.IO)
.buffer(), .buffer(Channel.UNLIMITED)
.distributedMap(n = 8, on = Dispatchers.IO, buffer = Channel.UNLIMITED) {
when (it) {
is Finalized -> it
is NeedsCover -> {
when (val coverResult = it.song.coverId?.let { covers.obtain(it) }) {
is CoverResult.Hit ->
Finalized(
RawSong(
it.song.file,
it.song.properties,
it.song.tags,
coverResult.cover,
it.song.addedMs))
null ->
Finalized(
RawSong(
it.song.file,
it.song.properties,
it.song.tags,
null,
it.song.addedMs))
else -> Finalized(NewSong(it.song.file, it.song.addedMs))
}
}
}
}
.map { it.explored }
.flowOn(Dispatchers.IO)
.buffer(Channel.UNLIMITED),
flow { emitAll(storedPlaylists.read().asFlow()) } flow { emitAll(storedPlaylists.read().asFlow()) }
.map { RawPlaylist(it) } .map { RawPlaylist(it) }
.flowOn(Dispatchers.IO) .flowOn(Dispatchers.IO)
.buffer()) .buffer())
} }
private sealed interface InternalExploreItem
private data class NeedsCover(val song: CachedSong) : InternalExploreItem
private data class Finalized(val explored: Explored) : InternalExploreItem
} }

View file

@ -19,9 +19,12 @@
package org.oxycblt.musikr.pipeline package org.oxycblt.musikr.pipeline
import android.content.Context import android.content.Context
import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flattenMerge import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onCompletion
import org.oxycblt.musikr.Storage import org.oxycblt.musikr.Storage
import org.oxycblt.musikr.cache.CachedSong import org.oxycblt.musikr.cache.CachedSong
@ -29,6 +32,7 @@ import org.oxycblt.musikr.cache.MutableCache
import org.oxycblt.musikr.covers.Cover import org.oxycblt.musikr.covers.Cover
import org.oxycblt.musikr.covers.CoverResult import org.oxycblt.musikr.covers.CoverResult
import org.oxycblt.musikr.covers.MutableCovers import org.oxycblt.musikr.covers.MutableCovers
import org.oxycblt.musikr.metadata.Metadata
import org.oxycblt.musikr.metadata.MetadataExtractor import org.oxycblt.musikr.metadata.MetadataExtractor
import org.oxycblt.musikr.tag.parse.TagParser import org.oxycblt.musikr.tag.parse.TagParser
@ -48,34 +52,71 @@ private class ExtractStepImpl(
private val cache: MutableCache, private val cache: MutableCache,
private val covers: MutableCovers<out Cover> private val covers: MutableCovers<out Cover>
) : ExtractStep { ) : ExtractStep {
@OptIn(ExperimentalCoroutinesApi::class)
override fun extract(nodes: Flow<Explored>): Flow<Extracted> { override fun extract(nodes: Flow<Explored>): Flow<Extracted> {
val exclude = mutableListOf<CachedSong>() val exclude = mutableListOf<CachedSong>()
return nodes return nodes
.distribute(8) // Cover art is huge, so we have to kneecap the concurrency here to avoid excessive
.distributedMap { // GCs. We still reap the concurrency benefits here, just not as much as we could.
.distributedMap(on = Dispatchers.IO, n = 8, buffer = Channel.RENDEZVOUS) {
when (it) { when (it) {
is RawSong -> it is RawSong -> Finalized(it)
is RawPlaylist -> it is RawPlaylist -> Finalized(it)
is NewSong -> { is NewSong -> {
val metadata = val metadata = metadataExtractor.extract(it.file)
metadataExtractor.extract(it.file) ?: return@distributedMap InvalidSong if (metadata != null) NeedsParsing(it, metadata) else Finalized(InvalidSong)
val tags = tagParser.parse(metadata) }
}
}
.flowOn(Dispatchers.IO)
.buffer(Channel.RENDEZVOUS)
.distributedMap(on = Dispatchers.IO, n = 8, buffer = Channel.UNLIMITED) {
when (it) {
is Finalized -> it
is NeedsParsing -> {
val tags = tagParser.parse(it.metadata)
val cover = val cover =
when (val result = covers.create(it.file, metadata)) { when (val result = covers.create(it.song.file, it.metadata)) {
is CoverResult.Hit -> result.cover is CoverResult.Hit -> result.cover
else -> null else -> null
} }
NeedsCaching(
RawSong(
it.song.file, it.metadata.properties, tags, cover, it.song.addedMs))
}
}
}
.flowOn(Dispatchers.IO)
.buffer(Channel.UNLIMITED)
.distributedMap(on = Dispatchers.IO, n = 8, buffer = Channel.UNLIMITED) {
when (it) {
is Finalized -> it
is NeedsCaching -> {
val cachedSong = val cachedSong =
CachedSong(it.file, metadata.properties, tags, cover?.id, it.addedMs) CachedSong(
it.song.file,
it.song.properties,
it.song.tags,
it.song.cover?.id,
it.song.addedMs)
cache.write(cachedSong) cache.write(cachedSong)
exclude.add(cachedSong) exclude.add(cachedSong)
val rawSong = RawSong(it.file, metadata.properties, tags, cover, it.addedMs) Finalized(it.song)
rawSong
} }
} }
} }
.flattenMerge() .map { it.extracted }
.flowOn(Dispatchers.IO)
.buffer(Channel.UNLIMITED)
.onCompletion { cache.cleanup(exclude) } .onCompletion { cache.cleanup(exclude) }
} }
private sealed interface ParsedExtractItem
private data class NeedsParsing(val song: NewSong, val metadata: Metadata) : ParsedExtractItem
private sealed interface ParsedCachingItem
private data class NeedsCaching(val song: RawSong) : ParsedCachingItem
private data class Finalized(val extracted: Extracted) : ParsedExtractItem, ParsedCachingItem
} }

View file

@ -18,10 +18,16 @@
package org.oxycblt.musikr.pipeline package org.oxycblt.musikr.pipeline
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flattenMerge
import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.withIndex import kotlinx.coroutines.flow.withIndex
@ -32,7 +38,13 @@ import kotlinx.coroutines.flow.withIndex
* Note that this function requires the "manager" flow to be consumed alongside the split flows in * Note that this function requires the "manager" flow to be consumed alongside the split flows in
* order to function. Without this, all of the newly split flows will simply block. * order to function. Without this, all of the newly split flows will simply block.
*/ */
internal fun <T> Flow<T>.distribute(n: Int): Flow<Flow<T>> { @OptIn(ExperimentalCoroutinesApi::class)
internal fun <T, R> Flow<T>.distributedMap(
n: Int,
on: CoroutineContext = Dispatchers.Main,
buffer: Int = Channel.UNLIMITED,
block: suspend (T) -> R,
): Flow<R> {
val posChannels = List(n) { Channel<T>(Channel.UNLIMITED) } val posChannels = List(n) { Channel<T>(Channel.UNLIMITED) }
val managerFlow = val managerFlow =
flow<Nothing> { flow<Nothing> {
@ -44,14 +56,12 @@ internal fun <T> Flow<T>.distribute(n: Int): Flow<Flow<T>> {
channel.close() channel.close()
} }
} }
return (posChannels.map { it.receiveAsFlow() } + managerFlow).asFlow() return (posChannels.map { it.receiveAsFlow() } + managerFlow)
.asFlow()
.map { it.tryMap(block).flowOn(on).buffer(buffer) }
.flattenMerge()
} }
internal fun <T, R> Flow<Flow<T>>.distributedMap(transform: suspend (T) -> R): Flow<Flow<R>> =
flow {
collect { innerFlow -> emit(innerFlow.tryMap(transform)) }
}
internal fun <T, R> Flow<T>.tryMap(transform: suspend (T) -> R): Flow<R> = flow { internal fun <T, R> Flow<T>.tryMap(transform: suspend (T) -> R): Flow<R> = flow {
collect { value -> collect { value ->
try { try {