From c4a4b69cd1da7894f9b766173942587f9dc654c0 Mon Sep 17 00:00:00 2001 From: Alexander Capehart Date: Fri, 20 Dec 2024 22:21:24 -0500 Subject: [PATCH] musikr.pipeline: parallelize cache writes --- .../oxycblt/musikr/pipeline/ExtractStep.kt | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt index 2b1e85c2f..ca220546c 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt @@ -73,9 +73,9 @@ private class ExtractStepImpl( val audioNodes = filterFlow.right val playlistNodes = filterFlow.left.map { ExtractedMusic.Playlist(it) } - val distributedAudioNodes = audioNodes.distribute(8) + val readDistributedFlow = audioNodes.distribute(8) val cacheResults = - distributedAudioNodes.flows + readDistributedFlow.flows .map { flow -> flow .map { wrap(it) { file -> cache.read(file, storedCovers) } } @@ -132,19 +132,26 @@ private class ExtractStepImpl( .flowOn(Dispatchers.IO) .buffer(Channel.UNLIMITED) + val writeDistributedFlow = extractedSongs.distribute(8) val writtenSongs = - extractedSongs - .map { - wrap(it, cache::write) - ExtractedMusic.Song(it) + writeDistributedFlow.flows + .map { flow -> + flow + .map { + wrap(it, cache::write) + ExtractedMusic.Song(it) + } + .flowOn(Dispatchers.IO) + .buffer(Channel.UNLIMITED) } - .flowOn(Dispatchers.IO) + .flattenMerge() return merge( filterFlow.manager, - distributedAudioNodes.manager, + readDistributedFlow.manager, cacheFlow.manager, cachedSongs, + writeDistributedFlow.manager, writtenSongs, playlistNodes) }