diff --git a/musikr/src/main/java/org/oxycblt/musikr/Config.kt b/musikr/src/main/java/org/oxycblt/musikr/Config.kt index a793680db..fa9535b35 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/Config.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/Config.kt @@ -24,20 +24,20 @@ import org.oxycblt.musikr.playlist.db.StoredPlaylists import org.oxycblt.musikr.tag.interpret.Naming import org.oxycblt.musikr.tag.interpret.Separators -/** Side-effect laden [Storage] for use during music loading and [MutableLibrary] operation. */ +/** Side-effect repositories for use during music loading and [MutableLibrary] operation. */ data class Storage( /** - * A factory producing a repository of cached metadata to read and write from over the course of - * music loading. This will only be used during music loading. + * A repository of cached metadata to read and write from over the course of music loading only. + * This will be used only during music loading. */ - val cache: Cache.Factory, + val cache: Cache, /** * A repository of cover images to for re-use during music loading. Should be kept in lock-step * with the cache for best performance. This will be used during music loading and when * retrieving cover information from the library. */ - val storedCovers: MutableCovers, + val covers: MutableCovers, /** * A repository of user-created playlists that should also be loaded into the library. This will diff --git a/musikr/src/main/java/org/oxycblt/musikr/Musikr.kt b/musikr/src/main/java/org/oxycblt/musikr/Musikr.kt index c18a01684..da788f9c8 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/Musikr.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/Musikr.kt @@ -22,13 +22,19 @@ import android.content.Context import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.filterIsInstance +import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onStart import org.oxycblt.musikr.fs.MusicLocation +import org.oxycblt.musikr.pipeline.Divert import org.oxycblt.musikr.pipeline.EvaluateStep import org.oxycblt.musikr.pipeline.ExploreStep +import org.oxycblt.musikr.pipeline.Explored import org.oxycblt.musikr.pipeline.ExtractStep +import org.oxycblt.musikr.pipeline.Extracted +import org.oxycblt.musikr.pipeline.divert /** * A highly opinionated, multi-threaded device music library. @@ -127,13 +133,24 @@ private class MusikrImpl( .buffer(Channel.UNLIMITED) .onStart { onProgress(IndexingProgress.Songs(0, 0)) } .onEach { onProgress(IndexingProgress.Songs(extractedCount, ++exploredCount)) } + val typeDiversion = + explored.divert { + when (it) { + is Explored.Known -> Divert.Right(it) + is Explored.New -> Divert.Left(it) + } + } + val known = typeDiversion.right + val new = typeDiversion.left val extracted = extractStep - .extract(explored) + .extract(new) .buffer(Channel.UNLIMITED) .onEach { onProgress(IndexingProgress.Songs(++extractedCount, exploredCount)) } .onCompletion { onProgress(IndexingProgress.Indeterminate) } - val library = evaluateStep.evaluate(extracted) + val complete = + merge(typeDiversion.manager, known, extracted.filterIsInstance()) + val library = evaluateStep.evaluate(complete) LibraryResultImpl(storage, library) } } @@ -143,6 +160,6 @@ private class LibraryResultImpl( override val library: MutableLibrary ) : LibraryResult { override suspend fun cleanup() { - storage.storedCovers.cleanup(library.songs.mapNotNull { it.cover }) + storage.covers.cleanup(library.songs.mapNotNull { it.cover }) } } diff --git a/musikr/src/main/java/org/oxycblt/musikr/cache/Cache.kt b/musikr/src/main/java/org/oxycblt/musikr/cache/Cache.kt index 277495d3a..b6145a44e 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/cache/Cache.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/cache/Cache.kt @@ -18,16 +18,17 @@ package org.oxycblt.musikr.cache -import org.oxycblt.musikr.cover.Covers import org.oxycblt.musikr.fs.DeviceFile +import org.oxycblt.musikr.metadata.Properties import org.oxycblt.musikr.pipeline.RawSong +import org.oxycblt.musikr.tag.parse.ParsedTags abstract class Cache { - internal abstract suspend fun read(file: DeviceFile, covers: Covers): CacheResult + internal abstract suspend fun read(file: DeviceFile): CacheResult internal abstract suspend fun write(song: RawSong) - internal abstract suspend fun finalize() + internal abstract suspend fun finalize(songs: List) abstract class Factory { internal abstract fun open(): Cache @@ -35,7 +36,15 @@ abstract class Cache { } internal sealed interface CacheResult { - data class Hit(val song: RawSong) : CacheResult + data class Hit( + val file: DeviceFile, + val properties: Properties, + val tags: ParsedTags, + val coverId: String?, + val addedMs: Long + ) : CacheResult - data class Miss(val file: DeviceFile, val addedMs: Long?) : CacheResult + data class Outdated(val file: DeviceFile, val addedMs: Long) : CacheResult + + data class Miss(val file: DeviceFile) : CacheResult } diff --git a/musikr/src/main/java/org/oxycblt/musikr/cache/CacheDatabase.kt b/musikr/src/main/java/org/oxycblt/musikr/cache/CacheDatabase.kt index 854e37a37..4e8b2075c 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/cache/CacheDatabase.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/cache/CacheDatabase.kt @@ -31,13 +31,8 @@ import androidx.room.RoomDatabase import androidx.room.Transaction import androidx.room.TypeConverter import androidx.room.TypeConverters -import org.oxycblt.musikr.cover.Covers -import org.oxycblt.musikr.cover.ObtainResult -import org.oxycblt.musikr.fs.DeviceFile -import org.oxycblt.musikr.metadata.Properties import org.oxycblt.musikr.pipeline.RawSong import org.oxycblt.musikr.tag.Date -import org.oxycblt.musikr.tag.parse.ParsedTags import org.oxycblt.musikr.util.correctWhitespace import org.oxycblt.musikr.util.splitEscaped @@ -118,45 +113,6 @@ internal data class CachedSong( val replayGainAlbumAdjustment: Float?, val coverId: String?, ) { - suspend fun intoRawSong(file: DeviceFile, covers: Covers): RawSong? { - val cover = - when (val result = coverId?.let { covers.obtain(it) }) { - // We found the cover. - is ObtainResult.Hit -> result.cover - // We actually didn't find the cover, can't safely convert. - is ObtainResult.Miss -> return null - // No cover in the first place, can ignore. - null -> null - } - return RawSong( - file, - Properties(mimeType, durationMs, bitrateHz, sampleRateHz), - ParsedTags( - musicBrainzId = musicBrainzId, - name = name, - sortName = sortName, - durationMs = durationMs, - track = track, - disc = disc, - subtitle = subtitle, - date = date, - albumMusicBrainzId = albumMusicBrainzId, - albumName = albumName, - albumSortName = albumSortName, - releaseTypes = releaseTypes, - artistMusicBrainzIds = artistMusicBrainzIds, - artistNames = artistNames, - artistSortNames = artistSortNames, - albumArtistMusicBrainzIds = albumArtistMusicBrainzIds, - albumArtistNames = albumArtistNames, - albumArtistSortNames = albumArtistSortNames, - genreNames = genreNames, - replayGainTrackAdjustment = replayGainTrackAdjustment, - replayGainAlbumAdjustment = replayGainAlbumAdjustment), - cover = cover, - addedMs = addedMs) - } - object Converters { @TypeConverter fun fromMultiValue(values: List) = diff --git a/musikr/src/main/java/org/oxycblt/musikr/cache/StoredCache.kt b/musikr/src/main/java/org/oxycblt/musikr/cache/StoredCache.kt index 4707ffe3f..7ec7df30c 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/cache/StoredCache.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/cache/StoredCache.kt @@ -21,7 +21,9 @@ package org.oxycblt.musikr.cache import android.content.Context import org.oxycblt.musikr.cover.Covers import org.oxycblt.musikr.fs.DeviceFile +import org.oxycblt.musikr.metadata.Properties import org.oxycblt.musikr.pipeline.RawSong +import org.oxycblt.musikr.tag.parse.ParsedTags interface StoredCache { fun visible(): Cache.Factory @@ -54,17 +56,44 @@ private abstract class BaseStoredCache(protected val writeDao: CacheWriteDao) : private class VisibleStoredCache(private val visibleDao: VisibleCacheDao, writeDao: CacheWriteDao) : BaseStoredCache(writeDao) { override suspend fun read(file: DeviceFile, covers: Covers): CacheResult { - val song = visibleDao.selectSong(file.uri.toString()) ?: return CacheResult.Miss(file, null) - if (song.modifiedMs != file.modifiedMs) { + val cachedSong = visibleDao.selectSong(file.uri.toString()) ?: return CacheResult.Miss(file) + if (cachedSong.modifiedMs != file.modifiedMs) { // We *found* this file earlier, but it's out of date. // Send back it with the timestamp so it will be re-used. // The touch timestamp will be updated on write. - return CacheResult.Miss(file, song.addedMs) + return CacheResult.Outdated(file, cachedSong.addedMs) } // Valid file, update the touch time. visibleDao.touch(file.uri.toString()) - val rawSong = song.intoRawSong(file, covers) ?: return CacheResult.Miss(file, song.addedMs) - return CacheResult.Hit(rawSong) + return cachedSong.run { + CacheResult.Hit( + file, + Properties(mimeType, durationMs, bitrateHz, sampleRateHz), + ParsedTags( + musicBrainzId = musicBrainzId, + name = name, + sortName = sortName, + durationMs = durationMs, + track = track, + disc = disc, + subtitle = subtitle, + date = date, + albumMusicBrainzId = albumMusicBrainzId, + albumName = albumName, + albumSortName = albumSortName, + releaseTypes = releaseTypes, + artistMusicBrainzIds = artistMusicBrainzIds, + artistNames = artistNames, + artistSortNames = artistSortNames, + albumArtistMusicBrainzIds = albumArtistMusicBrainzIds, + albumArtistNames = albumArtistNames, + albumArtistSortNames = albumArtistSortNames, + genreNames = genreNames, + replayGainTrackAdjustment = replayGainTrackAdjustment, + replayGainAlbumAdjustment = replayGainAlbumAdjustment), + coverId = coverId, + addedMs = addedMs) + } } class Factory(private val cacheDatabase: CacheDatabase) : Cache.Factory() { @@ -77,8 +106,11 @@ private class InvisibleStoredCache( private val invisibleCacheDao: InvisibleCacheDao, writeDao: CacheWriteDao ) : BaseStoredCache(writeDao) { - override suspend fun read(file: DeviceFile, covers: Covers) = - CacheResult.Miss(file, invisibleCacheDao.selectAddedMs(file.uri.toString())) + override suspend fun read(file: DeviceFile, covers: Covers): CacheResult { + val addedMs = + invisibleCacheDao.selectAddedMs(file.uri.toString()) ?: return CacheResult.Miss(file) + return CacheResult.Outdated(file, addedMs) + } class Factory(private val cacheDatabase: CacheDatabase) : Cache.Factory() { override fun open() = diff --git a/musikr/src/main/java/org/oxycblt/musikr/metadata/MetadataExtractor.kt b/musikr/src/main/java/org/oxycblt/musikr/metadata/MetadataExtractor.kt index 0b0cfc48d..ccc39914a 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/metadata/MetadataExtractor.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/metadata/MetadataExtractor.kt @@ -18,6 +18,8 @@ package org.oxycblt.musikr.metadata +import android.annotation.SuppressLint +import android.content.Context import android.os.ParcelFileDescriptor import java.io.FileInputStream import kotlinx.coroutines.Dispatchers @@ -25,17 +27,38 @@ import kotlinx.coroutines.withContext import org.oxycblt.musikr.fs.DeviceFile internal interface MetadataExtractor { - suspend fun extract(deviceFile: DeviceFile, fd: ParcelFileDescriptor): Metadata? + suspend fun open(deviceFile: DeviceFile): MetadataHandle? companion object { - fun new(): MetadataExtractor = MetadataExtractorImpl + fun new(context: Context): MetadataExtractor = MetadataExtractorImpl(context) } } -private object MetadataExtractorImpl : MetadataExtractor { - override suspend fun extract(deviceFile: DeviceFile, fd: ParcelFileDescriptor) = +internal interface MetadataHandle { + suspend fun extract(): Metadata? +} + +private class MetadataExtractorImpl(private val context: Context) : MetadataExtractor { + @SuppressLint("Recycle") + override suspend fun open(deviceFile: DeviceFile): MetadataHandle? { + val fd = + withContext(Dispatchers.IO) { + context.contentResolver.openFileDescriptor(deviceFile.uri, "r") + } + return MetadataHandleImpl(deviceFile, fd ?: return null) + } +} + +private class MetadataHandleImpl( + private val file: DeviceFile, + private val fd: ParcelFileDescriptor +) : MetadataHandle { + override suspend fun extract() = withContext(Dispatchers.IO) { val fis = FileInputStream(fd.fileDescriptor) - TagLibJNI.open(deviceFile, fis).also { fis.close() } + TagLibJNI.open(file, fis).also { + fis.close() + fd.close() + } } } diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/EvaluateStep.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/EvaluateStep.kt index df4f72cb1..ef0599037 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/EvaluateStep.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/EvaluateStep.kt @@ -23,9 +23,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.filterIsInstance import kotlinx.coroutines.flow.flowOn -import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.onEach import org.oxycblt.musikr.Interpretation @@ -38,7 +36,7 @@ import org.oxycblt.musikr.playlist.interpret.PlaylistInterpreter import org.oxycblt.musikr.tag.interpret.TagInterpreter internal interface EvaluateStep { - suspend fun evaluate(extractedMusic: Flow): MutableLibrary + suspend fun evaluate(complete: Flow): MutableLibrary companion object { fun new(storage: Storage, interpretation: Interpretation): EvaluateStep = @@ -56,31 +54,31 @@ private class EvaluateStepImpl( private val storedPlaylists: StoredPlaylists, private val libraryFactory: LibraryFactory ) : EvaluateStep { - override suspend fun evaluate(extractedMusic: Flow): MutableLibrary { + override suspend fun evaluate(complete: Flow): MutableLibrary { val filterFlow = - extractedMusic.filterIsInstance().divert { + complete.divert { when (it) { - is ExtractedMusic.Valid.Song -> Divert.Right(it.song) - is ExtractedMusic.Valid.Playlist -> Divert.Left(it.file) + is RawSong -> Divert.Right(it) + is RawPlaylist -> Divert.Left(it.file) } } val rawSongs = filterFlow.right val preSongs = rawSongs - .map { wrap(it, tagInterpreter::interpret) } + .tryMap { tagInterpreter.interpret(it) } .flowOn(Dispatchers.Default) .buffer(Channel.UNLIMITED) val prePlaylists = filterFlow.left - .map { wrap(it, playlistInterpreter::interpret) } + .tryMap { playlistInterpreter.interpret(it) } .flowOn(Dispatchers.Default) .buffer(Channel.UNLIMITED) val graphBuilder = MusicGraph.builder() val graphBuild = merge( filterFlow.manager, - preSongs.onEach { wrap(it, graphBuilder::add) }, - prePlaylists.onEach { wrap(it, graphBuilder::add) }) + preSongs.onEach { graphBuilder.add(it) }, + prePlaylists.onEach { graphBuilder.add(it) }) graphBuild.collect() val graph = graphBuilder.build() return libraryFactory.create(graph, storedPlaylists, playlistInterpreter) diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExploreStep.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExploreStep.kt index 1e77659e1..2b68f3b93 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExploreStep.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExploreStep.kt @@ -24,56 +24,71 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.flow.merge import org.oxycblt.musikr.Storage +import org.oxycblt.musikr.cache.Cache +import org.oxycblt.musikr.cache.CacheResult +import org.oxycblt.musikr.cover.Covers +import org.oxycblt.musikr.cover.ObtainResult import org.oxycblt.musikr.fs.DeviceFile import org.oxycblt.musikr.fs.MusicLocation import org.oxycblt.musikr.fs.device.DeviceFiles -import org.oxycblt.musikr.playlist.PlaylistFile import org.oxycblt.musikr.playlist.db.StoredPlaylists import org.oxycblt.musikr.playlist.m3u.M3U internal interface ExploreStep { - fun explore(locations: List): Flow + fun explore(locations: List): Flow companion object { fun from(context: Context, storage: Storage): ExploreStep = - ExploreStepImpl(DeviceFiles.from(context), storage.storedPlaylists) + ExploreStepImpl( + DeviceFiles.from(context), storage.storedPlaylists, storage.cache, storage.covers) } } private class ExploreStepImpl( private val deviceFiles: DeviceFiles, - private val storedPlaylists: StoredPlaylists + private val storedPlaylists: StoredPlaylists, + private val cache: Cache, + private val covers: Covers ) : ExploreStep { - override fun explore(locations: List): Flow { + override fun explore(locations: List): Flow { val audios = deviceFiles .explore(locations.asFlow()) - .mapNotNull { - when { - it.mimeType == M3U.MIME_TYPE -> null - it.mimeType.startsWith("audio/") -> ExploreNode.Audio(it) - else -> null - } - } + .filter { it.mimeType.startsWith("audio/") || it.mimeType == M3U.MIME_TYPE } + .map { evaluateAudio(it) } .flowOn(Dispatchers.IO) .buffer() val playlists = flow { emitAll(storedPlaylists.read().asFlow()) } - .map { ExploreNode.Playlist(it) } + .map { RawPlaylist(it) } .flowOn(Dispatchers.IO) .buffer() return merge(audios, playlists) } -} -internal sealed interface ExploreNode { - data class Audio(val file: DeviceFile) : ExploreNode - - data class Playlist(val file: PlaylistFile) : ExploreNode + private suspend fun evaluateAudio(file: DeviceFile): Explored { + return when (val cacheResult = cache.read(file)) { + is CacheResult.Hit -> { + val coverResult = cacheResult.coverId?.let { covers.obtain(it) } + when (coverResult) { + is ObtainResult.Hit -> + RawSong( + file, + cacheResult.properties, + cacheResult.tags, + coverResult.cover, + cacheResult.addedMs) + else -> NewSong(file, cacheResult.addedMs) + } + } + is CacheResult.Outdated -> NewSong(file, cacheResult.addedMs) + is CacheResult.Miss -> NewSong(file, System.currentTimeMillis()) + } + } } diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt index 858421457..9dd958ffe 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt @@ -20,106 +20,59 @@ package org.oxycblt.musikr.pipeline import android.content.Context import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.buffer -import kotlinx.coroutines.flow.flattenMerge +import kotlinx.coroutines.flow.filterIsInstance import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.flow.merge -import kotlinx.coroutines.flow.onCompletion -import kotlinx.coroutines.withContext import org.oxycblt.musikr.Storage import org.oxycblt.musikr.cache.Cache -import org.oxycblt.musikr.cache.CacheResult -import org.oxycblt.musikr.cover.Cover import org.oxycblt.musikr.cover.MutableCovers -import org.oxycblt.musikr.fs.DeviceFile +import org.oxycblt.musikr.metadata.Metadata import org.oxycblt.musikr.metadata.MetadataExtractor -import org.oxycblt.musikr.metadata.Properties -import org.oxycblt.musikr.playlist.PlaylistFile -import org.oxycblt.musikr.tag.parse.ParsedTags +import org.oxycblt.musikr.metadata.MetadataHandle import org.oxycblt.musikr.tag.parse.TagParser internal interface ExtractStep { - fun extract(nodes: Flow): Flow + fun extract(nodes: Flow): Flow companion object { fun from(context: Context, storage: Storage): ExtractStep = ExtractStepImpl( - context, - MetadataExtractor.new(), - TagParser.new(), - storage.cache, - storage.storedCovers) + MetadataExtractor.new(context), TagParser.new(), storage.cache, storage.covers) } } private class ExtractStepImpl( - private val context: Context, private val metadataExtractor: MetadataExtractor, private val tagParser: TagParser, - private val cacheFactory: Cache.Factory, + private val cache: Cache, private val storedCovers: MutableCovers ) : ExtractStep { - @OptIn(ExperimentalCoroutinesApi::class) - override fun extract(nodes: Flow): Flow { - val cache = cacheFactory.open() - val addingMs = System.currentTimeMillis() - val filterFlow = - nodes.divert { - when (it) { - is ExploreNode.Audio -> Divert.Right(it.file) - is ExploreNode.Playlist -> Divert.Left(it.file) - } - } - val audioNodes = filterFlow.right - val playlistNodes = filterFlow.left.map { ExtractedMusic.Valid.Playlist(it) } + override fun extract(nodes: Flow): Flow { + val newSongs = nodes.filterIsInstance() - val readDistributedFlow = audioNodes.distribute(8) - val cacheResults = - readDistributedFlow.flows - .map { flow -> - flow - .map { wrap(it) { file -> cache.read(file, storedCovers) } } - .flowOn(Dispatchers.IO) - .buffer(Channel.UNLIMITED) - } - .flattenMerge() - .buffer(Channel.UNLIMITED) - val cacheFlow = - cacheResults.divert { - when (it) { - is CacheResult.Hit -> Divert.Left(it.song) - is CacheResult.Miss -> Divert.Right(it.file) - } - } - val cachedSongs = cacheFlow.left.map { ExtractedMusic.Valid.Song(it) } - val uncachedSongs = cacheFlow.right - - val fds = - uncachedSongs - .mapNotNull { - wrap(it) { file -> - withContext(Dispatchers.IO) { - context.contentResolver.openFileDescriptor(file.uri, "r")?.let { fd -> - FileWith(file, fd) - } - } - } + val handles: Flow = + newSongs + .tryMap { + val handle = metadataExtractor.open(it.file) + if (handle != null) NewSongHandle(it, handle) else ExtractFailed } .flowOn(Dispatchers.IO) .buffer(Channel.UNLIMITED) - val metadata = - fds.mapNotNull { fileWith -> - wrap(fileWith.file) { _ -> - metadataExtractor - .extract(fileWith.file, fileWith.with) - .let { FileWith(fileWith.file, it) } - .also { withContext(Dispatchers.IO) { fileWith.with.close() } } + val extracted: Flow = + handles + .tryMap { item -> + when (item) { + is NewSongHandle -> { + val metadata = item.handle.extract() + if (metadata != null) NewSongMetadata(item.song, metadata) + else ExtractFailed + } + is ExtractFailed -> ExtractFailed } } .flowOn(Dispatchers.IO) @@ -127,74 +80,56 @@ private class ExtractStepImpl( // 8 to minimize GCs. .buffer(8) - val extractedSongs = - metadata - .map { fileWith -> - if (fileWith.with != null) { - val tags = tagParser.parse(fileWith.file, fileWith.with) - val cover = fileWith.with.cover?.let { storedCovers.write(it) } - RawSong(fileWith.file, fileWith.with.properties, tags, cover, addingMs) - } else { - null - } + val validDiversion = + extracted.divert { + when (it) { + is NewSongMetadata -> Divert.Right(it) + is ExtractFailed -> Divert.Left(it) + } + } + + val validSongs = validDiversion.right + val invalidSongs = validDiversion.left + + val parsed = + validSongs + .tryMap { item -> + val tags = tagParser.parse(item.song.file, item.metadata) + val cover = item.metadata.cover?.let { storedCovers.write(it) } + RawSong( + item.song.file, item.metadata.properties, tags, cover, item.song.addedMs) } .flowOn(Dispatchers.IO) .buffer(Channel.UNLIMITED) - val extractedFilter = - extractedSongs.divert { - if (it != null) Divert.Left(it) else Divert.Right(ExtractedMusic.Invalid) + val writeDistribution = parsed.distribute(8) + val writtenSongs = + writeDistribution.flows.mapx { flow -> + flow + .tryMap { + cache.write(it) + it + } + .flowOn(Dispatchers.IO) + .buffer(Channel.UNLIMITED) } - val write = extractedFilter.left - val invalid = extractedFilter.right + val invalid = invalidSongs.map { InvalidSong } - val writeDistributedFlow = write.distribute(8) - val writtenSongs = - writeDistributedFlow.flows - .map { flow -> - flow - .map { - wrap(it, cache::write) - ExtractedMusic.Valid.Song(it) - } - .flowOn(Dispatchers.IO) - .buffer(Channel.UNLIMITED) - } - .flattenMerge() - - val merged = - merge( - filterFlow.manager, - readDistributedFlow.manager, - cacheFlow.manager, - cachedSongs, - extractedFilter.manager, - writeDistributedFlow.manager, - writtenSongs, - invalid, - playlistNodes) - - return merged.onCompletion { cache.finalize() } + return merge(validDiversion.manager, writeDistribution.manager, *writtenSongs, invalid) } - private data class FileWith(val file: DeviceFile, val with: T) -} + private sealed interface ExtractedInternal { + sealed interface Pre : ExtractedInternal -internal data class RawSong( - val file: DeviceFile, - val properties: Properties, - val tags: ParsedTags, - val cover: Cover?, - val addedMs: Long -) - -internal sealed interface ExtractedMusic { - sealed interface Valid : ExtractedMusic { - data class Song(val song: RawSong) : Valid - - data class Playlist(val file: PlaylistFile) : Valid + sealed interface Post : ExtractedInternal } - data object Invalid : ExtractedMusic + private data class NewSongHandle(val song: NewSong, val handle: MetadataHandle) : + ExtractedInternal.Pre + + private data class NewSongMetadata(val song: NewSong, val metadata: Metadata) : + ExtractedInternal.Post + + private data object ExtractFailed : ExtractedInternal.Pre, ExtractedInternal.Post } diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt index 58f2c6eb0..0927b5c7d 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt @@ -20,9 +20,7 @@ package org.oxycblt.musikr.pipeline import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.flow.withIndex @@ -57,7 +55,7 @@ internal inline fun Flow.divert( return DivertedFlow(managedFlow, leftChannel.receiveAsFlow(), rightChannel.receiveAsFlow()) } -internal class DistributedFlow(val manager: Flow, val flows: Flow>) +internal class DistributedFlow(val manager: Flow, val flows: Array>) /** * Equally "distributes" the values of some flow across n new flows. @@ -66,7 +64,7 @@ internal class DistributedFlow(val manager: Flow, val flows: Flow Flow.distribute(n: Int): DistributedFlow { - val posChannels = List(n) { Channel(Channel.UNLIMITED) } + val posChannels = Array(n) { Channel(Channel.UNLIMITED) } val managerFlow = flow { withIndex().collect { @@ -77,6 +75,9 @@ internal fun Flow.distribute(n: Int): DistributedFlow { channel.close() } } - val hotFlows = posChannels.asFlow().map { it.receiveAsFlow() } + val hotFlows = posChannels.mapx { it.receiveAsFlow() } return DistributedFlow(managerFlow, hotFlows) } + +internal inline fun Array.mapx(transform: (T) -> R) = + Array(size) { index -> transform(this[index]) } diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/PipelineException.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/PipelineException.kt index 1f6efc892..0d46db1e8 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/PipelineException.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/PipelineException.kt @@ -18,71 +18,20 @@ package org.oxycblt.musikr.pipeline -import org.oxycblt.musikr.fs.DeviceFile -import org.oxycblt.musikr.playlist.PlaylistFile -import org.oxycblt.musikr.playlist.interpret.PrePlaylist -import org.oxycblt.musikr.tag.interpret.PreSong +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map -class PipelineException(val processing: WhileProcessing, val error: Exception) : Exception() { +class PipelineException(val processing: Any?, val error: Exception) : Exception() { override val cause = error - override val message = "Error while processing ${processing}: ${error.stackTraceToString()}" + override val message = + "Error while processing a ${processing?.let { it::class.simpleName} } ${processing}: ${error.stackTraceToString()}" } -sealed interface WhileProcessing { - class AFile internal constructor(private val file: DeviceFile) : WhileProcessing { - override fun toString() = "File @ ${file.path}" - } - - class ARawSong internal constructor(private val rawSong: RawSong) : WhileProcessing { - override fun toString() = "Raw Song @ ${rawSong.file.path}" - } - - class APlaylistFile internal constructor(private val playlist: PlaylistFile) : WhileProcessing { - override fun toString() = "Playlist File @ ${playlist.name}" - } - - class APreSong internal constructor(private val preSong: PreSong) : WhileProcessing { - override fun toString() = "Pre Song @ ${preSong.path}" - } - - class APrePlaylist internal constructor(private val prePlaylist: PrePlaylist) : - WhileProcessing { - override fun toString() = "Pre Playlist @ ${prePlaylist.name}" +internal fun Flow.tryMap(block: suspend (T) -> R): Flow = map { + try { + block(it) + } catch (e: Exception) { + throw PipelineException(it, e) } } - -internal suspend fun wrap(file: DeviceFile, block: suspend (DeviceFile) -> R): R = - try { - block(file) - } catch (e: Exception) { - throw PipelineException(WhileProcessing.AFile(file), e) - } - -internal suspend fun wrap(song: RawSong, block: suspend (RawSong) -> R): R = - try { - block(song) - } catch (e: Exception) { - throw PipelineException(WhileProcessing.ARawSong(song), e) - } - -internal suspend fun wrap(file: PlaylistFile, block: suspend (PlaylistFile) -> R): R = - try { - block(file) - } catch (e: Exception) { - throw PipelineException(WhileProcessing.APlaylistFile(file), e) - } - -internal suspend fun wrap(song: PreSong, block: suspend (PreSong) -> R): R = - try { - block(song) - } catch (e: Exception) { - throw PipelineException(WhileProcessing.APreSong(song), e) - } - -internal suspend fun wrap(playlist: PrePlaylist, block: suspend (PrePlaylist) -> R): R = - try { - block(playlist) - } catch (e: Exception) { - throw PipelineException(WhileProcessing.APrePlaylist(playlist), e) - } diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/PipelineItem.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/PipelineItem.kt new file mode 100644 index 000000000..32434d620 --- /dev/null +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/PipelineItem.kt @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2025 Auxio Project + * PipelineItem.kt is part of Auxio. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package org.oxycblt.musikr.pipeline + +import org.oxycblt.musikr.cover.Cover +import org.oxycblt.musikr.fs.DeviceFile +import org.oxycblt.musikr.metadata.Properties +import org.oxycblt.musikr.playlist.PlaylistFile +import org.oxycblt.musikr.tag.parse.ParsedTags + +internal sealed interface PipelineItem + +internal sealed interface Incomplete : PipelineItem + +internal sealed interface Complete : PipelineItem + +internal sealed interface Explored : PipelineItem { + sealed interface New : Explored, Incomplete + + sealed interface Known : Explored, Complete +} + +internal data class NewSong(val file: DeviceFile, val addedMs: Long) : Explored.New + +internal sealed interface Extracted : PipelineItem { + sealed interface Valid : Complete, Extracted + + sealed interface Invalid : Extracted +} + +data object InvalidSong : Extracted.Invalid + +internal data class RawPlaylist(val file: PlaylistFile) : Explored.Known, Extracted.Valid + +internal data class RawSong( + val file: DeviceFile, + val properties: Properties, + val tags: ParsedTags, + val cover: Cover?, + val addedMs: Long +) : Explored.Known, Extracted.Valid