diff --git a/musikr/src/main/java/org/oxycblt/musikr/Musikr.kt b/musikr/src/main/java/org/oxycblt/musikr/Musikr.kt index c18a01684..9616699bf 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/Musikr.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/Musikr.kt @@ -19,16 +19,15 @@ package org.oxycblt.musikr import android.content.Context +import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.flow.buffer -import kotlinx.coroutines.flow.onCompletion -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.flow.onStart import org.oxycblt.musikr.fs.MusicLocation import org.oxycblt.musikr.pipeline.EvaluateStep +import org.oxycblt.musikr.pipeline.ExploreNode import org.oxycblt.musikr.pipeline.ExploreStep import org.oxycblt.musikr.pipeline.ExtractStep +import org.oxycblt.musikr.pipeline.ExtractedMusic /** * A highly opinionated, multi-threaded device music library. @@ -119,21 +118,57 @@ private class MusikrImpl( locations: List, onProgress: suspend (IndexingProgress) -> Unit ) = coroutineScope { - var exploredCount = 0 - var extractedCount = 0 - val explored = - exploreStep - .explore(locations) - .buffer(Channel.UNLIMITED) - .onStart { onProgress(IndexingProgress.Songs(0, 0)) } - .onEach { onProgress(IndexingProgress.Songs(extractedCount, ++exploredCount)) } - val extracted = - extractStep - .extract(explored) - .buffer(Channel.UNLIMITED) - .onEach { onProgress(IndexingProgress.Songs(++extractedCount, exploredCount)) } - .onCompletion { onProgress(IndexingProgress.Indeterminate) } - val library = evaluateStep.evaluate(extracted) + var explored = 0 + var loaded = 0 + val intermediateNodes = Channel(Channel.UNLIMITED) + val nodes = Channel(Channel.UNLIMITED) + + val exploreTask = exploreStep.explore(locations, intermediateNodes) + val exploreMonitor = async { + try { + onProgress(IndexingProgress.Songs(loaded, explored)) + for (node in intermediateNodes) { + explored++ + onProgress(IndexingProgress.Songs(loaded, explored)) + nodes.send(node) + } + + nodes.close() + Result.success(Unit) + } catch (e: Exception) { + nodes.close(e) + Result.failure(e) + } + } + + val intermediateExtracted = Channel(Channel.UNLIMITED) + val extracted = Channel(Channel.UNLIMITED) + + val extractTask = extractStep.extract(nodes, intermediateExtracted) + val extractMonitor = async { + try { + onProgress(IndexingProgress.Songs(loaded, explored)) + for (music in intermediateExtracted) { + loaded++ + onProgress(IndexingProgress.Songs(loaded, explored)) + extracted.send(music) + } + + extracted.close() + Result.success(Unit) + } catch (e: Exception) { + extracted.close(e) + Result.failure(e) + } + } + + val libraryTask = evaluateStep.evaluate(extracted) + + exploreTask.await().getOrThrow() + exploreMonitor.await().getOrThrow() + extractTask.await().getOrThrow() + extractMonitor.await().getOrThrow() + val library = libraryTask.await().getOrThrow() LibraryResultImpl(storage, library) } } diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/EvaluateStep.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/EvaluateStep.kt index df4f72cb1..538763f82 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/EvaluateStep.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/EvaluateStep.kt @@ -18,16 +18,10 @@ package org.oxycblt.musikr.pipeline -import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.async import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.buffer -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.filterIsInstance -import kotlinx.coroutines.flow.flowOn -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.merge -import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.coroutineScope import org.oxycblt.musikr.Interpretation import org.oxycblt.musikr.MutableLibrary import org.oxycblt.musikr.Storage @@ -38,7 +32,7 @@ import org.oxycblt.musikr.playlist.interpret.PlaylistInterpreter import org.oxycblt.musikr.tag.interpret.TagInterpreter internal interface EvaluateStep { - suspend fun evaluate(extractedMusic: Flow): MutableLibrary + suspend fun evaluate(extractedMusic: Channel): Deferred> companion object { fun new(storage: Storage, interpretation: Interpretation): EvaluateStep = @@ -56,33 +50,30 @@ private class EvaluateStepImpl( private val storedPlaylists: StoredPlaylists, private val libraryFactory: LibraryFactory ) : EvaluateStep { - override suspend fun evaluate(extractedMusic: Flow): MutableLibrary { - val filterFlow = - extractedMusic.filterIsInstance().divert { - when (it) { - is ExtractedMusic.Valid.Song -> Divert.Right(it.song) - is ExtractedMusic.Valid.Playlist -> Divert.Left(it.file) + override suspend fun evaluate( + extractedMusic: Channel + ): Deferred> = coroutineScope { + async { + try { + val graphBuilder = MusicGraph.builder() + for (music in extractedMusic) { + when (music) { + is ExtractedMusic.Valid.Song -> + graphBuilder.add(tagInterpreter.interpret(music.song)) + is ExtractedMusic.Valid.Playlist -> + graphBuilder.add(playlistInterpreter.interpret(music.file)) + is ExtractedMusic.Invalid -> {} + } } + val graph = graphBuilder.build() + val library = libraryFactory.create(graph, storedPlaylists, playlistInterpreter) + + extractedMusic.close() + Result.success(library) + } catch (e: Exception) { + extractedMusic.close(e) + Result.failure(e) } - val rawSongs = filterFlow.right - val preSongs = - rawSongs - .map { wrap(it, tagInterpreter::interpret) } - .flowOn(Dispatchers.Default) - .buffer(Channel.UNLIMITED) - val prePlaylists = - filterFlow.left - .map { wrap(it, playlistInterpreter::interpret) } - .flowOn(Dispatchers.Default) - .buffer(Channel.UNLIMITED) - val graphBuilder = MusicGraph.builder() - val graphBuild = - merge( - filterFlow.manager, - preSongs.onEach { wrap(it, graphBuilder::add) }, - prePlaylists.onEach { wrap(it, graphBuilder::add) }) - graphBuild.collect() - val graph = graphBuilder.build() - return libraryFactory.create(graph, storedPlaylists, playlistInterpreter) + } } } diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExploreStep.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExploreStep.kt index 1e77659e1..82a589660 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExploreStep.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExploreStep.kt @@ -19,16 +19,12 @@ package org.oxycblt.musikr.pipeline import android.content.Context -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.async +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.asFlow -import kotlinx.coroutines.flow.buffer -import kotlinx.coroutines.flow.emitAll -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.flowOn -import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.mapNotNull -import kotlinx.coroutines.flow.merge import org.oxycblt.musikr.Storage import org.oxycblt.musikr.fs.DeviceFile import org.oxycblt.musikr.fs.MusicLocation @@ -38,7 +34,10 @@ import org.oxycblt.musikr.playlist.db.StoredPlaylists import org.oxycblt.musikr.playlist.m3u.M3U internal interface ExploreStep { - fun explore(locations: List): Flow + suspend fun explore( + locations: List, + explored: SendChannel + ): Deferred> companion object { fun from(context: Context, storage: Storage): ExploreStep = @@ -50,25 +49,49 @@ private class ExploreStepImpl( private val deviceFiles: DeviceFiles, private val storedPlaylists: StoredPlaylists ) : ExploreStep { - override fun explore(locations: List): Flow { - val audios = - deviceFiles - .explore(locations.asFlow()) - .mapNotNull { - when { - it.mimeType == M3U.MIME_TYPE -> null - it.mimeType.startsWith("audio/") -> ExploreNode.Audio(it) - else -> null + override suspend fun explore( + locations: List, + explored: SendChannel + ) = coroutineScope { + async { + try { + val audioTask = async { + try { + deviceFiles + .explore(locations.asFlow()) + .mapNotNull { + when { + it.mimeType == M3U.MIME_TYPE -> null + it.mimeType.startsWith("audio/") -> ExploreNode.Audio(it) + else -> null + } + } + .collect { explored.send(it) } + Result.success(Unit) + } catch (e: Exception) { + Result.failure(e) } } - .flowOn(Dispatchers.IO) - .buffer() - val playlists = - flow { emitAll(storedPlaylists.read().asFlow()) } - .map { ExploreNode.Playlist(it) } - .flowOn(Dispatchers.IO) - .buffer() - return merge(audios, playlists) + + val playlistTask = async { + try { + storedPlaylists.read().forEach { explored.send(ExploreNode.Playlist(it)) } + Result.success(Unit) + } catch (e: Exception) { + Result.failure(e) + } + } + + audioTask.await().getOrThrow() + playlistTask.await().getOrThrow() + + explored.close() + Result.success(Unit) + } catch (e: Exception) { + explored.close(e) + Result.failure(e) + } + } } } diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt index 858421457..cb208f488 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt @@ -19,17 +19,15 @@ package org.oxycblt.musikr.pipeline import android.content.Context +import android.os.ParcelFileDescriptor +import kotlinx.coroutines.Deferred import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.buffer -import kotlinx.coroutines.flow.flattenMerge -import kotlinx.coroutines.flow.flowOn -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.mapNotNull -import kotlinx.coroutines.flow.merge -import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.withContext import org.oxycblt.musikr.Storage import org.oxycblt.musikr.cache.Cache @@ -37,6 +35,7 @@ import org.oxycblt.musikr.cache.CacheResult import org.oxycblt.musikr.cover.Cover import org.oxycblt.musikr.cover.MutableCovers import org.oxycblt.musikr.fs.DeviceFile +import org.oxycblt.musikr.metadata.Metadata import org.oxycblt.musikr.metadata.MetadataExtractor import org.oxycblt.musikr.metadata.Properties import org.oxycblt.musikr.playlist.PlaylistFile @@ -44,7 +43,10 @@ import org.oxycblt.musikr.tag.parse.ParsedTags import org.oxycblt.musikr.tag.parse.TagParser internal interface ExtractStep { - fun extract(nodes: Flow): Flow + suspend fun extract( + explored: ReceiveChannel, + extracted: SendChannel + ): Deferred> companion object { fun from(context: Context, storage: Storage): ExtractStep = @@ -64,118 +66,155 @@ private class ExtractStepImpl( private val cacheFactory: Cache.Factory, private val storedCovers: MutableCovers ) : ExtractStep { - @OptIn(ExperimentalCoroutinesApi::class) - override fun extract(nodes: Flow): Flow { - val cache = cacheFactory.open() - val addingMs = System.currentTimeMillis() - val filterFlow = - nodes.divert { - when (it) { - is ExploreNode.Audio -> Divert.Right(it.file) - is ExploreNode.Playlist -> Divert.Left(it.file) - } - } - val audioNodes = filterFlow.right - val playlistNodes = filterFlow.left.map { ExtractedMusic.Valid.Playlist(it) } + override suspend fun extract( + explored: ReceiveChannel, + extracted: SendChannel + ) = coroutineScope { + async { + try { + val cache = cacheFactory.open() + val addingMs = System.currentTimeMillis() - val readDistributedFlow = audioNodes.distribute(8) - val cacheResults = - readDistributedFlow.flows - .map { flow -> - flow - .map { wrap(it) { file -> cache.read(file, storedCovers) } } - .flowOn(Dispatchers.IO) - .buffer(Channel.UNLIMITED) - } - .flattenMerge() - .buffer(Channel.UNLIMITED) - val cacheFlow = - cacheResults.divert { - when (it) { - is CacheResult.Hit -> Divert.Left(it.song) - is CacheResult.Miss -> Divert.Right(it.file) - } - } - val cachedSongs = cacheFlow.left.map { ExtractedMusic.Valid.Song(it) } - val uncachedSongs = cacheFlow.right + val read = Channel(Channel.UNLIMITED) + val open = Channel(Channel.UNLIMITED) + val extract = Channel>(Channel.UNLIMITED) + // Covers are pretty big, so cap the amount of parsed metadata in-memory to at most + // 8 to minimize GCs. + val parse = Channel>(8) + val write = Channel(Channel.UNLIMITED) - val fds = - uncachedSongs - .mapNotNull { - wrap(it) { file -> - withContext(Dispatchers.IO) { - context.contentResolver.openFileDescriptor(file.uri, "r")?.let { fd -> - FileWith(file, fd) + val exploreAssortTask = async { + try { + for (node in explored) { + when (node) { + is ExploreNode.Audio -> read.send(node.file) + is ExploreNode.Playlist -> + extracted.send(ExtractedMusic.Valid.Playlist(node.file)) + } + } + read.close() + Result.success(Unit) + } catch (e: Exception) { + read.close(e) + Result.failure(e) + } + } + + val readTasks = + List(8) { + async { + try { + for (file in read) { + when (val result = cache.read(file, storedCovers)) { + is CacheResult.Hit -> + extracted.send(ExtractedMusic.Valid.Song(result.song)) + is CacheResult.Miss -> open.send(result.file) + } + } + Result.success(Unit) + } catch (e: Exception) { + Result.failure(e) } } } - } - .flowOn(Dispatchers.IO) - .buffer(Channel.UNLIMITED) - val metadata = - fds.mapNotNull { fileWith -> - wrap(fileWith.file) { _ -> - metadataExtractor - .extract(fileWith.file, fileWith.with) - .let { FileWith(fileWith.file, it) } - .also { withContext(Dispatchers.IO) { fileWith.with.close() } } + val readTask = async { + try { + readTasks.awaitAll().forEach { it.getOrThrow() } + open.close() + Result.success(Unit) + } catch (e: Exception) { + open.close(e) + Result.failure(e) } } - .flowOn(Dispatchers.IO) - // Covers are pretty big, so cap the amount of parsed metadata in-memory to at most - // 8 to minimize GCs. - .buffer(8) - val extractedSongs = - metadata - .map { fileWith -> - if (fileWith.with != null) { - val tags = tagParser.parse(fileWith.file, fileWith.with) - val cover = fileWith.with.cover?.let { storedCovers.write(it) } - RawSong(fileWith.file, fileWith.with.properties, tags, cover, addingMs) - } else { - null - } - } - .flowOn(Dispatchers.IO) - .buffer(Channel.UNLIMITED) - - val extractedFilter = - extractedSongs.divert { - if (it != null) Divert.Left(it) else Divert.Right(ExtractedMusic.Invalid) - } - - val write = extractedFilter.left - val invalid = extractedFilter.right - - val writeDistributedFlow = write.distribute(8) - val writtenSongs = - writeDistributedFlow.flows - .map { flow -> - flow - .map { - wrap(it, cache::write) - ExtractedMusic.Valid.Song(it) + val openTask = async { + try { + for (file in open) { + withContext(Dispatchers.IO) { + val fd = context.contentResolver.openFileDescriptor(file.uri, "r") + if (fd != null) { + extract.send(FileWith(file, fd)) + } else { + extracted.send(ExtractedMusic.Invalid) + } + } } - .flowOn(Dispatchers.IO) - .buffer(Channel.UNLIMITED) + extract.close() + Result.success(Unit) + } catch (e: Exception) { + extract.close(e) + Result.failure(e) + } } - .flattenMerge() - val merged = - merge( - filterFlow.manager, - readDistributedFlow.manager, - cacheFlow.manager, - cachedSongs, - extractedFilter.manager, - writeDistributedFlow.manager, - writtenSongs, - invalid, - playlistNodes) + val extractTask = async { + try { + for (fileWith in extract) { + val metadata = metadataExtractor.extract(fileWith.file, fileWith.with) + if (metadata != null) { + parse.send(FileWith(fileWith.file, metadata)) + } else { + extracted.send(ExtractedMusic.Invalid) + } + fileWith.with.close() + } + parse.close() + Result.success(Unit) + } catch (e: Exception) { + parse.close(e) + Result.failure(e) + } + } - return merged.onCompletion { cache.finalize() } + val parseTask = async { + try { + for (fileWith in parse) { + val tags = tagParser.parse(fileWith.file, fileWith.with) + val cover = fileWith.with.cover?.let { storedCovers.write(it) } + write.send( + RawSong( + fileWith.file, fileWith.with.properties, tags, cover, addingMs)) + } + write.close() + Result.success(Unit) + } catch (e: Exception) { + write.close(e) + Result.failure(e) + } + } + + val writeTasks = + List(8) { + async { + try { + for (song in write) { + cache.write(song) + extracted.send(ExtractedMusic.Valid.Song(song)) + } + Result.success(Unit) + } catch (e: Exception) { + Result.failure(e) + } + } + } + + exploreAssortTask.await().getOrThrow() + readTask.await().getOrThrow() + openTask.await().getOrThrow() + extractTask.await().getOrThrow() + parseTask.await().getOrThrow() + writeTasks.awaitAll().forEach { it.getOrThrow() } + cache.finalize() + + extracted.close() + Result.success(Unit) + } catch (e: Exception) { + extracted.close(e) + Result.failure(e) + } + } } private data class FileWith(val file: DeviceFile, val with: T) diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt deleted file mode 100644 index 58f2c6eb0..000000000 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/FlowUtil.kt +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright (c) 2024 Auxio Project - * FlowUtil.kt is part of Auxio. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package org.oxycblt.musikr.pipeline - -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.asFlow -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.receiveAsFlow -import kotlinx.coroutines.flow.withIndex - -internal sealed interface Divert { - data class Left(val value: L) : Divert - - data class Right(val value: R) : Divert -} - -internal class DivertedFlow( - val manager: Flow, - val left: Flow, - val right: Flow -) - -internal inline fun Flow.divert( - crossinline predicate: (T) -> Divert -): DivertedFlow { - val leftChannel = Channel(Channel.UNLIMITED) - val rightChannel = Channel(Channel.UNLIMITED) - val managedFlow = - flow { - collect { - when (val result = predicate(it)) { - is Divert.Left -> leftChannel.send(result.value) - is Divert.Right -> rightChannel.send(result.value) - } - } - leftChannel.close() - rightChannel.close() - } - return DivertedFlow(managedFlow, leftChannel.receiveAsFlow(), rightChannel.receiveAsFlow()) -} - -internal class DistributedFlow(val manager: Flow, val flows: Flow>) - -/** - * Equally "distributes" the values of some flow across n new flows. - * - * Note that this function requires the "manager" flow to be consumed alongside the split flows in - * order to function. Without this, all of the newly split flows will simply block. - */ -internal fun Flow.distribute(n: Int): DistributedFlow { - val posChannels = List(n) { Channel(Channel.UNLIMITED) } - val managerFlow = - flow { - withIndex().collect { - val index = it.index % n - posChannels[index].send(it.value) - } - for (channel in posChannels) { - channel.close() - } - } - val hotFlows = posChannels.asFlow().map { it.receiveAsFlow() } - return DistributedFlow(managerFlow, hotFlows) -} diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/PipelineException.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/PipelineException.kt deleted file mode 100644 index 1f6efc892..000000000 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/PipelineException.kt +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright (c) 2024 Auxio Project - * PipelineException.kt is part of Auxio. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package org.oxycblt.musikr.pipeline - -import org.oxycblt.musikr.fs.DeviceFile -import org.oxycblt.musikr.playlist.PlaylistFile -import org.oxycblt.musikr.playlist.interpret.PrePlaylist -import org.oxycblt.musikr.tag.interpret.PreSong - -class PipelineException(val processing: WhileProcessing, val error: Exception) : Exception() { - override val cause = error - - override val message = "Error while processing ${processing}: ${error.stackTraceToString()}" -} - -sealed interface WhileProcessing { - class AFile internal constructor(private val file: DeviceFile) : WhileProcessing { - override fun toString() = "File @ ${file.path}" - } - - class ARawSong internal constructor(private val rawSong: RawSong) : WhileProcessing { - override fun toString() = "Raw Song @ ${rawSong.file.path}" - } - - class APlaylistFile internal constructor(private val playlist: PlaylistFile) : WhileProcessing { - override fun toString() = "Playlist File @ ${playlist.name}" - } - - class APreSong internal constructor(private val preSong: PreSong) : WhileProcessing { - override fun toString() = "Pre Song @ ${preSong.path}" - } - - class APrePlaylist internal constructor(private val prePlaylist: PrePlaylist) : - WhileProcessing { - override fun toString() = "Pre Playlist @ ${prePlaylist.name}" - } -} - -internal suspend fun wrap(file: DeviceFile, block: suspend (DeviceFile) -> R): R = - try { - block(file) - } catch (e: Exception) { - throw PipelineException(WhileProcessing.AFile(file), e) - } - -internal suspend fun wrap(song: RawSong, block: suspend (RawSong) -> R): R = - try { - block(song) - } catch (e: Exception) { - throw PipelineException(WhileProcessing.ARawSong(song), e) - } - -internal suspend fun wrap(file: PlaylistFile, block: suspend (PlaylistFile) -> R): R = - try { - block(file) - } catch (e: Exception) { - throw PipelineException(WhileProcessing.APlaylistFile(file), e) - } - -internal suspend fun wrap(song: PreSong, block: suspend (PreSong) -> R): R = - try { - block(song) - } catch (e: Exception) { - throw PipelineException(WhileProcessing.APreSong(song), e) - } - -internal suspend fun wrap(playlist: PrePlaylist, block: suspend (PrePlaylist) -> R): R = - try { - block(playlist) - } catch (e: Exception) { - throw PipelineException(WhileProcessing.APrePlaylist(playlist), e) - }