From 6bad9e719d8ac527c58da07269f4361d8f2fb30c Mon Sep 17 00:00:00 2001 From: Alexander Capehart Date: Fri, 20 Dec 2024 22:12:01 -0500 Subject: [PATCH] musikr.pipeline: parallelize cache reads --- .../oxycblt/musikr/pipeline/ExtractStep.kt | 40 +++++++++++++------ .../org/oxycblt/musikr/pipeline/FlowUtil.kt | 6 +-- 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt index 60395be44..f9921d7c2 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt @@ -15,7 +15,7 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ - + package org.oxycblt.musikr.pipeline import android.content.Context @@ -23,6 +23,8 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.flatMapMerge +import kotlinx.coroutines.flow.flattenMerge import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.mapNotNull @@ -51,7 +53,8 @@ internal interface ExtractStep { MetadataExtractor.from(context), TagParser.new(), storage.cache, - storage.storedCovers) + storage.storedCovers + ) } } @@ -73,10 +76,17 @@ private class ExtractStepImpl( val audioNodes = filterFlow.right val playlistNodes = filterFlow.left.map { ExtractedMusic.Playlist(it) } + val distributedAudioNodes = audioNodes.distribute(8) val cacheResults = - audioNodes - .map { wrap(it) { file -> cache.read(file, storedCovers)} } - .flowOn(Dispatchers.IO) + distributedAudioNodes.flows + .map { flow -> + flow.map { + wrap(it) { file -> cache.read(file, storedCovers) } + } + .flowOn(Dispatchers.IO) + .buffer(Channel.UNLIMITED) + } + .flattenMerge() .buffer(Channel.UNLIMITED) val cacheFlow = cacheResults.divert { @@ -104,13 +114,13 @@ private class ExtractStepImpl( val metadata = fds.mapNotNull { fileWith -> - wrap(fileWith.file) { _ -> - metadataExtractor - .extract(fileWith.with) - ?.let { FileWith(fileWith.file, it) } - .also { withContext(Dispatchers.IO) { fileWith.with.close() } } - } + wrap(fileWith.file) { _ -> + metadataExtractor + .extract(fileWith.with) + ?.let { FileWith(fileWith.file, it) } + .also { withContext(Dispatchers.IO) { fileWith.with.close() } } } + } .flowOn(Dispatchers.IO) // Covers are pretty big, so cap the amount of parsed metadata in-memory to at most // 8 to minimize GCs. @@ -135,7 +145,13 @@ private class ExtractStepImpl( .flowOn(Dispatchers.IO) return merge( - filterFlow.manager, cacheFlow.manager, cachedSongs, writtenSongs, playlistNodes) + filterFlow.manager, + distributedAudioNodes.manager, + cacheFlow.manager, + cachedSongs, + writtenSongs, + playlistNodes + ) } private data class FileWith(val file: DeviceFile, val with: T) diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt index f8716ba04..c45b083f9 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt @@ -59,7 +59,7 @@ internal inline fun Flow.divert( return DivertedFlow(managedFlow, leftChannel.receiveAsFlow(), rightChannel.receiveAsFlow()) } -internal class DistributedFlow(val manager: Flow, val flows: Array>) +internal class DistributedFlow(val manager: Flow, val flows: Flow>) /** * Equally "distributes" the values of some flow across n new flows. @@ -68,7 +68,7 @@ internal class DistributedFlow(val manager: Flow, val flows: Array Flow.distribute(n: Int): DistributedFlow { - val posChannels = Array(n) { Channel(Channel.UNLIMITED) } + val posChannels = List(n) { Channel(Channel.UNLIMITED) } val managerFlow = flow { withIndex().collect { @@ -79,6 +79,6 @@ internal fun Flow.distribute(n: Int): DistributedFlow { channel.close() } } - val hotFlows = posChannels.map { it.receiveAsFlow() }.toTypedArray() + val hotFlows = posChannels.asFlow().map { it.receiveAsFlow() } return DistributedFlow(managerFlow, hotFlows) }