diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt
index 60395be44..f9921d7c2 100644
--- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt
+++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt
@@ -15,7 +15,7 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
-
+
package org.oxycblt.musikr.pipeline
import android.content.Context
@@ -23,6 +23,8 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
+import kotlinx.coroutines.flow.flatMapMerge
+import kotlinx.coroutines.flow.flattenMerge
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
@@ -51,7 +53,8 @@ internal interface ExtractStep {
MetadataExtractor.from(context),
TagParser.new(),
storage.cache,
- storage.storedCovers)
+ storage.storedCovers
+ )
}
}
@@ -73,10 +76,17 @@ private class ExtractStepImpl(
val audioNodes = filterFlow.right
val playlistNodes = filterFlow.left.map { ExtractedMusic.Playlist(it) }
+ val distributedAudioNodes = audioNodes.distribute(8)
val cacheResults =
- audioNodes
- .map { wrap(it) { file -> cache.read(file, storedCovers)} }
- .flowOn(Dispatchers.IO)
+ distributedAudioNodes.flows
+ .map { flow ->
+ flow.map {
+ wrap(it) { file -> cache.read(file, storedCovers) }
+ }
+ .flowOn(Dispatchers.IO)
+ .buffer(Channel.UNLIMITED)
+ }
+ .flattenMerge()
.buffer(Channel.UNLIMITED)
val cacheFlow =
cacheResults.divert {
@@ -104,13 +114,13 @@ private class ExtractStepImpl(
val metadata =
fds.mapNotNull { fileWith ->
- wrap(fileWith.file) { _ ->
- metadataExtractor
- .extract(fileWith.with)
- ?.let { FileWith(fileWith.file, it) }
- .also { withContext(Dispatchers.IO) { fileWith.with.close() } }
- }
+ wrap(fileWith.file) { _ ->
+ metadataExtractor
+ .extract(fileWith.with)
+ ?.let { FileWith(fileWith.file, it) }
+ .also { withContext(Dispatchers.IO) { fileWith.with.close() } }
}
+ }
.flowOn(Dispatchers.IO)
// Covers are pretty big, so cap the amount of parsed metadata in-memory to at most
// 8 to minimize GCs.
@@ -135,7 +145,13 @@ private class ExtractStepImpl(
.flowOn(Dispatchers.IO)
return merge(
- filterFlow.manager, cacheFlow.manager, cachedSongs, writtenSongs, playlistNodes)
+ filterFlow.manager,
+ distributedAudioNodes.manager,
+ cacheFlow.manager,
+ cachedSongs,
+ writtenSongs,
+ playlistNodes
+ )
}
private data class FileWith(val file: DeviceFile, val with: T)
diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt
index f8716ba04..c45b083f9 100644
--- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt
+++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt
@@ -59,7 +59,7 @@ internal inline fun Flow.divert(
return DivertedFlow(managedFlow, leftChannel.receiveAsFlow(), rightChannel.receiveAsFlow())
}
-internal class DistributedFlow(val manager: Flow, val flows: Array>)
+internal class DistributedFlow(val manager: Flow, val flows: Flow>)
/**
* Equally "distributes" the values of some flow across n new flows.
@@ -68,7 +68,7 @@ internal class DistributedFlow(val manager: Flow, val flows: Array Flow.distribute(n: Int): DistributedFlow {
- val posChannels = Array(n) { Channel(Channel.UNLIMITED) }
+ val posChannels = List(n) { Channel(Channel.UNLIMITED) }
val managerFlow =
flow {
withIndex().collect {
@@ -79,6 +79,6 @@ internal fun Flow.distribute(n: Int): DistributedFlow {
channel.close()
}
}
- val hotFlows = posChannels.map { it.receiveAsFlow() }.toTypedArray()
+ val hotFlows = posChannels.asFlow().map { it.receiveAsFlow() }
return DistributedFlow(managerFlow, hotFlows)
}