diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt index 2b1e85c2f..ca220546c 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt @@ -73,9 +73,9 @@ private class ExtractStepImpl( val audioNodes = filterFlow.right val playlistNodes = filterFlow.left.map { ExtractedMusic.Playlist(it) } - val distributedAudioNodes = audioNodes.distribute(8) + val readDistributedFlow = audioNodes.distribute(8) val cacheResults = - distributedAudioNodes.flows + readDistributedFlow.flows .map { flow -> flow .map { wrap(it) { file -> cache.read(file, storedCovers) } } @@ -132,19 +132,26 @@ private class ExtractStepImpl( .flowOn(Dispatchers.IO) .buffer(Channel.UNLIMITED) + val writeDistributedFlow = extractedSongs.distribute(8) val writtenSongs = - extractedSongs - .map { - wrap(it, cache::write) - ExtractedMusic.Song(it) + writeDistributedFlow.flows + .map { flow -> + flow + .map { + wrap(it, cache::write) + ExtractedMusic.Song(it) + } + .flowOn(Dispatchers.IO) + .buffer(Channel.UNLIMITED) } - .flowOn(Dispatchers.IO) + .flattenMerge() return merge( filterFlow.manager, - distributedAudioNodes.manager, + readDistributedFlow.manager, cacheFlow.manager, cachedSongs, + writeDistributedFlow.manager, writtenSongs, playlistNodes) }