From d3de34ed5e4793630b59e785f147db98736efb20 Mon Sep 17 00:00:00 2001 From: Alexander Capehart Date: Mon, 18 Dec 2023 18:33:13 -0700 Subject: [PATCH] music: throw on deadlocks Attempt to throw an exception when any part of the loading routine times out. --- .../oxycblt/auxio/music/MusicRepository.kt | 9 ++- .../auxio/music/device/DeviceLibrary.kt | 10 ++-- .../auxio/music/fs/MediaStoreExtractor.kt | 5 +- .../auxio/music/metadata/TagExtractor.kt | 8 ++- .../java/org/oxycblt/auxio/util/StateUtil.kt | 59 +++++++++++++++++++ 5 files changed, 80 insertions(+), 11 deletions(-) diff --git a/app/src/main/java/org/oxycblt/auxio/music/MusicRepository.kt b/app/src/main/java/org/oxycblt/auxio/music/MusicRepository.kt index 55cfeaf0a..c61c4c5ad 100644 --- a/app/src/main/java/org/oxycblt/auxio/music/MusicRepository.kt +++ b/app/src/main/java/org/oxycblt/auxio/music/MusicRepository.kt @@ -41,6 +41,7 @@ import org.oxycblt.auxio.music.metadata.Separators import org.oxycblt.auxio.music.metadata.TagExtractor import org.oxycblt.auxio.music.user.MutableUserLibrary import org.oxycblt.auxio.music.user.UserLibrary +import org.oxycblt.auxio.util.forEachWithTimeout import org.oxycblt.auxio.util.logD import org.oxycblt.auxio.util.logE import org.oxycblt.auxio.util.logW @@ -448,6 +449,7 @@ constructor( try { tagExtractor.consume(incompleteSongs, completeSongs) } catch (e: Exception) { + logD("Tag extraction failed: $e") completeSongs.close(e) return@async } @@ -464,6 +466,7 @@ constructor( deviceLibraryFactory.create( completeSongs, processedSongs, separators, nameFactory) } catch (e: Exception) { + logD("DeviceLibrary creation failed: $e") processedSongs.close(e) return@async Result.failure(e) } @@ -474,8 +477,10 @@ constructor( // We could keep track of a total here, but we also need to collate this RawSong information // for when we write the cache later on in the finalization step. val rawSongs = LinkedList() - for (rawSong in processedSongs) { - rawSongs.add(rawSong) + // Use a longer timeout so that dependent components can timeout and throw errors that + // provide more context than if we timed out here. + processedSongs.forEachWithTimeout(20000) { + rawSongs.add(it) // Since discovery takes up the bulk of the music loading process, we switch to // indicating a defined amount of loaded songs in comparison to the projected amount // of songs that were queried. diff --git a/app/src/main/java/org/oxycblt/auxio/music/device/DeviceLibrary.kt b/app/src/main/java/org/oxycblt/auxio/music/device/DeviceLibrary.kt index c694a65ea..389b34fb6 100644 --- a/app/src/main/java/org/oxycblt/auxio/music/device/DeviceLibrary.kt +++ b/app/src/main/java/org/oxycblt/auxio/music/device/DeviceLibrary.kt @@ -33,7 +33,9 @@ import org.oxycblt.auxio.music.fs.contentResolverSafe import org.oxycblt.auxio.music.fs.useQuery import org.oxycblt.auxio.music.info.Name import org.oxycblt.auxio.music.metadata.Separators +import org.oxycblt.auxio.util.forEachWithTimeout import org.oxycblt.auxio.util.logW +import org.oxycblt.auxio.util.sendWithTimeout import org.oxycblt.auxio.util.unlikelyToBeNull /** @@ -130,7 +132,7 @@ class DeviceLibraryFactoryImpl @Inject constructor() : DeviceLibrary.Factory { // TODO: Use comparators here // All music information is grouped as it is indexed by other components. - for (rawSong in rawSongs) { + rawSongs.forEachWithTimeout { rawSong -> val song = SongImpl(rawSong, nameFactory, separators) // At times the indexer produces duplicate songs, try to filter these. Comparing by // UID is sufficient for something like this, and also prevents collisions from @@ -142,8 +144,8 @@ class DeviceLibraryFactoryImpl @Inject constructor() : DeviceLibrary.Factory { // We still want to say that we "processed" the song so that the user doesn't // get confused at why the bar was only partly filled by the end of the loading // process. - processedSongs.send(rawSong) - continue + processedSongs.sendWithTimeout(rawSong) + return@forEachWithTimeout } songGrouping[song.uid] = song @@ -206,7 +208,7 @@ class DeviceLibraryFactoryImpl @Inject constructor() : DeviceLibrary.Factory { } } - processedSongs.send(rawSong) + processedSongs.sendWithTimeout(rawSong) } // Now that all songs are processed, also process albums and group them into their diff --git a/app/src/main/java/org/oxycblt/auxio/music/fs/MediaStoreExtractor.kt b/app/src/main/java/org/oxycblt/auxio/music/fs/MediaStoreExtractor.kt index 392103d80..b25a360a7 100644 --- a/app/src/main/java/org/oxycblt/auxio/music/fs/MediaStoreExtractor.kt +++ b/app/src/main/java/org/oxycblt/auxio/music/fs/MediaStoreExtractor.kt @@ -36,6 +36,7 @@ import org.oxycblt.auxio.music.metadata.parseId3v2PositionField import org.oxycblt.auxio.music.metadata.transformPositionField import org.oxycblt.auxio.util.getSystemServiceCompat import org.oxycblt.auxio.util.logD +import org.oxycblt.auxio.util.sendWithTimeout /** * The layer that loads music from the [MediaStore] database. This is an intermediate step in the @@ -205,10 +206,10 @@ private abstract class BaseMediaStoreExtractor(protected val context: Context) : val rawSong = RawSong() query.populateFileInfo(rawSong) if (cache?.populate(rawSong) == true) { - completeSongs.send(rawSong) + completeSongs.sendWithTimeout(rawSong) } else { query.populateTags(rawSong) - incompleteSongs.send(rawSong) + incompleteSongs.sendWithTimeout(rawSong) } yield() } diff --git a/app/src/main/java/org/oxycblt/auxio/music/metadata/TagExtractor.kt b/app/src/main/java/org/oxycblt/auxio/music/metadata/TagExtractor.kt index 4cca1a824..de49e2e81 100644 --- a/app/src/main/java/org/oxycblt/auxio/music/metadata/TagExtractor.kt +++ b/app/src/main/java/org/oxycblt/auxio/music/metadata/TagExtractor.kt @@ -23,7 +23,9 @@ import javax.inject.Inject import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.yield import org.oxycblt.auxio.music.device.RawSong +import org.oxycblt.auxio.util.forEachWithTimeout import org.oxycblt.auxio.util.logD +import org.oxycblt.auxio.util.sendWithTimeout /** * The extractor that leverages ExoPlayer's [MetadataRetriever] API to parse metadata. This is the @@ -55,14 +57,14 @@ class TagExtractorImpl @Inject constructor(private val tagWorkerFactory: TagWork logD("Beginning primary extraction loop") - for (incompleteRawSong in incompleteSongs) { + incompleteSongs.forEachWithTimeout { incompleteRawSong -> spin@ while (true) { for (i in tagWorkerPool.indices) { val worker = tagWorkerPool[i] if (worker != null) { val completeRawSong = worker.poll() if (completeRawSong != null) { - completeSongs.send(completeRawSong) + completeSongs.sendWithTimeout(completeRawSong) yield() } else { continue @@ -83,7 +85,7 @@ class TagExtractorImpl @Inject constructor(private val tagWorkerFactory: TagWork if (task != null) { val completeRawSong = task.poll() if (completeRawSong != null) { - completeSongs.send(completeRawSong) + completeSongs.sendWithTimeout(completeRawSong) tagWorkerPool[i] = null yield() } else { diff --git a/app/src/main/java/org/oxycblt/auxio/util/StateUtil.kt b/app/src/main/java/org/oxycblt/auxio/util/StateUtil.kt index eb74d8f15..eb8358867 100644 --- a/app/src/main/java/org/oxycblt/auxio/util/StateUtil.kt +++ b/app/src/main/java/org/oxycblt/auxio/util/StateUtil.kt @@ -22,11 +22,16 @@ import androidx.fragment.app.Fragment import androidx.lifecycle.Lifecycle import androidx.lifecycle.lifecycleScope import androidx.lifecycle.repeatOnLifecycle +import java.util.concurrent.TimeoutException import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.TimeoutCancellationException +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.combine import kotlinx.coroutines.launch +import kotlinx.coroutines.withTimeout /** * A wrapper around [StateFlow] exposing a one-time consumable event. @@ -146,3 +151,57 @@ private fun Fragment.launch( ) { viewLifecycleOwner.lifecycleScope.launch { viewLifecycleOwner.repeatOnLifecycle(state, block) } } + +/** + * Wraps [SendChannel.send] with a specified timeout. + * + * @param element The element to send. + * @param timeout The timeout in milliseconds. Defaults to 10 seconds. + * @throws TimeoutException If the timeout is reached, provides context on what element + * specifically. + */ +suspend fun SendChannel.sendWithTimeout(element: E, timeout: Long = 10000) { + try { + withTimeout(timeout) { send(element) } + } catch (e: Exception) { + throw TimeoutException("Timed out sending element $element to channel: $e") + } +} + +/** + * Wraps a [ReceiveChannel] consumption with a specified timeout. Note that the timeout will only + * start on the first element received, as to prevent initialization of dependent coroutines being + * interpreted as a timeout. + * + * @param action The action to perform on each element received. + * @param timeout The timeout in milliseconds. Defaults to 10 seconds. + * @throws TimeoutException If the timeout is reached, provides context on what element + * specifically. + */ +suspend fun ReceiveChannel.forEachWithTimeout( + timeout: Long = 10000, + action: suspend (E) -> Unit +) { + var exhausted = false + var subsequent = false + val handler: suspend () -> Unit = { + val value = receiveCatching() + if (value.isClosed) { + exhausted = true + } else { + action(value.getOrThrow()) + } + } + while (!exhausted) { + try { + if (subsequent) { + withTimeout(timeout) { handler() } + } else { + handler() + subsequent = true + } + } catch (e: TimeoutCancellationException) { + throw TimeoutException("Timed out receiving element from channel: $e") + } + } +}