diff --git a/musikr/src/main/java/org/oxycblt/musikr/Musikr.kt b/musikr/src/main/java/org/oxycblt/musikr/Musikr.kt index 9616699bf..c18a01684 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/Musikr.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/Musikr.kt @@ -19,15 +19,16 @@ package org.oxycblt.musikr import android.content.Context -import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onStart import org.oxycblt.musikr.fs.MusicLocation import org.oxycblt.musikr.pipeline.EvaluateStep -import org.oxycblt.musikr.pipeline.ExploreNode import org.oxycblt.musikr.pipeline.ExploreStep import org.oxycblt.musikr.pipeline.ExtractStep -import org.oxycblt.musikr.pipeline.ExtractedMusic /** * A highly opinionated, multi-threaded device music library. @@ -118,57 +119,21 @@ private class MusikrImpl( locations: List, onProgress: suspend (IndexingProgress) -> Unit ) = coroutineScope { - var explored = 0 - var loaded = 0 - val intermediateNodes = Channel(Channel.UNLIMITED) - val nodes = Channel(Channel.UNLIMITED) - - val exploreTask = exploreStep.explore(locations, intermediateNodes) - val exploreMonitor = async { - try { - onProgress(IndexingProgress.Songs(loaded, explored)) - for (node in intermediateNodes) { - explored++ - onProgress(IndexingProgress.Songs(loaded, explored)) - nodes.send(node) - } - - nodes.close() - Result.success(Unit) - } catch (e: Exception) { - nodes.close(e) - Result.failure(e) - } - } - - val intermediateExtracted = Channel(Channel.UNLIMITED) - val extracted = Channel(Channel.UNLIMITED) - - val extractTask = extractStep.extract(nodes, intermediateExtracted) - val extractMonitor = async { - try { - onProgress(IndexingProgress.Songs(loaded, explored)) - for (music in intermediateExtracted) { - loaded++ - onProgress(IndexingProgress.Songs(loaded, explored)) - extracted.send(music) - } - - extracted.close() - Result.success(Unit) - } catch (e: Exception) { - extracted.close(e) - Result.failure(e) - } - } - - val libraryTask = evaluateStep.evaluate(extracted) - - exploreTask.await().getOrThrow() - exploreMonitor.await().getOrThrow() - extractTask.await().getOrThrow() - extractMonitor.await().getOrThrow() - val library = libraryTask.await().getOrThrow() + var exploredCount = 0 + var extractedCount = 0 + val explored = + exploreStep + .explore(locations) + .buffer(Channel.UNLIMITED) + .onStart { onProgress(IndexingProgress.Songs(0, 0)) } + .onEach { onProgress(IndexingProgress.Songs(extractedCount, ++exploredCount)) } + val extracted = + extractStep + .extract(explored) + .buffer(Channel.UNLIMITED) + .onEach { onProgress(IndexingProgress.Songs(++extractedCount, exploredCount)) } + .onCompletion { onProgress(IndexingProgress.Indeterminate) } + val library = evaluateStep.evaluate(extracted) LibraryResultImpl(storage, library) } } diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/EvaluateStep.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/EvaluateStep.kt index 538763f82..df4f72cb1 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/EvaluateStep.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/EvaluateStep.kt @@ -18,10 +18,16 @@ package org.oxycblt.musikr.pipeline -import kotlinx.coroutines.Deferred -import kotlinx.coroutines.async +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.filterIsInstance +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.merge +import kotlinx.coroutines.flow.onEach import org.oxycblt.musikr.Interpretation import org.oxycblt.musikr.MutableLibrary import org.oxycblt.musikr.Storage @@ -32,7 +38,7 @@ import org.oxycblt.musikr.playlist.interpret.PlaylistInterpreter import org.oxycblt.musikr.tag.interpret.TagInterpreter internal interface EvaluateStep { - suspend fun evaluate(extractedMusic: Channel): Deferred> + suspend fun evaluate(extractedMusic: Flow): MutableLibrary companion object { fun new(storage: Storage, interpretation: Interpretation): EvaluateStep = @@ -50,30 +56,33 @@ private class EvaluateStepImpl( private val storedPlaylists: StoredPlaylists, private val libraryFactory: LibraryFactory ) : EvaluateStep { - override suspend fun evaluate( - extractedMusic: Channel - ): Deferred> = coroutineScope { - async { - try { - val graphBuilder = MusicGraph.builder() - for (music in extractedMusic) { - when (music) { - is ExtractedMusic.Valid.Song -> - graphBuilder.add(tagInterpreter.interpret(music.song)) - is ExtractedMusic.Valid.Playlist -> - graphBuilder.add(playlistInterpreter.interpret(music.file)) - is ExtractedMusic.Invalid -> {} - } + override suspend fun evaluate(extractedMusic: Flow): MutableLibrary { + val filterFlow = + extractedMusic.filterIsInstance().divert { + when (it) { + is ExtractedMusic.Valid.Song -> Divert.Right(it.song) + is ExtractedMusic.Valid.Playlist -> Divert.Left(it.file) } - val graph = graphBuilder.build() - val library = libraryFactory.create(graph, storedPlaylists, playlistInterpreter) - - extractedMusic.close() - Result.success(library) - } catch (e: Exception) { - extractedMusic.close(e) - Result.failure(e) } - } + val rawSongs = filterFlow.right + val preSongs = + rawSongs + .map { wrap(it, tagInterpreter::interpret) } + .flowOn(Dispatchers.Default) + .buffer(Channel.UNLIMITED) + val prePlaylists = + filterFlow.left + .map { wrap(it, playlistInterpreter::interpret) } + .flowOn(Dispatchers.Default) + .buffer(Channel.UNLIMITED) + val graphBuilder = MusicGraph.builder() + val graphBuild = + merge( + filterFlow.manager, + preSongs.onEach { wrap(it, graphBuilder::add) }, + prePlaylists.onEach { wrap(it, graphBuilder::add) }) + graphBuild.collect() + val graph = graphBuilder.build() + return libraryFactory.create(graph, storedPlaylists, playlistInterpreter) } } diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExploreStep.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExploreStep.kt index 82a589660..1e77659e1 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExploreStep.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExploreStep.kt @@ -19,12 +19,16 @@ package org.oxycblt.musikr.pipeline import android.content.Context -import kotlinx.coroutines.Deferred -import kotlinx.coroutines.async -import kotlinx.coroutines.channels.SendChannel -import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.mapNotNull +import kotlinx.coroutines.flow.merge import org.oxycblt.musikr.Storage import org.oxycblt.musikr.fs.DeviceFile import org.oxycblt.musikr.fs.MusicLocation @@ -34,10 +38,7 @@ import org.oxycblt.musikr.playlist.db.StoredPlaylists import org.oxycblt.musikr.playlist.m3u.M3U internal interface ExploreStep { - suspend fun explore( - locations: List, - explored: SendChannel - ): Deferred> + fun explore(locations: List): Flow companion object { fun from(context: Context, storage: Storage): ExploreStep = @@ -49,49 +50,25 @@ private class ExploreStepImpl( private val deviceFiles: DeviceFiles, private val storedPlaylists: StoredPlaylists ) : ExploreStep { - override suspend fun explore( - locations: List, - explored: SendChannel - ) = coroutineScope { - async { - try { - val audioTask = async { - try { - deviceFiles - .explore(locations.asFlow()) - .mapNotNull { - when { - it.mimeType == M3U.MIME_TYPE -> null - it.mimeType.startsWith("audio/") -> ExploreNode.Audio(it) - else -> null - } - } - .collect { explored.send(it) } - Result.success(Unit) - } catch (e: Exception) { - Result.failure(e) + override fun explore(locations: List): Flow { + val audios = + deviceFiles + .explore(locations.asFlow()) + .mapNotNull { + when { + it.mimeType == M3U.MIME_TYPE -> null + it.mimeType.startsWith("audio/") -> ExploreNode.Audio(it) + else -> null } } - - val playlistTask = async { - try { - storedPlaylists.read().forEach { explored.send(ExploreNode.Playlist(it)) } - Result.success(Unit) - } catch (e: Exception) { - Result.failure(e) - } - } - - audioTask.await().getOrThrow() - playlistTask.await().getOrThrow() - - explored.close() - Result.success(Unit) - } catch (e: Exception) { - explored.close(e) - Result.failure(e) - } - } + .flowOn(Dispatchers.IO) + .buffer() + val playlists = + flow { emitAll(storedPlaylists.read().asFlow()) } + .map { ExploreNode.Playlist(it) } + .flowOn(Dispatchers.IO) + .buffer() + return merge(audios, playlists) } } diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt index cb208f488..858421457 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt @@ -19,15 +19,17 @@ package org.oxycblt.musikr.pipeline import android.content.Context -import android.os.ParcelFileDescriptor -import kotlinx.coroutines.Deferred import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.async -import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.channels.ReceiveChannel -import kotlinx.coroutines.channels.SendChannel -import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.flattenMerge +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.mapNotNull +import kotlinx.coroutines.flow.merge +import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.withContext import org.oxycblt.musikr.Storage import org.oxycblt.musikr.cache.Cache @@ -35,7 +37,6 @@ import org.oxycblt.musikr.cache.CacheResult import org.oxycblt.musikr.cover.Cover import org.oxycblt.musikr.cover.MutableCovers import org.oxycblt.musikr.fs.DeviceFile -import org.oxycblt.musikr.metadata.Metadata import org.oxycblt.musikr.metadata.MetadataExtractor import org.oxycblt.musikr.metadata.Properties import org.oxycblt.musikr.playlist.PlaylistFile @@ -43,10 +44,7 @@ import org.oxycblt.musikr.tag.parse.ParsedTags import org.oxycblt.musikr.tag.parse.TagParser internal interface ExtractStep { - suspend fun extract( - explored: ReceiveChannel, - extracted: SendChannel - ): Deferred> + fun extract(nodes: Flow): Flow companion object { fun from(context: Context, storage: Storage): ExtractStep = @@ -66,155 +64,118 @@ private class ExtractStepImpl( private val cacheFactory: Cache.Factory, private val storedCovers: MutableCovers ) : ExtractStep { - override suspend fun extract( - explored: ReceiveChannel, - extracted: SendChannel - ) = coroutineScope { - async { - try { - val cache = cacheFactory.open() - val addingMs = System.currentTimeMillis() + @OptIn(ExperimentalCoroutinesApi::class) + override fun extract(nodes: Flow): Flow { + val cache = cacheFactory.open() + val addingMs = System.currentTimeMillis() + val filterFlow = + nodes.divert { + when (it) { + is ExploreNode.Audio -> Divert.Right(it.file) + is ExploreNode.Playlist -> Divert.Left(it.file) + } + } + val audioNodes = filterFlow.right + val playlistNodes = filterFlow.left.map { ExtractedMusic.Valid.Playlist(it) } - val read = Channel(Channel.UNLIMITED) - val open = Channel(Channel.UNLIMITED) - val extract = Channel>(Channel.UNLIMITED) + val readDistributedFlow = audioNodes.distribute(8) + val cacheResults = + readDistributedFlow.flows + .map { flow -> + flow + .map { wrap(it) { file -> cache.read(file, storedCovers) } } + .flowOn(Dispatchers.IO) + .buffer(Channel.UNLIMITED) + } + .flattenMerge() + .buffer(Channel.UNLIMITED) + val cacheFlow = + cacheResults.divert { + when (it) { + is CacheResult.Hit -> Divert.Left(it.song) + is CacheResult.Miss -> Divert.Right(it.file) + } + } + val cachedSongs = cacheFlow.left.map { ExtractedMusic.Valid.Song(it) } + val uncachedSongs = cacheFlow.right + + val fds = + uncachedSongs + .mapNotNull { + wrap(it) { file -> + withContext(Dispatchers.IO) { + context.contentResolver.openFileDescriptor(file.uri, "r")?.let { fd -> + FileWith(file, fd) + } + } + } + } + .flowOn(Dispatchers.IO) + .buffer(Channel.UNLIMITED) + + val metadata = + fds.mapNotNull { fileWith -> + wrap(fileWith.file) { _ -> + metadataExtractor + .extract(fileWith.file, fileWith.with) + .let { FileWith(fileWith.file, it) } + .also { withContext(Dispatchers.IO) { fileWith.with.close() } } + } + } + .flowOn(Dispatchers.IO) // Covers are pretty big, so cap the amount of parsed metadata in-memory to at most // 8 to minimize GCs. - val parse = Channel>(8) - val write = Channel(Channel.UNLIMITED) + .buffer(8) - val exploreAssortTask = async { - try { - for (node in explored) { - when (node) { - is ExploreNode.Audio -> read.send(node.file) - is ExploreNode.Playlist -> - extracted.send(ExtractedMusic.Valid.Playlist(node.file)) - } - } - read.close() - Result.success(Unit) - } catch (e: Exception) { - read.close(e) - Result.failure(e) + val extractedSongs = + metadata + .map { fileWith -> + if (fileWith.with != null) { + val tags = tagParser.parse(fileWith.file, fileWith.with) + val cover = fileWith.with.cover?.let { storedCovers.write(it) } + RawSong(fileWith.file, fileWith.with.properties, tags, cover, addingMs) + } else { + null } } + .flowOn(Dispatchers.IO) + .buffer(Channel.UNLIMITED) - val readTasks = - List(8) { - async { - try { - for (file in read) { - when (val result = cache.read(file, storedCovers)) { - is CacheResult.Hit -> - extracted.send(ExtractedMusic.Valid.Song(result.song)) - is CacheResult.Miss -> open.send(result.file) - } - } - Result.success(Unit) - } catch (e: Exception) { - Result.failure(e) - } - } - } - - val readTask = async { - try { - readTasks.awaitAll().forEach { it.getOrThrow() } - open.close() - Result.success(Unit) - } catch (e: Exception) { - open.close(e) - Result.failure(e) - } - } - - val openTask = async { - try { - for (file in open) { - withContext(Dispatchers.IO) { - val fd = context.contentResolver.openFileDescriptor(file.uri, "r") - if (fd != null) { - extract.send(FileWith(file, fd)) - } else { - extracted.send(ExtractedMusic.Invalid) - } - } - } - extract.close() - Result.success(Unit) - } catch (e: Exception) { - extract.close(e) - Result.failure(e) - } - } - - val extractTask = async { - try { - for (fileWith in extract) { - val metadata = metadataExtractor.extract(fileWith.file, fileWith.with) - if (metadata != null) { - parse.send(FileWith(fileWith.file, metadata)) - } else { - extracted.send(ExtractedMusic.Invalid) - } - fileWith.with.close() - } - parse.close() - Result.success(Unit) - } catch (e: Exception) { - parse.close(e) - Result.failure(e) - } - } - - val parseTask = async { - try { - for (fileWith in parse) { - val tags = tagParser.parse(fileWith.file, fileWith.with) - val cover = fileWith.with.cover?.let { storedCovers.write(it) } - write.send( - RawSong( - fileWith.file, fileWith.with.properties, tags, cover, addingMs)) - } - write.close() - Result.success(Unit) - } catch (e: Exception) { - write.close(e) - Result.failure(e) - } - } - - val writeTasks = - List(8) { - async { - try { - for (song in write) { - cache.write(song) - extracted.send(ExtractedMusic.Valid.Song(song)) - } - Result.success(Unit) - } catch (e: Exception) { - Result.failure(e) - } - } - } - - exploreAssortTask.await().getOrThrow() - readTask.await().getOrThrow() - openTask.await().getOrThrow() - extractTask.await().getOrThrow() - parseTask.await().getOrThrow() - writeTasks.awaitAll().forEach { it.getOrThrow() } - cache.finalize() - - extracted.close() - Result.success(Unit) - } catch (e: Exception) { - extracted.close(e) - Result.failure(e) + val extractedFilter = + extractedSongs.divert { + if (it != null) Divert.Left(it) else Divert.Right(ExtractedMusic.Invalid) } - } + + val write = extractedFilter.left + val invalid = extractedFilter.right + + val writeDistributedFlow = write.distribute(8) + val writtenSongs = + writeDistributedFlow.flows + .map { flow -> + flow + .map { + wrap(it, cache::write) + ExtractedMusic.Valid.Song(it) + } + .flowOn(Dispatchers.IO) + .buffer(Channel.UNLIMITED) + } + .flattenMerge() + + val merged = + merge( + filterFlow.manager, + readDistributedFlow.manager, + cacheFlow.manager, + cachedSongs, + extractedFilter.manager, + writeDistributedFlow.manager, + writtenSongs, + invalid, + playlistNodes) + + return merged.onCompletion { cache.finalize() } } private data class FileWith(val file: DeviceFile, val with: T) diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt new file mode 100644 index 000000000..58f2c6eb0 --- /dev/null +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2024 Auxio Project + * FlowUtil.kt is part of Auxio. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package org.oxycblt.musikr.pipeline + +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.receiveAsFlow +import kotlinx.coroutines.flow.withIndex + +internal sealed interface Divert { + data class Left(val value: L) : Divert + + data class Right(val value: R) : Divert +} + +internal class DivertedFlow( + val manager: Flow, + val left: Flow, + val right: Flow +) + +internal inline fun Flow.divert( + crossinline predicate: (T) -> Divert +): DivertedFlow { + val leftChannel = Channel(Channel.UNLIMITED) + val rightChannel = Channel(Channel.UNLIMITED) + val managedFlow = + flow { + collect { + when (val result = predicate(it)) { + is Divert.Left -> leftChannel.send(result.value) + is Divert.Right -> rightChannel.send(result.value) + } + } + leftChannel.close() + rightChannel.close() + } + return DivertedFlow(managedFlow, leftChannel.receiveAsFlow(), rightChannel.receiveAsFlow()) +} + +internal class DistributedFlow(val manager: Flow, val flows: Flow>) + +/** + * Equally "distributes" the values of some flow across n new flows. + * + * Note that this function requires the "manager" flow to be consumed alongside the split flows in + * order to function. Without this, all of the newly split flows will simply block. + */ +internal fun Flow.distribute(n: Int): DistributedFlow { + val posChannels = List(n) { Channel(Channel.UNLIMITED) } + val managerFlow = + flow { + withIndex().collect { + val index = it.index % n + posChannels[index].send(it.value) + } + for (channel in posChannels) { + channel.close() + } + } + val hotFlows = posChannels.asFlow().map { it.receiveAsFlow() } + return DistributedFlow(managerFlow, hotFlows) +} diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/PipelineException.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/PipelineException.kt new file mode 100644 index 000000000..1f6efc892 --- /dev/null +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/PipelineException.kt @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2024 Auxio Project + * PipelineException.kt is part of Auxio. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package org.oxycblt.musikr.pipeline + +import org.oxycblt.musikr.fs.DeviceFile +import org.oxycblt.musikr.playlist.PlaylistFile +import org.oxycblt.musikr.playlist.interpret.PrePlaylist +import org.oxycblt.musikr.tag.interpret.PreSong + +class PipelineException(val processing: WhileProcessing, val error: Exception) : Exception() { + override val cause = error + + override val message = "Error while processing ${processing}: ${error.stackTraceToString()}" +} + +sealed interface WhileProcessing { + class AFile internal constructor(private val file: DeviceFile) : WhileProcessing { + override fun toString() = "File @ ${file.path}" + } + + class ARawSong internal constructor(private val rawSong: RawSong) : WhileProcessing { + override fun toString() = "Raw Song @ ${rawSong.file.path}" + } + + class APlaylistFile internal constructor(private val playlist: PlaylistFile) : WhileProcessing { + override fun toString() = "Playlist File @ ${playlist.name}" + } + + class APreSong internal constructor(private val preSong: PreSong) : WhileProcessing { + override fun toString() = "Pre Song @ ${preSong.path}" + } + + class APrePlaylist internal constructor(private val prePlaylist: PrePlaylist) : + WhileProcessing { + override fun toString() = "Pre Playlist @ ${prePlaylist.name}" + } +} + +internal suspend fun wrap(file: DeviceFile, block: suspend (DeviceFile) -> R): R = + try { + block(file) + } catch (e: Exception) { + throw PipelineException(WhileProcessing.AFile(file), e) + } + +internal suspend fun wrap(song: RawSong, block: suspend (RawSong) -> R): R = + try { + block(song) + } catch (e: Exception) { + throw PipelineException(WhileProcessing.ARawSong(song), e) + } + +internal suspend fun wrap(file: PlaylistFile, block: suspend (PlaylistFile) -> R): R = + try { + block(file) + } catch (e: Exception) { + throw PipelineException(WhileProcessing.APlaylistFile(file), e) + } + +internal suspend fun wrap(song: PreSong, block: suspend (PreSong) -> R): R = + try { + block(song) + } catch (e: Exception) { + throw PipelineException(WhileProcessing.APreSong(song), e) + } + +internal suspend fun wrap(playlist: PrePlaylist, block: suspend (PrePlaylist) -> R): R = + try { + block(playlist) + } catch (e: Exception) { + throw PipelineException(WhileProcessing.APrePlaylist(playlist), e) + }