music: enable tag caching
This commit is contained in:
parent
a22e972bd3
commit
a3da28fb84
2 changed files with 38 additions and 14 deletions
|
@ -35,10 +35,12 @@ import kotlinx.coroutines.flow.onEach
|
||||||
import kotlinx.coroutines.flow.receiveAsFlow
|
import kotlinx.coroutines.flow.receiveAsFlow
|
||||||
import kotlinx.coroutines.flow.withIndex
|
import kotlinx.coroutines.flow.withIndex
|
||||||
import org.oxycblt.auxio.music.stack.IndexingProgress
|
import org.oxycblt.auxio.music.stack.IndexingProgress
|
||||||
|
import org.oxycblt.auxio.music.stack.explore.cache.CacheResult
|
||||||
import org.oxycblt.auxio.music.stack.explore.cache.TagCache
|
import org.oxycblt.auxio.music.stack.explore.cache.TagCache
|
||||||
import org.oxycblt.auxio.music.stack.explore.extractor.TagExtractor
|
import org.oxycblt.auxio.music.stack.explore.extractor.TagExtractor
|
||||||
import org.oxycblt.auxio.music.stack.explore.fs.DeviceFiles
|
import org.oxycblt.auxio.music.stack.explore.fs.DeviceFiles
|
||||||
import org.oxycblt.auxio.music.stack.explore.playlists.StoredPlaylists
|
import org.oxycblt.auxio.music.stack.explore.playlists.StoredPlaylists
|
||||||
|
import timber.log.Timber
|
||||||
|
|
||||||
interface Explorer {
|
interface Explorer {
|
||||||
fun explore(uris: List<Uri>, onProgress: suspend (IndexingProgress.Songs) -> Unit): Files
|
fun explore(uris: List<Uri>, onProgress: suspend (IndexingProgress.Songs) -> Unit): Files
|
||||||
|
@ -65,24 +67,26 @@ constructor(
|
||||||
deviceFiles
|
deviceFiles
|
||||||
.explore(uris.asFlow())
|
.explore(uris.asFlow())
|
||||||
.onEach {
|
.onEach {
|
||||||
|
Timber.d("File explored: $it")
|
||||||
explored++
|
explored++
|
||||||
onProgress(IndexingProgress.Songs(loaded, explored))
|
onProgress(IndexingProgress.Songs(loaded, explored))
|
||||||
}
|
}
|
||||||
.flowOn(Dispatchers.IO)
|
.flowOn(Dispatchers.IO)
|
||||||
.buffer(Channel.UNLIMITED)
|
.buffer(Channel.UNLIMITED)
|
||||||
// val cacheResults = tagCache.read(deviceFiles).flowOn(Dispatchers.IO).buffer()
|
val cacheResults = tagCache.read(deviceFiles).flowOn(Dispatchers.IO).buffer()
|
||||||
|
|
||||||
// val (handle, uncachedDeviceFiles, cachedAudioFiles) = tagRead.results()
|
|
||||||
val extractedAudioFiles =
|
|
||||||
deviceFiles.stretch(8) { tagExtractor.extract(it).flowOn(Dispatchers.IO) }
|
|
||||||
// val writtenAudioFiles =
|
|
||||||
// tagCache.write(extractedAudioFiles).flowOn(Dispatchers.IO).buffer()
|
|
||||||
// val audioFiles = merge(cachedAudioFiles, writtenAudioFiles)
|
|
||||||
val audioFiles =
|
val audioFiles =
|
||||||
extractedAudioFiles.onEach {
|
cacheResults
|
||||||
loaded++
|
.handleMisses {
|
||||||
onProgress(IndexingProgress.Songs(loaded, explored))
|
val extracted =
|
||||||
}
|
it.stretch(8) { tagExtractor.extract(it).flowOn(Dispatchers.IO) }.buffer()
|
||||||
|
val written = tagCache.write(extracted).flowOn(Dispatchers.IO).buffer()
|
||||||
|
written
|
||||||
|
}
|
||||||
|
.onEach {
|
||||||
|
loaded++
|
||||||
|
onProgress(IndexingProgress.Songs(loaded, explored))
|
||||||
|
Timber.d("File extracted: $it")
|
||||||
|
}
|
||||||
val playlistFiles = storedPlaylists.read()
|
val playlistFiles = storedPlaylists.read()
|
||||||
return Files(audioFiles, playlistFiles)
|
return Files(audioFiles, playlistFiles)
|
||||||
}
|
}
|
||||||
|
@ -103,4 +107,24 @@ constructor(
|
||||||
posChannels.map { creator(it.receiveAsFlow()).buffer(Channel.UNLIMITED) }.asFlow()
|
posChannels.map { creator(it.receiveAsFlow()).buffer(Channel.UNLIMITED) }.asFlow()
|
||||||
return merge(divert, handle.flattenMerge())
|
return merge(divert, handle.flattenMerge())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun Flow<CacheResult>.handleMisses(
|
||||||
|
uncached: (Flow<DeviceFile>) -> Flow<AudioFile>
|
||||||
|
): Flow<AudioFile> {
|
||||||
|
val uncachedChannel = Channel<DeviceFile>()
|
||||||
|
val cachedChannel = Channel<AudioFile>()
|
||||||
|
val divert: Flow<AudioFile> = flow {
|
||||||
|
collect {
|
||||||
|
when (it) {
|
||||||
|
is CacheResult.Hit -> cachedChannel.send(it.audioFile)
|
||||||
|
is CacheResult.Miss -> uncachedChannel.send(it.deviceFile)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cachedChannel.close()
|
||||||
|
uncachedChannel.close()
|
||||||
|
}
|
||||||
|
val uncached = uncached(uncachedChannel.receiveAsFlow())
|
||||||
|
val cached = cachedChannel.receiveAsFlow()
|
||||||
|
return merge(divert, uncached, cached)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,8 +20,8 @@ package org.oxycblt.auxio.music.stack.explore.cache
|
||||||
|
|
||||||
import javax.inject.Inject
|
import javax.inject.Inject
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
|
import kotlinx.coroutines.flow.map
|
||||||
import kotlinx.coroutines.flow.onEach
|
import kotlinx.coroutines.flow.onEach
|
||||||
import kotlinx.coroutines.flow.transform
|
|
||||||
import org.oxycblt.auxio.music.stack.explore.AudioFile
|
import org.oxycblt.auxio.music.stack.explore.AudioFile
|
||||||
import org.oxycblt.auxio.music.stack.explore.DeviceFile
|
import org.oxycblt.auxio.music.stack.explore.DeviceFile
|
||||||
|
|
||||||
|
@ -39,7 +39,7 @@ interface TagCache {
|
||||||
|
|
||||||
class TagCacheImpl @Inject constructor(private val tagDao: TagDao) : TagCache {
|
class TagCacheImpl @Inject constructor(private val tagDao: TagDao) : TagCache {
|
||||||
override fun read(files: Flow<DeviceFile>) =
|
override fun read(files: Flow<DeviceFile>) =
|
||||||
files.transform<DeviceFile, CacheResult> { file ->
|
files.map { file ->
|
||||||
val tags = tagDao.selectTags(file.uri.toString(), file.lastModified)
|
val tags = tagDao.selectTags(file.uri.toString(), file.lastModified)
|
||||||
if (tags != null) {
|
if (tags != null) {
|
||||||
CacheResult.Hit(tags.toAudioFile(file))
|
CacheResult.Hit(tags.toAudioFile(file))
|
||||||
|
|
Loading…
Reference in a new issue