music: temp strip down explorer & fix threading

Mostly for continued debugging
This commit is contained in:
Alexander Capehart 2024-11-26 20:12:55 -07:00
parent ba5f51dfe6
commit 144da8a3b5
No known key found for this signature in database
GPG key ID: 37DBE3621FE9AD47

View file

@ -20,23 +20,20 @@ package org.oxycblt.auxio.music.stack.explore
import android.net.Uri
import javax.inject.Inject
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.flattenMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.withIndex
import org.oxycblt.auxio.music.stack.explore.cache.CacheResult
import org.oxycblt.auxio.music.stack.explore.cache.TagCache
import org.oxycblt.auxio.music.stack.explore.extractor.TagExtractor
import org.oxycblt.auxio.music.stack.explore.fs.DeviceFiles
@ -68,32 +65,31 @@ constructor(
}
.flowOn(Dispatchers.IO)
.buffer()
val tagRead = tagCache.read(deviceFiles).flowOn(Dispatchers.IO).buffer()
val (uncachedDeviceFiles, cachedAudioFiles) = tagRead.results()
// val cacheResults = tagCache.read(deviceFiles).flowOn(Dispatchers.IO).buffer()
// val (handle, uncachedDeviceFiles, cachedAudioFiles) = tagRead.results()
val extractedAudioFiles =
uncachedDeviceFiles
.split(8)
.map { tagExtractor.extract(it).flowOn(Dispatchers.IO).buffer() }
.flattenMerge()
val writtenAudioFiles = tagCache.write(extractedAudioFiles).flowOn(Dispatchers.IO).buffer()
val audioFiles = merge(cachedAudioFiles, writtenAudioFiles)
deviceFiles.stretch(8) { tagExtractor.extract(it).flowOn(Dispatchers.IO) }
// val writtenAudioFiles =
// tagCache.write(extractedAudioFiles).flowOn(Dispatchers.IO).buffer()
// val audioFiles = merge(cachedAudioFiles, writtenAudioFiles)
val playlistFiles = storedPlaylists.read()
return Files(audioFiles, playlistFiles)
return Files(extractedAudioFiles, playlistFiles)
}
private fun Flow<CacheResult>.results(): Pair<Flow<DeviceFile>, Flow<AudioFile>> {
val shared =
shareIn(CoroutineScope(Dispatchers.Main), SharingStarted.WhileSubscribed(), replay = 0)
val files = shared.filterIsInstance<CacheResult.Miss>().map { it.deviceFile }
val songs = shared.filterIsInstance<CacheResult.Hit>().map { it.audioFile }
return files to songs
}
private fun <T> Flow<T>.split(n: Int): Flow<Flow<T>> {
val indexed = withIndex()
val shared =
indexed.shareIn(
CoroutineScope(Dispatchers.Main), SharingStarted.WhileSubscribed(), replay = 0)
return Array(n) { shared.filter { it.index % n == 0 }.map { it.value } }.asFlow()
/** Temporarily split a flow into 8 parallel threads and then */
private fun <T, R> Flow<T>.stretch(n: Int, creator: (Flow<T>) -> Flow<R>): Flow<R> {
val posChannels = Array(n) { Channel<T>(Channel.BUFFERED) }
val divert: Flow<R> = flow {
withIndex().collect {
val index = it.index % n
posChannels[index].send(it.value)
}
for (channel in posChannels) {
channel.close()
}
}
val handle = posChannels.map { creator(it.receiveAsFlow()).buffer() }.asFlow()
return merge(divert, handle.flattenMerge())
}
}