music: document indexing process

Further document the music indexing process.

It's so aggressively parallelized as to require some more extensive
comments to actually make it clear what's going on.
This commit is contained in:
Alexander Capehart 2023-06-24 10:34:31 -06:00
parent ed7b4e1410
commit 992457f361
No known key found for this signature in database
GPG key ID: 37DBE3621FE9AD47
2 changed files with 95 additions and 44 deletions

View file

@ -12,6 +12,7 @@
#### What's Fixed #### What's Fixed
- Fixed an issue where the queue sheet would not collapse when scrolling - Fixed an issue where the queue sheet would not collapse when scrolling
the song list in some cases the song list in some cases
- Fixed music loading hanging if it encountered an error in certain places
## 3.1.2 ## 3.1.2

View file

@ -337,49 +337,56 @@ constructor(
} }
override fun index(worker: MusicRepository.IndexingWorker, withCache: Boolean) = override fun index(worker: MusicRepository.IndexingWorker, withCache: Boolean) =
worker.scope.launch { worker.scope.launch { indexWrapper(worker, withCache) }
try {
val start = System.currentTimeMillis() private suspend fun indexWrapper(worker: MusicRepository.IndexingWorker, withCache: Boolean) {
indexImpl(worker, withCache) try {
logD( indexImpl(worker, withCache)
"Music indexing completed successfully in " + } catch (e: CancellationException) {
"${System.currentTimeMillis() - start}ms") // Got cancelled, propagate upwards to top-level co-routine.
} catch (e: CancellationException) { logD("Loading routine was cancelled")
// Got cancelled, propagate upwards to top-level co-routine. throw e
logD("Loading routine was cancelled") } catch (e: Exception) {
throw e // Music loading process failed due to something we have not handled.
} catch (e: Exception) { // TODO: Still want to display this error eventually
// Music loading process failed due to something we have not handled. logE("Music indexing failed")
logE("Music indexing failed") logE(e.stackTraceToString())
logE(e.stackTraceToString()) emitIndexingCompletion(e)
emitIndexingCompletion(e)
}
} }
}
private suspend fun indexImpl(worker: MusicRepository.IndexingWorker, withCache: Boolean) { private suspend fun indexImpl(worker: MusicRepository.IndexingWorker, withCache: Boolean) {
val start = System.currentTimeMillis()
// Make sure we have permissions before going forward. Theoretically this would be better
// done at the UI level, but that intertwines logic and display too much.
if (ContextCompat.checkSelfPermission(worker.context, PERMISSION_READ_AUDIO) == if (ContextCompat.checkSelfPermission(worker.context, PERMISSION_READ_AUDIO) ==
PackageManager.PERMISSION_DENIED) { PackageManager.PERMISSION_DENIED) {
logE("Permissions were not granted") logE("Permissions were not granted")
// No permissions, signal that we can't do anything.
throw NoAudioPermissionException() throw NoAudioPermissionException()
} }
// Start initializing the extractors. Use an indeterminate state, as there is no ETA on // Begin with querying MediaStore and the music cache. The former is needed for Auxio
// how long a media database query will take. // to figure out what songs are (probably) on the device, and the latter will be needed
emitIndexingProgress(IndexingProgress.Indeterminate) // for discovery (described later). These have no shared state, so they are done in
// parallel.
// Do the initial query of the cache and media databases in parallel.
logD("Starting MediaStore query") logD("Starting MediaStore query")
emitIndexingProgress(IndexingProgress.Indeterminate)
val mediaStoreQueryJob = val mediaStoreQueryJob =
worker.scope.async { worker.scope.async {
val query = val query =
try { try {
mediaStoreExtractor.query() mediaStoreExtractor.query()
} catch (e: Exception) { } catch (e: Exception) {
// Normally, errors in an async call immediately bubble up to the Looper
// and crash the app. Thus, we have to wrap any error into a Result
// and then manually forward it to the try block that indexImpl is
// called from.
return@async Result.failure(e) return@async Result.failure(e)
} }
Result.success(query) Result.success(query)
} }
// Since this main thread is a co-routine, we can do operations in parallel in a way
// identical to calling async.
val cache = val cache =
if (withCache) { if (withCache) {
logD("Reading cache") logD("Reading cache")
@ -390,27 +397,36 @@ constructor(
logD("Awaiting MediaStore query") logD("Awaiting MediaStore query")
val query = mediaStoreQueryJob.await().getOrThrow() val query = mediaStoreQueryJob.await().getOrThrow()
// Now start processing the queried song information in parallel. Songs that can't be // We now have all the information required to start the "discovery" process. This
// received from the cache are consisted incomplete and pushed to a separate channel // is the point at which Auxio starts scanning each file given from MediaStore and
// that will eventually be processed into completed raw songs. // transforming it into a music library. MediaStore normally
logD("Starting song discovery") logD("Starting discovery")
val completeSongs = Channel<RawSong>(Channel.UNLIMITED) val incompleteSongs = Channel<RawSong>(Channel.UNLIMITED) // Not fully populated w/metadata
val incompleteSongs = Channel<RawSong>(Channel.UNLIMITED) val completeSongs = Channel<RawSong>(Channel.UNLIMITED) // Populated with quality metadata
val processedSongs = Channel<RawSong>(Channel.UNLIMITED) val processedSongs = Channel<RawSong>(Channel.UNLIMITED) // Transformed into SongImpl
logD("Started MediaStore discovery")
// MediaStoreExtractor discovers all music on the device, and forwards them to either
// DeviceLibrary if cached metadata exists for it, or TagExtractor if cached metadata
// does not exist. In the latter situation, it also applies it's own (inferior) metadata.
logD("Starting MediaStore discovery")
val mediaStoreJob = val mediaStoreJob =
worker.scope.async { worker.scope.async {
try { try {
mediaStoreExtractor.consume(query, cache, incompleteSongs, completeSongs) mediaStoreExtractor.consume(query, cache, incompleteSongs, completeSongs)
} catch (e: Exception) { } catch (e: Exception) {
// To prevent a deadlock, we want to close the channel with an exception
// to cascade to and cancel all other routines before finally bubbling up
// to the main extractor loop.
incompleteSongs.close(e) incompleteSongs.close(e)
return@async return@async
} }
incompleteSongs.close() incompleteSongs.close()
} }
logD("Started ExoPlayer tag extraction") // TagExtractor takes the incomplete songs from MediaStoreExtractor, parses up-to-date
val metadataJob = // metadata for them, and then forwards it to DeviceLibrary.
logD("Starting tag extraction")
val tagJob =
worker.scope.async { worker.scope.async {
try { try {
tagExtractor.consume(incompleteSongs, completeSongs) tagExtractor.consume(incompleteSongs, completeSongs)
@ -421,6 +437,8 @@ constructor(
completeSongs.close() completeSongs.close()
} }
// DeviceLibrary constructs music parent instances as song information is provided,
// and then forwards them to the primary loading loop.
logD("Starting DeviceLibrary creation") logD("Starting DeviceLibrary creation")
val deviceLibraryJob = val deviceLibraryJob =
worker.scope.async(Dispatchers.Default) { worker.scope.async(Dispatchers.Default) {
@ -435,26 +453,43 @@ constructor(
Result.success(deviceLibrary) Result.success(deviceLibrary)
} }
// Await completed raw songs as they are processed. // We could keep track of a total here, but we also need to collate this RawSong information
// for when we write the cache later on in the finalization step.
val rawSongs = LinkedList<RawSong>() val rawSongs = LinkedList<RawSong>()
for (rawSong in processedSongs) { for (rawSong in processedSongs) {
rawSongs.add(rawSong) rawSongs.add(rawSong)
// Since discovery takes up the bulk of the music loading process, we switch to
// indicating a defined amount of loaded songs in comparison to the projected amount
// of songs that were queried.
emitIndexingProgress(IndexingProgress.Songs(rawSongs.size, query.projectedTotal)) emitIndexingProgress(IndexingProgress.Songs(rawSongs.size, query.projectedTotal))
} }
logD("Awaiting discovery completion")
// These should be no-ops, but we need the error state to see if we should keep going.
mediaStoreJob.await()
metadataJob.await()
// This shouldn't occur, but keep them around just in case there's a regression.
// Note that DeviceLibrary might still actually be doing work (specifically parent
// processing), so we don't check if it's deadlocked.
check(!mediaStoreJob.isActive) { "MediaStore discovery is deadlocked" }
check(!tagJob.isActive) { "Tag extraction is deadlocked" }
// Deliberately done after the involved initialization step to make it less likely
// that the short-circuit occurs so quickly as to break the UI.
// TODO: Do not error, instead just wipe the entire library.
if (rawSongs.isEmpty()) { if (rawSongs.isEmpty()) {
logE("Music library was empty") logE("Music library was empty")
throw NoMusicException() throw NoMusicException()
} }
// Successfully loaded the library, now save the cache and read playlist information // Now that the library is effectively loaded, we can start the finalization step, which
// in parallel. // involves writing new cache information and creating more music data that is derived
// from the library (e.g playlists)
logD("Discovered ${rawSongs.size} songs, starting finalization") logD("Discovered ${rawSongs.size} songs, starting finalization")
// We have no idea how long the cache will take, and the playlist construction
// will be too fast to indicate, so switch back to an indeterminate state.
emitIndexingProgress(IndexingProgress.Indeterminate) emitIndexingProgress(IndexingProgress.Indeterminate)
// The UserLibrary job is split into a query and construction step, a la MediaStore.
// This way, we can start working on playlists even as DeviceLibrary might still be
// working on parent information.
logD("Starting UserLibrary query") logD("Starting UserLibrary query")
val userLibraryQueryJob = val userLibraryQueryJob =
worker.scope.async { worker.scope.async {
@ -466,10 +501,17 @@ constructor(
} }
Result.success(rawPlaylists) Result.success(rawPlaylists)
} }
// The cache might not exist, or we might have encountered a song not present in it.
// Both situations require us to rewrite the cache in bulk. This is also done parallel
// since the playlist read will probably take some time.
// TODO: Read/write from the cache incrementally instead of in bulk?
if (cache == null || cache.invalidated) { if (cache == null || cache.invalidated) {
logD("Writing cache [why=${cache?.invalidated}]") logD("Writing cache [why=${cache?.invalidated}]")
cacheRepository.writeCache(rawSongs) cacheRepository.writeCache(rawSongs)
} }
// Create UserLibrary once we finally get the required components for it.
logD("Awaiting UserLibrary query") logD("Awaiting UserLibrary query")
val rawPlaylists = userLibraryQueryJob.await().getOrThrow() val rawPlaylists = userLibraryQueryJob.await().getOrThrow()
logD("Awaiting DeviceLibrary creation") logD("Awaiting DeviceLibrary creation")
@ -477,12 +519,14 @@ constructor(
logD("Starting UserLibrary creation") logD("Starting UserLibrary creation")
val userLibrary = userLibraryFactory.create(rawPlaylists, deviceLibrary) val userLibrary = userLibraryFactory.create(rawPlaylists, deviceLibrary)
logD("Successfully indexed music library [device=$deviceLibrary user=$userLibrary]")
emitIndexingCompletion(null)
val deviceLibraryChanged: Boolean val deviceLibraryChanged: Boolean
val userLibraryChanged: Boolean val userLibraryChanged: Boolean
// We want to make sure that all reads and writes are synchronized due to the sheer
// amount of consumers of MusicRepository.
// TODO: Would Atomics not be a better fit here?
synchronized(this) { synchronized(this) {
// It's possible that this reload might have changed nothing, so make sure that
// hasn't happened before dispatching a change to all consumers.
deviceLibraryChanged = this.deviceLibrary != deviceLibrary deviceLibraryChanged = this.deviceLibrary != deviceLibrary
userLibraryChanged = this.userLibrary != userLibrary userLibraryChanged = this.userLibrary != userLibrary
if (!deviceLibraryChanged && !userLibraryChanged) { if (!deviceLibraryChanged && !userLibraryChanged) {
@ -494,7 +538,13 @@ constructor(
this.userLibrary = userLibrary this.userLibrary = userLibrary
} }
// Listeners are expecting a callback in the main thread, switch // We are finally done. Indicate that loading is no longer occurring, and dispatch the
// results of the loading process to consumers.
logD("Successfully indexed music library [device=$deviceLibrary " +
"user=$userLibrary time=${System.currentTimeMillis() - start}]")
emitIndexingCompletion(null)
// Consumers expect their updates to be on the main thread (notably PlaybackService),
// so switch to it.
withContext(Dispatchers.Main) { withContext(Dispatchers.Main) {
dispatchLibraryChange(deviceLibraryChanged, userLibraryChanged) dispatchLibraryChange(deviceLibraryChanged, userLibraryChanged)
} }