music: throw on deadlocks
Attempt to throw an exception when any part of the loading routine times out.
This commit is contained in:
parent
4cb309f01f
commit
d3de34ed5e
5 changed files with 80 additions and 11 deletions
|
@ -41,6 +41,7 @@ import org.oxycblt.auxio.music.metadata.Separators
|
|||
import org.oxycblt.auxio.music.metadata.TagExtractor
|
||||
import org.oxycblt.auxio.music.user.MutableUserLibrary
|
||||
import org.oxycblt.auxio.music.user.UserLibrary
|
||||
import org.oxycblt.auxio.util.forEachWithTimeout
|
||||
import org.oxycblt.auxio.util.logD
|
||||
import org.oxycblt.auxio.util.logE
|
||||
import org.oxycblt.auxio.util.logW
|
||||
|
@ -448,6 +449,7 @@ constructor(
|
|||
try {
|
||||
tagExtractor.consume(incompleteSongs, completeSongs)
|
||||
} catch (e: Exception) {
|
||||
logD("Tag extraction failed: $e")
|
||||
completeSongs.close(e)
|
||||
return@async
|
||||
}
|
||||
|
@ -464,6 +466,7 @@ constructor(
|
|||
deviceLibraryFactory.create(
|
||||
completeSongs, processedSongs, separators, nameFactory)
|
||||
} catch (e: Exception) {
|
||||
logD("DeviceLibrary creation failed: $e")
|
||||
processedSongs.close(e)
|
||||
return@async Result.failure(e)
|
||||
}
|
||||
|
@ -474,8 +477,10 @@ constructor(
|
|||
// We could keep track of a total here, but we also need to collate this RawSong information
|
||||
// for when we write the cache later on in the finalization step.
|
||||
val rawSongs = LinkedList<RawSong>()
|
||||
for (rawSong in processedSongs) {
|
||||
rawSongs.add(rawSong)
|
||||
// Use a longer timeout so that dependent components can timeout and throw errors that
|
||||
// provide more context than if we timed out here.
|
||||
processedSongs.forEachWithTimeout(20000) {
|
||||
rawSongs.add(it)
|
||||
// Since discovery takes up the bulk of the music loading process, we switch to
|
||||
// indicating a defined amount of loaded songs in comparison to the projected amount
|
||||
// of songs that were queried.
|
||||
|
|
|
@ -33,7 +33,9 @@ import org.oxycblt.auxio.music.fs.contentResolverSafe
|
|||
import org.oxycblt.auxio.music.fs.useQuery
|
||||
import org.oxycblt.auxio.music.info.Name
|
||||
import org.oxycblt.auxio.music.metadata.Separators
|
||||
import org.oxycblt.auxio.util.forEachWithTimeout
|
||||
import org.oxycblt.auxio.util.logW
|
||||
import org.oxycblt.auxio.util.sendWithTimeout
|
||||
import org.oxycblt.auxio.util.unlikelyToBeNull
|
||||
|
||||
/**
|
||||
|
@ -130,7 +132,7 @@ class DeviceLibraryFactoryImpl @Inject constructor() : DeviceLibrary.Factory {
|
|||
// TODO: Use comparators here
|
||||
|
||||
// All music information is grouped as it is indexed by other components.
|
||||
for (rawSong in rawSongs) {
|
||||
rawSongs.forEachWithTimeout { rawSong ->
|
||||
val song = SongImpl(rawSong, nameFactory, separators)
|
||||
// At times the indexer produces duplicate songs, try to filter these. Comparing by
|
||||
// UID is sufficient for something like this, and also prevents collisions from
|
||||
|
@ -142,8 +144,8 @@ class DeviceLibraryFactoryImpl @Inject constructor() : DeviceLibrary.Factory {
|
|||
// We still want to say that we "processed" the song so that the user doesn't
|
||||
// get confused at why the bar was only partly filled by the end of the loading
|
||||
// process.
|
||||
processedSongs.send(rawSong)
|
||||
continue
|
||||
processedSongs.sendWithTimeout(rawSong)
|
||||
return@forEachWithTimeout
|
||||
}
|
||||
songGrouping[song.uid] = song
|
||||
|
||||
|
@ -206,7 +208,7 @@ class DeviceLibraryFactoryImpl @Inject constructor() : DeviceLibrary.Factory {
|
|||
}
|
||||
}
|
||||
|
||||
processedSongs.send(rawSong)
|
||||
processedSongs.sendWithTimeout(rawSong)
|
||||
}
|
||||
|
||||
// Now that all songs are processed, also process albums and group them into their
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.oxycblt.auxio.music.metadata.parseId3v2PositionField
|
|||
import org.oxycblt.auxio.music.metadata.transformPositionField
|
||||
import org.oxycblt.auxio.util.getSystemServiceCompat
|
||||
import org.oxycblt.auxio.util.logD
|
||||
import org.oxycblt.auxio.util.sendWithTimeout
|
||||
|
||||
/**
|
||||
* The layer that loads music from the [MediaStore] database. This is an intermediate step in the
|
||||
|
@ -205,10 +206,10 @@ private abstract class BaseMediaStoreExtractor(protected val context: Context) :
|
|||
val rawSong = RawSong()
|
||||
query.populateFileInfo(rawSong)
|
||||
if (cache?.populate(rawSong) == true) {
|
||||
completeSongs.send(rawSong)
|
||||
completeSongs.sendWithTimeout(rawSong)
|
||||
} else {
|
||||
query.populateTags(rawSong)
|
||||
incompleteSongs.send(rawSong)
|
||||
incompleteSongs.sendWithTimeout(rawSong)
|
||||
}
|
||||
yield()
|
||||
}
|
||||
|
|
|
@ -23,7 +23,9 @@ import javax.inject.Inject
|
|||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.yield
|
||||
import org.oxycblt.auxio.music.device.RawSong
|
||||
import org.oxycblt.auxio.util.forEachWithTimeout
|
||||
import org.oxycblt.auxio.util.logD
|
||||
import org.oxycblt.auxio.util.sendWithTimeout
|
||||
|
||||
/**
|
||||
* The extractor that leverages ExoPlayer's [MetadataRetriever] API to parse metadata. This is the
|
||||
|
@ -55,14 +57,14 @@ class TagExtractorImpl @Inject constructor(private val tagWorkerFactory: TagWork
|
|||
|
||||
logD("Beginning primary extraction loop")
|
||||
|
||||
for (incompleteRawSong in incompleteSongs) {
|
||||
incompleteSongs.forEachWithTimeout { incompleteRawSong ->
|
||||
spin@ while (true) {
|
||||
for (i in tagWorkerPool.indices) {
|
||||
val worker = tagWorkerPool[i]
|
||||
if (worker != null) {
|
||||
val completeRawSong = worker.poll()
|
||||
if (completeRawSong != null) {
|
||||
completeSongs.send(completeRawSong)
|
||||
completeSongs.sendWithTimeout(completeRawSong)
|
||||
yield()
|
||||
} else {
|
||||
continue
|
||||
|
@ -83,7 +85,7 @@ class TagExtractorImpl @Inject constructor(private val tagWorkerFactory: TagWork
|
|||
if (task != null) {
|
||||
val completeRawSong = task.poll()
|
||||
if (completeRawSong != null) {
|
||||
completeSongs.send(completeRawSong)
|
||||
completeSongs.sendWithTimeout(completeRawSong)
|
||||
tagWorkerPool[i] = null
|
||||
yield()
|
||||
} else {
|
||||
|
|
|
@ -22,11 +22,16 @@ import androidx.fragment.app.Fragment
|
|||
import androidx.lifecycle.Lifecycle
|
||||
import androidx.lifecycle.lifecycleScope
|
||||
import androidx.lifecycle.repeatOnLifecycle
|
||||
import java.util.concurrent.TimeoutException
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.TimeoutCancellationException
|
||||
import kotlinx.coroutines.channels.ReceiveChannel
|
||||
import kotlinx.coroutines.channels.SendChannel
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.combine
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.withTimeout
|
||||
|
||||
/**
|
||||
* A wrapper around [StateFlow] exposing a one-time consumable event.
|
||||
|
@ -146,3 +151,57 @@ private fun Fragment.launch(
|
|||
) {
|
||||
viewLifecycleOwner.lifecycleScope.launch { viewLifecycleOwner.repeatOnLifecycle(state, block) }
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps [SendChannel.send] with a specified timeout.
|
||||
*
|
||||
* @param element The element to send.
|
||||
* @param timeout The timeout in milliseconds. Defaults to 10 seconds.
|
||||
* @throws TimeoutException If the timeout is reached, provides context on what element
|
||||
* specifically.
|
||||
*/
|
||||
suspend fun <E> SendChannel<E>.sendWithTimeout(element: E, timeout: Long = 10000) {
|
||||
try {
|
||||
withTimeout(timeout) { send(element) }
|
||||
} catch (e: Exception) {
|
||||
throw TimeoutException("Timed out sending element $element to channel: $e")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps a [ReceiveChannel] consumption with a specified timeout. Note that the timeout will only
|
||||
* start on the first element received, as to prevent initialization of dependent coroutines being
|
||||
* interpreted as a timeout.
|
||||
*
|
||||
* @param action The action to perform on each element received.
|
||||
* @param timeout The timeout in milliseconds. Defaults to 10 seconds.
|
||||
* @throws TimeoutException If the timeout is reached, provides context on what element
|
||||
* specifically.
|
||||
*/
|
||||
suspend fun <E> ReceiveChannel<E>.forEachWithTimeout(
|
||||
timeout: Long = 10000,
|
||||
action: suspend (E) -> Unit
|
||||
) {
|
||||
var exhausted = false
|
||||
var subsequent = false
|
||||
val handler: suspend () -> Unit = {
|
||||
val value = receiveCatching()
|
||||
if (value.isClosed) {
|
||||
exhausted = true
|
||||
} else {
|
||||
action(value.getOrThrow())
|
||||
}
|
||||
}
|
||||
while (!exhausted) {
|
||||
try {
|
||||
if (subsequent) {
|
||||
withTimeout(timeout) { handler() }
|
||||
} else {
|
||||
handler()
|
||||
subsequent = true
|
||||
}
|
||||
} catch (e: TimeoutCancellationException) {
|
||||
throw TimeoutException("Timed out receiving element from channel: $e")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue