diff --git a/app/src/main/java/org/oxycblt/auxio/music/stack/explore/Explorer.kt b/app/src/main/java/org/oxycblt/auxio/music/stack/explore/Explorer.kt index 28f3c9f4c..1726994aa 100644 --- a/app/src/main/java/org/oxycblt/auxio/music/stack/explore/Explorer.kt +++ b/app/src/main/java/org/oxycblt/auxio/music/stack/explore/Explorer.kt @@ -20,23 +20,20 @@ package org.oxycblt.auxio.music.stack.explore import android.net.Uri import javax.inject.Inject -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.buffer -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.filterIsInstance import kotlinx.coroutines.flow.flattenMerge +import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.flow.shareIn +import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.flow.withIndex -import org.oxycblt.auxio.music.stack.explore.cache.CacheResult import org.oxycblt.auxio.music.stack.explore.cache.TagCache import org.oxycblt.auxio.music.stack.explore.extractor.TagExtractor import org.oxycblt.auxio.music.stack.explore.fs.DeviceFiles @@ -68,32 +65,31 @@ constructor( } .flowOn(Dispatchers.IO) .buffer() - val tagRead = tagCache.read(deviceFiles).flowOn(Dispatchers.IO).buffer() - val (uncachedDeviceFiles, cachedAudioFiles) = tagRead.results() + // val cacheResults = tagCache.read(deviceFiles).flowOn(Dispatchers.IO).buffer() + + // val (handle, uncachedDeviceFiles, cachedAudioFiles) = tagRead.results() val extractedAudioFiles = - uncachedDeviceFiles - .split(8) - .map { tagExtractor.extract(it).flowOn(Dispatchers.IO).buffer() } - .flattenMerge() - val writtenAudioFiles = tagCache.write(extractedAudioFiles).flowOn(Dispatchers.IO).buffer() - val audioFiles = merge(cachedAudioFiles, writtenAudioFiles) + deviceFiles.stretch(8) { tagExtractor.extract(it).flowOn(Dispatchers.IO) } + // val writtenAudioFiles = + // tagCache.write(extractedAudioFiles).flowOn(Dispatchers.IO).buffer() + // val audioFiles = merge(cachedAudioFiles, writtenAudioFiles) val playlistFiles = storedPlaylists.read() - return Files(audioFiles, playlistFiles) + return Files(extractedAudioFiles, playlistFiles) } - private fun Flow.results(): Pair, Flow> { - val shared = - shareIn(CoroutineScope(Dispatchers.Main), SharingStarted.WhileSubscribed(), replay = 0) - val files = shared.filterIsInstance().map { it.deviceFile } - val songs = shared.filterIsInstance().map { it.audioFile } - return files to songs - } - - private fun Flow.split(n: Int): Flow> { - val indexed = withIndex() - val shared = - indexed.shareIn( - CoroutineScope(Dispatchers.Main), SharingStarted.WhileSubscribed(), replay = 0) - return Array(n) { shared.filter { it.index % n == 0 }.map { it.value } }.asFlow() + /** Temporarily split a flow into 8 parallel threads and then */ + private fun Flow.stretch(n: Int, creator: (Flow) -> Flow): Flow { + val posChannels = Array(n) { Channel(Channel.BUFFERED) } + val divert: Flow = flow { + withIndex().collect { + val index = it.index % n + posChannels[index].send(it.value) + } + for (channel in posChannels) { + channel.close() + } + } + val handle = posChannels.map { creator(it.receiveAsFlow()).buffer() }.asFlow() + return merge(divert, handle.flattenMerge()) } }