musikr: streamline pipelining system

This commit is contained in:
Alexander Capehart 2025-01-20 20:03:12 -07:00
parent 9685f3cf51
commit 3eac245aea
No known key found for this signature in database
GPG key ID: 37DBE3621FE9AD47
12 changed files with 285 additions and 293 deletions

View file

@ -24,20 +24,20 @@ import org.oxycblt.musikr.playlist.db.StoredPlaylists
import org.oxycblt.musikr.tag.interpret.Naming import org.oxycblt.musikr.tag.interpret.Naming
import org.oxycblt.musikr.tag.interpret.Separators import org.oxycblt.musikr.tag.interpret.Separators
/** Side-effect laden [Storage] for use during music loading and [MutableLibrary] operation. */ /** Side-effect repositories for use during music loading and [MutableLibrary] operation. */
data class Storage( data class Storage(
/** /**
* A factory producing a repository of cached metadata to read and write from over the course of * A repository of cached metadata to read and write from over the course of music loading only.
* music loading. This will only be used during music loading. * This will be used only during music loading.
*/ */
val cache: Cache.Factory, val cache: Cache,
/** /**
* A repository of cover images to for re-use during music loading. Should be kept in lock-step * A repository of cover images to for re-use during music loading. Should be kept in lock-step
* with the cache for best performance. This will be used during music loading and when * with the cache for best performance. This will be used during music loading and when
* retrieving cover information from the library. * retrieving cover information from the library.
*/ */
val storedCovers: MutableCovers, val covers: MutableCovers,
/** /**
* A repository of user-created playlists that should also be loaded into the library. This will * A repository of user-created playlists that should also be loaded into the library. This will

View file

@ -22,13 +22,19 @@ import android.content.Context
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.onStart
import org.oxycblt.musikr.fs.MusicLocation import org.oxycblt.musikr.fs.MusicLocation
import org.oxycblt.musikr.pipeline.Divert
import org.oxycblt.musikr.pipeline.EvaluateStep import org.oxycblt.musikr.pipeline.EvaluateStep
import org.oxycblt.musikr.pipeline.ExploreStep import org.oxycblt.musikr.pipeline.ExploreStep
import org.oxycblt.musikr.pipeline.Explored
import org.oxycblt.musikr.pipeline.ExtractStep import org.oxycblt.musikr.pipeline.ExtractStep
import org.oxycblt.musikr.pipeline.Extracted
import org.oxycblt.musikr.pipeline.divert
/** /**
* A highly opinionated, multi-threaded device music library. * A highly opinionated, multi-threaded device music library.
@ -127,13 +133,24 @@ private class MusikrImpl(
.buffer(Channel.UNLIMITED) .buffer(Channel.UNLIMITED)
.onStart { onProgress(IndexingProgress.Songs(0, 0)) } .onStart { onProgress(IndexingProgress.Songs(0, 0)) }
.onEach { onProgress(IndexingProgress.Songs(extractedCount, ++exploredCount)) } .onEach { onProgress(IndexingProgress.Songs(extractedCount, ++exploredCount)) }
val typeDiversion =
explored.divert {
when (it) {
is Explored.Known -> Divert.Right(it)
is Explored.New -> Divert.Left(it)
}
}
val known = typeDiversion.right
val new = typeDiversion.left
val extracted = val extracted =
extractStep extractStep
.extract(explored) .extract(new)
.buffer(Channel.UNLIMITED) .buffer(Channel.UNLIMITED)
.onEach { onProgress(IndexingProgress.Songs(++extractedCount, exploredCount)) } .onEach { onProgress(IndexingProgress.Songs(++extractedCount, exploredCount)) }
.onCompletion { onProgress(IndexingProgress.Indeterminate) } .onCompletion { onProgress(IndexingProgress.Indeterminate) }
val library = evaluateStep.evaluate(extracted) val complete =
merge(typeDiversion.manager, known, extracted.filterIsInstance<Extracted.Valid>())
val library = evaluateStep.evaluate(complete)
LibraryResultImpl(storage, library) LibraryResultImpl(storage, library)
} }
} }
@ -143,6 +160,6 @@ private class LibraryResultImpl(
override val library: MutableLibrary override val library: MutableLibrary
) : LibraryResult { ) : LibraryResult {
override suspend fun cleanup() { override suspend fun cleanup() {
storage.storedCovers.cleanup(library.songs.mapNotNull { it.cover }) storage.covers.cleanup(library.songs.mapNotNull { it.cover })
} }
} }

View file

@ -18,16 +18,17 @@
package org.oxycblt.musikr.cache package org.oxycblt.musikr.cache
import org.oxycblt.musikr.cover.Covers
import org.oxycblt.musikr.fs.DeviceFile import org.oxycblt.musikr.fs.DeviceFile
import org.oxycblt.musikr.metadata.Properties
import org.oxycblt.musikr.pipeline.RawSong import org.oxycblt.musikr.pipeline.RawSong
import org.oxycblt.musikr.tag.parse.ParsedTags
abstract class Cache { abstract class Cache {
internal abstract suspend fun read(file: DeviceFile, covers: Covers): CacheResult internal abstract suspend fun read(file: DeviceFile): CacheResult
internal abstract suspend fun write(song: RawSong) internal abstract suspend fun write(song: RawSong)
internal abstract suspend fun finalize() internal abstract suspend fun finalize(songs: List<RawSong>)
abstract class Factory { abstract class Factory {
internal abstract fun open(): Cache internal abstract fun open(): Cache
@ -35,7 +36,15 @@ abstract class Cache {
} }
internal sealed interface CacheResult { internal sealed interface CacheResult {
data class Hit(val song: RawSong) : CacheResult data class Hit(
val file: DeviceFile,
val properties: Properties,
val tags: ParsedTags,
val coverId: String?,
val addedMs: Long
) : CacheResult
data class Miss(val file: DeviceFile, val addedMs: Long?) : CacheResult data class Outdated(val file: DeviceFile, val addedMs: Long) : CacheResult
data class Miss(val file: DeviceFile) : CacheResult
} }

View file

@ -31,13 +31,8 @@ import androidx.room.RoomDatabase
import androidx.room.Transaction import androidx.room.Transaction
import androidx.room.TypeConverter import androidx.room.TypeConverter
import androidx.room.TypeConverters import androidx.room.TypeConverters
import org.oxycblt.musikr.cover.Covers
import org.oxycblt.musikr.cover.ObtainResult
import org.oxycblt.musikr.fs.DeviceFile
import org.oxycblt.musikr.metadata.Properties
import org.oxycblt.musikr.pipeline.RawSong import org.oxycblt.musikr.pipeline.RawSong
import org.oxycblt.musikr.tag.Date import org.oxycblt.musikr.tag.Date
import org.oxycblt.musikr.tag.parse.ParsedTags
import org.oxycblt.musikr.util.correctWhitespace import org.oxycblt.musikr.util.correctWhitespace
import org.oxycblt.musikr.util.splitEscaped import org.oxycblt.musikr.util.splitEscaped
@ -118,45 +113,6 @@ internal data class CachedSong(
val replayGainAlbumAdjustment: Float?, val replayGainAlbumAdjustment: Float?,
val coverId: String?, val coverId: String?,
) { ) {
suspend fun intoRawSong(file: DeviceFile, covers: Covers): RawSong? {
val cover =
when (val result = coverId?.let { covers.obtain(it) }) {
// We found the cover.
is ObtainResult.Hit -> result.cover
// We actually didn't find the cover, can't safely convert.
is ObtainResult.Miss -> return null
// No cover in the first place, can ignore.
null -> null
}
return RawSong(
file,
Properties(mimeType, durationMs, bitrateHz, sampleRateHz),
ParsedTags(
musicBrainzId = musicBrainzId,
name = name,
sortName = sortName,
durationMs = durationMs,
track = track,
disc = disc,
subtitle = subtitle,
date = date,
albumMusicBrainzId = albumMusicBrainzId,
albumName = albumName,
albumSortName = albumSortName,
releaseTypes = releaseTypes,
artistMusicBrainzIds = artistMusicBrainzIds,
artistNames = artistNames,
artistSortNames = artistSortNames,
albumArtistMusicBrainzIds = albumArtistMusicBrainzIds,
albumArtistNames = albumArtistNames,
albumArtistSortNames = albumArtistSortNames,
genreNames = genreNames,
replayGainTrackAdjustment = replayGainTrackAdjustment,
replayGainAlbumAdjustment = replayGainAlbumAdjustment),
cover = cover,
addedMs = addedMs)
}
object Converters { object Converters {
@TypeConverter @TypeConverter
fun fromMultiValue(values: List<String>) = fun fromMultiValue(values: List<String>) =

View file

@ -21,7 +21,9 @@ package org.oxycblt.musikr.cache
import android.content.Context import android.content.Context
import org.oxycblt.musikr.cover.Covers import org.oxycblt.musikr.cover.Covers
import org.oxycblt.musikr.fs.DeviceFile import org.oxycblt.musikr.fs.DeviceFile
import org.oxycblt.musikr.metadata.Properties
import org.oxycblt.musikr.pipeline.RawSong import org.oxycblt.musikr.pipeline.RawSong
import org.oxycblt.musikr.tag.parse.ParsedTags
interface StoredCache { interface StoredCache {
fun visible(): Cache.Factory fun visible(): Cache.Factory
@ -54,17 +56,44 @@ private abstract class BaseStoredCache(protected val writeDao: CacheWriteDao) :
private class VisibleStoredCache(private val visibleDao: VisibleCacheDao, writeDao: CacheWriteDao) : private class VisibleStoredCache(private val visibleDao: VisibleCacheDao, writeDao: CacheWriteDao) :
BaseStoredCache(writeDao) { BaseStoredCache(writeDao) {
override suspend fun read(file: DeviceFile, covers: Covers): CacheResult { override suspend fun read(file: DeviceFile, covers: Covers): CacheResult {
val song = visibleDao.selectSong(file.uri.toString()) ?: return CacheResult.Miss(file, null) val cachedSong = visibleDao.selectSong(file.uri.toString()) ?: return CacheResult.Miss(file)
if (song.modifiedMs != file.modifiedMs) { if (cachedSong.modifiedMs != file.modifiedMs) {
// We *found* this file earlier, but it's out of date. // We *found* this file earlier, but it's out of date.
// Send back it with the timestamp so it will be re-used. // Send back it with the timestamp so it will be re-used.
// The touch timestamp will be updated on write. // The touch timestamp will be updated on write.
return CacheResult.Miss(file, song.addedMs) return CacheResult.Outdated(file, cachedSong.addedMs)
} }
// Valid file, update the touch time. // Valid file, update the touch time.
visibleDao.touch(file.uri.toString()) visibleDao.touch(file.uri.toString())
val rawSong = song.intoRawSong(file, covers) ?: return CacheResult.Miss(file, song.addedMs) return cachedSong.run {
return CacheResult.Hit(rawSong) CacheResult.Hit(
file,
Properties(mimeType, durationMs, bitrateHz, sampleRateHz),
ParsedTags(
musicBrainzId = musicBrainzId,
name = name,
sortName = sortName,
durationMs = durationMs,
track = track,
disc = disc,
subtitle = subtitle,
date = date,
albumMusicBrainzId = albumMusicBrainzId,
albumName = albumName,
albumSortName = albumSortName,
releaseTypes = releaseTypes,
artistMusicBrainzIds = artistMusicBrainzIds,
artistNames = artistNames,
artistSortNames = artistSortNames,
albumArtistMusicBrainzIds = albumArtistMusicBrainzIds,
albumArtistNames = albumArtistNames,
albumArtistSortNames = albumArtistSortNames,
genreNames = genreNames,
replayGainTrackAdjustment = replayGainTrackAdjustment,
replayGainAlbumAdjustment = replayGainAlbumAdjustment),
coverId = coverId,
addedMs = addedMs)
}
} }
class Factory(private val cacheDatabase: CacheDatabase) : Cache.Factory() { class Factory(private val cacheDatabase: CacheDatabase) : Cache.Factory() {
@ -77,8 +106,11 @@ private class InvisibleStoredCache(
private val invisibleCacheDao: InvisibleCacheDao, private val invisibleCacheDao: InvisibleCacheDao,
writeDao: CacheWriteDao writeDao: CacheWriteDao
) : BaseStoredCache(writeDao) { ) : BaseStoredCache(writeDao) {
override suspend fun read(file: DeviceFile, covers: Covers) = override suspend fun read(file: DeviceFile, covers: Covers): CacheResult {
CacheResult.Miss(file, invisibleCacheDao.selectAddedMs(file.uri.toString())) val addedMs =
invisibleCacheDao.selectAddedMs(file.uri.toString()) ?: return CacheResult.Miss(file)
return CacheResult.Outdated(file, addedMs)
}
class Factory(private val cacheDatabase: CacheDatabase) : Cache.Factory() { class Factory(private val cacheDatabase: CacheDatabase) : Cache.Factory() {
override fun open() = override fun open() =

View file

@ -18,6 +18,8 @@
package org.oxycblt.musikr.metadata package org.oxycblt.musikr.metadata
import android.annotation.SuppressLint
import android.content.Context
import android.os.ParcelFileDescriptor import android.os.ParcelFileDescriptor
import java.io.FileInputStream import java.io.FileInputStream
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
@ -25,17 +27,38 @@ import kotlinx.coroutines.withContext
import org.oxycblt.musikr.fs.DeviceFile import org.oxycblt.musikr.fs.DeviceFile
internal interface MetadataExtractor { internal interface MetadataExtractor {
suspend fun extract(deviceFile: DeviceFile, fd: ParcelFileDescriptor): Metadata? suspend fun open(deviceFile: DeviceFile): MetadataHandle?
companion object { companion object {
fun new(): MetadataExtractor = MetadataExtractorImpl fun new(context: Context): MetadataExtractor = MetadataExtractorImpl(context)
} }
} }
private object MetadataExtractorImpl : MetadataExtractor { internal interface MetadataHandle {
override suspend fun extract(deviceFile: DeviceFile, fd: ParcelFileDescriptor) = suspend fun extract(): Metadata?
}
private class MetadataExtractorImpl(private val context: Context) : MetadataExtractor {
@SuppressLint("Recycle")
override suspend fun open(deviceFile: DeviceFile): MetadataHandle? {
val fd =
withContext(Dispatchers.IO) { withContext(Dispatchers.IO) {
val fis = FileInputStream(fd.fileDescriptor) context.contentResolver.openFileDescriptor(deviceFile.uri, "r")
TagLibJNI.open(deviceFile, fis).also { fis.close() } }
return MetadataHandleImpl(deviceFile, fd ?: return null)
}
}
private class MetadataHandleImpl(
private val file: DeviceFile,
private val fd: ParcelFileDescriptor
) : MetadataHandle {
override suspend fun extract() =
withContext(Dispatchers.IO) {
val fis = FileInputStream(fd.fileDescriptor)
TagLibJNI.open(file, fis).also {
fis.close()
fd.close()
}
} }
} }

View file

@ -23,9 +23,7 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import org.oxycblt.musikr.Interpretation import org.oxycblt.musikr.Interpretation
@ -38,7 +36,7 @@ import org.oxycblt.musikr.playlist.interpret.PlaylistInterpreter
import org.oxycblt.musikr.tag.interpret.TagInterpreter import org.oxycblt.musikr.tag.interpret.TagInterpreter
internal interface EvaluateStep { internal interface EvaluateStep {
suspend fun evaluate(extractedMusic: Flow<ExtractedMusic>): MutableLibrary suspend fun evaluate(complete: Flow<Complete>): MutableLibrary
companion object { companion object {
fun new(storage: Storage, interpretation: Interpretation): EvaluateStep = fun new(storage: Storage, interpretation: Interpretation): EvaluateStep =
@ -56,31 +54,31 @@ private class EvaluateStepImpl(
private val storedPlaylists: StoredPlaylists, private val storedPlaylists: StoredPlaylists,
private val libraryFactory: LibraryFactory private val libraryFactory: LibraryFactory
) : EvaluateStep { ) : EvaluateStep {
override suspend fun evaluate(extractedMusic: Flow<ExtractedMusic>): MutableLibrary { override suspend fun evaluate(complete: Flow<Complete>): MutableLibrary {
val filterFlow = val filterFlow =
extractedMusic.filterIsInstance<ExtractedMusic.Valid>().divert { complete.divert {
when (it) { when (it) {
is ExtractedMusic.Valid.Song -> Divert.Right(it.song) is RawSong -> Divert.Right(it)
is ExtractedMusic.Valid.Playlist -> Divert.Left(it.file) is RawPlaylist -> Divert.Left(it.file)
} }
} }
val rawSongs = filterFlow.right val rawSongs = filterFlow.right
val preSongs = val preSongs =
rawSongs rawSongs
.map { wrap(it, tagInterpreter::interpret) } .tryMap { tagInterpreter.interpret(it) }
.flowOn(Dispatchers.Default) .flowOn(Dispatchers.Default)
.buffer(Channel.UNLIMITED) .buffer(Channel.UNLIMITED)
val prePlaylists = val prePlaylists =
filterFlow.left filterFlow.left
.map { wrap(it, playlistInterpreter::interpret) } .tryMap { playlistInterpreter.interpret(it) }
.flowOn(Dispatchers.Default) .flowOn(Dispatchers.Default)
.buffer(Channel.UNLIMITED) .buffer(Channel.UNLIMITED)
val graphBuilder = MusicGraph.builder() val graphBuilder = MusicGraph.builder()
val graphBuild = val graphBuild =
merge( merge(
filterFlow.manager, filterFlow.manager,
preSongs.onEach { wrap(it, graphBuilder::add) }, preSongs.onEach { graphBuilder.add(it) },
prePlaylists.onEach { wrap(it, graphBuilder::add) }) prePlaylists.onEach { graphBuilder.add(it) })
graphBuild.collect() graphBuild.collect()
val graph = graphBuilder.build() val graph = graphBuilder.build()
return libraryFactory.create(graph, storedPlaylists, playlistInterpreter) return libraryFactory.create(graph, storedPlaylists, playlistInterpreter)

View file

@ -24,56 +24,71 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.merge
import org.oxycblt.musikr.Storage import org.oxycblt.musikr.Storage
import org.oxycblt.musikr.cache.Cache
import org.oxycblt.musikr.cache.CacheResult
import org.oxycblt.musikr.cover.Covers
import org.oxycblt.musikr.cover.ObtainResult
import org.oxycblt.musikr.fs.DeviceFile import org.oxycblt.musikr.fs.DeviceFile
import org.oxycblt.musikr.fs.MusicLocation import org.oxycblt.musikr.fs.MusicLocation
import org.oxycblt.musikr.fs.device.DeviceFiles import org.oxycblt.musikr.fs.device.DeviceFiles
import org.oxycblt.musikr.playlist.PlaylistFile
import org.oxycblt.musikr.playlist.db.StoredPlaylists import org.oxycblt.musikr.playlist.db.StoredPlaylists
import org.oxycblt.musikr.playlist.m3u.M3U import org.oxycblt.musikr.playlist.m3u.M3U
internal interface ExploreStep { internal interface ExploreStep {
fun explore(locations: List<MusicLocation>): Flow<ExploreNode> fun explore(locations: List<MusicLocation>): Flow<Explored>
companion object { companion object {
fun from(context: Context, storage: Storage): ExploreStep = fun from(context: Context, storage: Storage): ExploreStep =
ExploreStepImpl(DeviceFiles.from(context), storage.storedPlaylists) ExploreStepImpl(
DeviceFiles.from(context), storage.storedPlaylists, storage.cache, storage.covers)
} }
} }
private class ExploreStepImpl( private class ExploreStepImpl(
private val deviceFiles: DeviceFiles, private val deviceFiles: DeviceFiles,
private val storedPlaylists: StoredPlaylists private val storedPlaylists: StoredPlaylists,
private val cache: Cache,
private val covers: Covers
) : ExploreStep { ) : ExploreStep {
override fun explore(locations: List<MusicLocation>): Flow<ExploreNode> { override fun explore(locations: List<MusicLocation>): Flow<Explored> {
val audios = val audios =
deviceFiles deviceFiles
.explore(locations.asFlow()) .explore(locations.asFlow())
.mapNotNull { .filter { it.mimeType.startsWith("audio/") || it.mimeType == M3U.MIME_TYPE }
when { .map { evaluateAudio(it) }
it.mimeType == M3U.MIME_TYPE -> null
it.mimeType.startsWith("audio/") -> ExploreNode.Audio(it)
else -> null
}
}
.flowOn(Dispatchers.IO) .flowOn(Dispatchers.IO)
.buffer() .buffer()
val playlists = val playlists =
flow { emitAll(storedPlaylists.read().asFlow()) } flow { emitAll(storedPlaylists.read().asFlow()) }
.map { ExploreNode.Playlist(it) } .map { RawPlaylist(it) }
.flowOn(Dispatchers.IO) .flowOn(Dispatchers.IO)
.buffer() .buffer()
return merge(audios, playlists) return merge(audios, playlists)
} }
}
internal sealed interface ExploreNode { private suspend fun evaluateAudio(file: DeviceFile): Explored {
data class Audio(val file: DeviceFile) : ExploreNode return when (val cacheResult = cache.read(file)) {
is CacheResult.Hit -> {
data class Playlist(val file: PlaylistFile) : ExploreNode val coverResult = cacheResult.coverId?.let { covers.obtain(it) }
when (coverResult) {
is ObtainResult.Hit ->
RawSong(
file,
cacheResult.properties,
cacheResult.tags,
coverResult.cover,
cacheResult.addedMs)
else -> NewSong(file, cacheResult.addedMs)
}
}
is CacheResult.Outdated -> NewSong(file, cacheResult.addedMs)
is CacheResult.Miss -> NewSong(file, System.currentTimeMillis())
}
}
} }

View file

@ -20,106 +20,59 @@ package org.oxycblt.musikr.pipeline
import android.content.Context import android.content.Context
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flattenMerge import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.withContext
import org.oxycblt.musikr.Storage import org.oxycblt.musikr.Storage
import org.oxycblt.musikr.cache.Cache import org.oxycblt.musikr.cache.Cache
import org.oxycblt.musikr.cache.CacheResult
import org.oxycblt.musikr.cover.Cover
import org.oxycblt.musikr.cover.MutableCovers import org.oxycblt.musikr.cover.MutableCovers
import org.oxycblt.musikr.fs.DeviceFile import org.oxycblt.musikr.metadata.Metadata
import org.oxycblt.musikr.metadata.MetadataExtractor import org.oxycblt.musikr.metadata.MetadataExtractor
import org.oxycblt.musikr.metadata.Properties import org.oxycblt.musikr.metadata.MetadataHandle
import org.oxycblt.musikr.playlist.PlaylistFile
import org.oxycblt.musikr.tag.parse.ParsedTags
import org.oxycblt.musikr.tag.parse.TagParser import org.oxycblt.musikr.tag.parse.TagParser
internal interface ExtractStep { internal interface ExtractStep {
fun extract(nodes: Flow<ExploreNode>): Flow<ExtractedMusic> fun extract(nodes: Flow<Explored.New>): Flow<Extracted>
companion object { companion object {
fun from(context: Context, storage: Storage): ExtractStep = fun from(context: Context, storage: Storage): ExtractStep =
ExtractStepImpl( ExtractStepImpl(
context, MetadataExtractor.new(context), TagParser.new(), storage.cache, storage.covers)
MetadataExtractor.new(),
TagParser.new(),
storage.cache,
storage.storedCovers)
} }
} }
private class ExtractStepImpl( private class ExtractStepImpl(
private val context: Context,
private val metadataExtractor: MetadataExtractor, private val metadataExtractor: MetadataExtractor,
private val tagParser: TagParser, private val tagParser: TagParser,
private val cacheFactory: Cache.Factory, private val cache: Cache,
private val storedCovers: MutableCovers private val storedCovers: MutableCovers
) : ExtractStep { ) : ExtractStep {
@OptIn(ExperimentalCoroutinesApi::class) override fun extract(nodes: Flow<Explored.New>): Flow<Extracted> {
override fun extract(nodes: Flow<ExploreNode>): Flow<ExtractedMusic> { val newSongs = nodes.filterIsInstance<NewSong>()
val cache = cacheFactory.open()
val addingMs = System.currentTimeMillis()
val filterFlow =
nodes.divert {
when (it) {
is ExploreNode.Audio -> Divert.Right(it.file)
is ExploreNode.Playlist -> Divert.Left(it.file)
}
}
val audioNodes = filterFlow.right
val playlistNodes = filterFlow.left.map { ExtractedMusic.Valid.Playlist(it) }
val readDistributedFlow = audioNodes.distribute(8) val handles: Flow<ExtractedInternal.Pre> =
val cacheResults = newSongs
readDistributedFlow.flows .tryMap {
.map { flow -> val handle = metadataExtractor.open(it.file)
flow if (handle != null) NewSongHandle(it, handle) else ExtractFailed
.map { wrap(it) { file -> cache.read(file, storedCovers) } }
.flowOn(Dispatchers.IO)
.buffer(Channel.UNLIMITED)
}
.flattenMerge()
.buffer(Channel.UNLIMITED)
val cacheFlow =
cacheResults.divert {
when (it) {
is CacheResult.Hit -> Divert.Left(it.song)
is CacheResult.Miss -> Divert.Right(it.file)
}
}
val cachedSongs = cacheFlow.left.map { ExtractedMusic.Valid.Song(it) }
val uncachedSongs = cacheFlow.right
val fds =
uncachedSongs
.mapNotNull {
wrap(it) { file ->
withContext(Dispatchers.IO) {
context.contentResolver.openFileDescriptor(file.uri, "r")?.let { fd ->
FileWith(file, fd)
}
}
}
} }
.flowOn(Dispatchers.IO) .flowOn(Dispatchers.IO)
.buffer(Channel.UNLIMITED) .buffer(Channel.UNLIMITED)
val metadata = val extracted: Flow<ExtractedInternal.Post> =
fds.mapNotNull { fileWith -> handles
wrap(fileWith.file) { _ -> .tryMap { item ->
metadataExtractor when (item) {
.extract(fileWith.file, fileWith.with) is NewSongHandle -> {
.let { FileWith(fileWith.file, it) } val metadata = item.handle.extract()
.also { withContext(Dispatchers.IO) { fileWith.with.close() } } if (metadata != null) NewSongMetadata(item.song, metadata)
else ExtractFailed
}
is ExtractFailed -> ExtractFailed
} }
} }
.flowOn(Dispatchers.IO) .flowOn(Dispatchers.IO)
@ -127,74 +80,56 @@ private class ExtractStepImpl(
// 8 to minimize GCs. // 8 to minimize GCs.
.buffer(8) .buffer(8)
val extractedSongs = val validDiversion =
metadata extracted.divert {
.map { fileWith -> when (it) {
if (fileWith.with != null) { is NewSongMetadata -> Divert.Right(it)
val tags = tagParser.parse(fileWith.file, fileWith.with) is ExtractFailed -> Divert.Left(it)
val cover = fileWith.with.cover?.let { storedCovers.write(it) }
RawSong(fileWith.file, fileWith.with.properties, tags, cover, addingMs)
} else {
null
} }
} }
val validSongs = validDiversion.right
val invalidSongs = validDiversion.left
val parsed =
validSongs
.tryMap { item ->
val tags = tagParser.parse(item.song.file, item.metadata)
val cover = item.metadata.cover?.let { storedCovers.write(it) }
RawSong(
item.song.file, item.metadata.properties, tags, cover, item.song.addedMs)
}
.flowOn(Dispatchers.IO) .flowOn(Dispatchers.IO)
.buffer(Channel.UNLIMITED) .buffer(Channel.UNLIMITED)
val extractedFilter = val writeDistribution = parsed.distribute(8)
extractedSongs.divert {
if (it != null) Divert.Left(it) else Divert.Right(ExtractedMusic.Invalid)
}
val write = extractedFilter.left
val invalid = extractedFilter.right
val writeDistributedFlow = write.distribute(8)
val writtenSongs = val writtenSongs =
writeDistributedFlow.flows writeDistribution.flows.mapx { flow ->
.map { flow ->
flow flow
.map { .tryMap {
wrap(it, cache::write) cache.write(it)
ExtractedMusic.Valid.Song(it) it
} }
.flowOn(Dispatchers.IO) .flowOn(Dispatchers.IO)
.buffer(Channel.UNLIMITED) .buffer(Channel.UNLIMITED)
} }
.flattenMerge()
val merged = val invalid = invalidSongs.map { InvalidSong }
merge(
filterFlow.manager,
readDistributedFlow.manager,
cacheFlow.manager,
cachedSongs,
extractedFilter.manager,
writeDistributedFlow.manager,
writtenSongs,
invalid,
playlistNodes)
return merged.onCompletion { cache.finalize() } return merge(validDiversion.manager, writeDistribution.manager, *writtenSongs, invalid)
} }
private data class FileWith<T>(val file: DeviceFile, val with: T) private sealed interface ExtractedInternal {
} sealed interface Pre : ExtractedInternal
internal data class RawSong( sealed interface Post : ExtractedInternal
val file: DeviceFile,
val properties: Properties,
val tags: ParsedTags,
val cover: Cover?,
val addedMs: Long
)
internal sealed interface ExtractedMusic {
sealed interface Valid : ExtractedMusic {
data class Song(val song: RawSong) : Valid
data class Playlist(val file: PlaylistFile) : Valid
} }
data object Invalid : ExtractedMusic private data class NewSongHandle(val song: NewSong, val handle: MetadataHandle) :
ExtractedInternal.Pre
private data class NewSongMetadata(val song: NewSong, val metadata: Metadata) :
ExtractedInternal.Post
private data object ExtractFailed : ExtractedInternal.Pre, ExtractedInternal.Post
} }

View file

@ -20,9 +20,7 @@ package org.oxycblt.musikr.pipeline
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.withIndex import kotlinx.coroutines.flow.withIndex
@ -57,7 +55,7 @@ internal inline fun <T, L, R> Flow<T>.divert(
return DivertedFlow(managedFlow, leftChannel.receiveAsFlow(), rightChannel.receiveAsFlow()) return DivertedFlow(managedFlow, leftChannel.receiveAsFlow(), rightChannel.receiveAsFlow())
} }
internal class DistributedFlow<T>(val manager: Flow<Nothing>, val flows: Flow<Flow<T>>) internal class DistributedFlow<T>(val manager: Flow<Nothing>, val flows: Array<Flow<T>>)
/** /**
* Equally "distributes" the values of some flow across n new flows. * Equally "distributes" the values of some flow across n new flows.
@ -66,7 +64,7 @@ internal class DistributedFlow<T>(val manager: Flow<Nothing>, val flows: Flow<Fl
* order to function. Without this, all of the newly split flows will simply block. * order to function. Without this, all of the newly split flows will simply block.
*/ */
internal fun <T> Flow<T>.distribute(n: Int): DistributedFlow<T> { internal fun <T> Flow<T>.distribute(n: Int): DistributedFlow<T> {
val posChannels = List(n) { Channel<T>(Channel.UNLIMITED) } val posChannels = Array(n) { Channel<T>(Channel.UNLIMITED) }
val managerFlow = val managerFlow =
flow<Nothing> { flow<Nothing> {
withIndex().collect { withIndex().collect {
@ -77,6 +75,9 @@ internal fun <T> Flow<T>.distribute(n: Int): DistributedFlow<T> {
channel.close() channel.close()
} }
} }
val hotFlows = posChannels.asFlow().map { it.receiveAsFlow() } val hotFlows = posChannels.mapx { it.receiveAsFlow() }
return DistributedFlow(managerFlow, hotFlows) return DistributedFlow(managerFlow, hotFlows)
} }
internal inline fun <T, reified R> Array<T>.mapx(transform: (T) -> R) =
Array(size) { index -> transform(this[index]) }

View file

@ -18,71 +18,20 @@
package org.oxycblt.musikr.pipeline package org.oxycblt.musikr.pipeline
import org.oxycblt.musikr.fs.DeviceFile import kotlinx.coroutines.flow.Flow
import org.oxycblt.musikr.playlist.PlaylistFile import kotlinx.coroutines.flow.map
import org.oxycblt.musikr.playlist.interpret.PrePlaylist
import org.oxycblt.musikr.tag.interpret.PreSong
class PipelineException(val processing: WhileProcessing, val error: Exception) : Exception() { class PipelineException(val processing: Any?, val error: Exception) : Exception() {
override val cause = error override val cause = error
override val message = "Error while processing ${processing}: ${error.stackTraceToString()}" override val message =
"Error while processing a ${processing?.let { it::class.simpleName} } ${processing}: ${error.stackTraceToString()}"
} }
sealed interface WhileProcessing { internal fun <T : Any, R> Flow<T>.tryMap(block: suspend (T) -> R): Flow<R> = map {
class AFile internal constructor(private val file: DeviceFile) : WhileProcessing { try {
override fun toString() = "File @ ${file.path}" block(it)
} } catch (e: Exception) {
throw PipelineException(it, e)
class ARawSong internal constructor(private val rawSong: RawSong) : WhileProcessing {
override fun toString() = "Raw Song @ ${rawSong.file.path}"
}
class APlaylistFile internal constructor(private val playlist: PlaylistFile) : WhileProcessing {
override fun toString() = "Playlist File @ ${playlist.name}"
}
class APreSong internal constructor(private val preSong: PreSong) : WhileProcessing {
override fun toString() = "Pre Song @ ${preSong.path}"
}
class APrePlaylist internal constructor(private val prePlaylist: PrePlaylist) :
WhileProcessing {
override fun toString() = "Pre Playlist @ ${prePlaylist.name}"
} }
} }
internal suspend fun <R> wrap(file: DeviceFile, block: suspend (DeviceFile) -> R): R =
try {
block(file)
} catch (e: Exception) {
throw PipelineException(WhileProcessing.AFile(file), e)
}
internal suspend fun <R> wrap(song: RawSong, block: suspend (RawSong) -> R): R =
try {
block(song)
} catch (e: Exception) {
throw PipelineException(WhileProcessing.ARawSong(song), e)
}
internal suspend fun <R> wrap(file: PlaylistFile, block: suspend (PlaylistFile) -> R): R =
try {
block(file)
} catch (e: Exception) {
throw PipelineException(WhileProcessing.APlaylistFile(file), e)
}
internal suspend fun <R> wrap(song: PreSong, block: suspend (PreSong) -> R): R =
try {
block(song)
} catch (e: Exception) {
throw PipelineException(WhileProcessing.APreSong(song), e)
}
internal suspend fun <R> wrap(playlist: PrePlaylist, block: suspend (PrePlaylist) -> R): R =
try {
block(playlist)
} catch (e: Exception) {
throw PipelineException(WhileProcessing.APrePlaylist(playlist), e)
}

View file

@ -0,0 +1,57 @@
/*
* Copyright (c) 2025 Auxio Project
* PipelineItem.kt is part of Auxio.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.oxycblt.musikr.pipeline
import org.oxycblt.musikr.cover.Cover
import org.oxycblt.musikr.fs.DeviceFile
import org.oxycblt.musikr.metadata.Properties
import org.oxycblt.musikr.playlist.PlaylistFile
import org.oxycblt.musikr.tag.parse.ParsedTags
internal sealed interface PipelineItem
internal sealed interface Incomplete : PipelineItem
internal sealed interface Complete : PipelineItem
internal sealed interface Explored : PipelineItem {
sealed interface New : Explored, Incomplete
sealed interface Known : Explored, Complete
}
internal data class NewSong(val file: DeviceFile, val addedMs: Long) : Explored.New
internal sealed interface Extracted : PipelineItem {
sealed interface Valid : Complete, Extracted
sealed interface Invalid : Extracted
}
data object InvalidSong : Extracted.Invalid
internal data class RawPlaylist(val file: PlaylistFile) : Explored.Known, Extracted.Valid
internal data class RawSong(
val file: DeviceFile,
val properties: Properties,
val tags: ParsedTags,
val cover: Cover?,
val addedMs: Long
) : Explored.Known, Extracted.Valid