music: avoid absurd thread creation in indexing

Instead of running MetadataRetriever once per song, possibly creating
thousands of threads, have a single thread that loads multiple
MediaItems at once on a rolling basis using a patched MetadataRetriever.
This commit is contained in:
Alexander Capehart 2024-06-22 12:44:50 -06:00
parent 9083d2ae72
commit 043bc22eea
No known key found for this signature in database
GPG key ID: 37DBE3621FE9AD47
3 changed files with 250 additions and 132 deletions

View file

@ -26,9 +26,7 @@ import dagger.hilt.components.SingletonComponent
@Module @Module
@InstallIn(SingletonComponent::class) @InstallIn(SingletonComponent::class)
interface MetadataModule { interface MetadataModule {
@Binds fun tagExtractor(extractor: TagExtractorImpl): TagExtractor @Binds fun tagInterpreter(interpreter: TagInterpreterImpl): TagInterpreter
@Binds fun tagWorkerFactory(factory: TagWorkerFactoryImpl): TagWorker.Factory
@Binds fun audioPropertiesFactory(factory: AudioPropertiesFactoryImpl): AudioProperties.Factory @Binds fun audioPropertiesFactory(factory: AudioPropertiesFactoryImpl): AudioProperties.Factory
} }

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2023 Auxio Project * Copyright (c) 2024 Auxio Project
* TagExtractor.kt is part of Auxio. * TagExtractor.kt is part of Auxio.
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
@ -18,85 +18,258 @@
package org.oxycblt.auxio.music.metadata package org.oxycblt.auxio.music.metadata
import androidx.media3.exoplayer.MetadataRetriever import android.os.Handler
import android.os.HandlerThread
import android.os.Message
import androidx.media3.common.C
import androidx.media3.common.MediaItem
import androidx.media3.common.Timeline
import androidx.media3.common.util.Clock
import androidx.media3.common.util.HandlerWrapper
import androidx.media3.exoplayer.LoadingInfo
import androidx.media3.exoplayer.analytics.PlayerId
import androidx.media3.exoplayer.source.MediaPeriod
import androidx.media3.exoplayer.source.MediaSource
import androidx.media3.exoplayer.source.MediaSource.Factory
import androidx.media3.exoplayer.source.TrackGroupArray
import androidx.media3.exoplayer.upstream.Allocator
import androidx.media3.exoplayer.upstream.DefaultAllocator
import com.google.common.util.concurrent.SettableFuture
import javax.inject.Inject import javax.inject.Inject
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.yield import kotlinx.coroutines.yield
import org.oxycblt.auxio.music.device.RawSong import org.oxycblt.auxio.music.device.RawSong
import org.oxycblt.auxio.music.fs.toAudioUri
import org.oxycblt.auxio.util.forEachWithTimeout import org.oxycblt.auxio.util.forEachWithTimeout
import org.oxycblt.auxio.util.logD import org.oxycblt.auxio.util.logD
import org.oxycblt.auxio.util.logE
import org.oxycblt.auxio.util.sendWithTimeout import org.oxycblt.auxio.util.sendWithTimeout
/** class TagExtractor
* The extractor that leverages ExoPlayer's [MetadataRetriever] API to parse metadata. This is the @Inject
* last step in the music extraction process and is mostly responsible for papering over the bad constructor(private val mediaSourceFactory: Factory, private val tagInterpreter: TagInterpreter) {
* metadata that other extractors produce. suspend fun consume(incompleteSongs: Channel<RawSong>, completeSongs: Channel<RawSong>) {
* val worker = MetadataWorker(mediaSourceFactory, tagInterpreter)
* @author Alexander Capehart (OxygenCobalt) worker.start()
*/
interface TagExtractor {
/**
* Extract the metadata of songs from [incompleteSongs] and send them to [completeSongs]. Will
* terminate as soon as [incompleteSongs] is closed.
*
* @param incompleteSongs A [Channel] of incomplete songs to process.
* @param completeSongs A [Channel] to send completed songs to.
*/
suspend fun consume(incompleteSongs: Channel<RawSong>, completeSongs: Channel<RawSong>)
}
class TagExtractorImpl @Inject constructor(private val tagWorkerFactory: TagWorker.Factory) :
TagExtractor {
override suspend fun consume(
incompleteSongs: Channel<RawSong>,
completeSongs: Channel<RawSong>
) {
// We can parallelize MetadataRetriever Futures to work around it's speed issues,
// producing similar throughput's to other kinds of manual metadata extraction.
val tagWorkerPool: Array<TagWorker?> = arrayOfNulls(TASK_CAPACITY)
logD("Beginning primary extraction loop")
var songsIn = 0
incompleteSongs.forEachWithTimeout { incompleteRawSong -> incompleteSongs.forEachWithTimeout { incompleteRawSong ->
spin@ while (true) { spin@ while (!worker.push(incompleteRawSong)) {
for (i in tagWorkerPool.indices) { val completeRawSong = worker.pull()
val worker = tagWorkerPool[i] if (completeRawSong != null) {
if (worker != null) { completeSongs.sendWithTimeout(completeRawSong)
val completeRawSong = worker.poll() yield()
if (completeRawSong != null) { songsIn--
completeSongs.sendWithTimeout(completeRawSong) } else {
yield() continue
} else {
continue
}
}
tagWorkerPool[i] = tagWorkerFactory.create(incompleteRawSong)
break@spin
} }
} }
songsIn++
} }
logD("All incomplete songs exhausted, starting cleanup loop") logD("All incomplete songs exhausted, starting cleanup loop")
while (!worker.idle()) {
do { val completeRawSong = worker.pull()
var ongoingTasks = false if (completeRawSong != null) {
for (i in tagWorkerPool.indices) { completeSongs.sendWithTimeout(completeRawSong)
val task = tagWorkerPool[i] yield()
if (task != null) { songsIn--
val completeRawSong = task.poll() } else {
if (completeRawSong != null) { continue
completeSongs.sendWithTimeout(completeRawSong)
tagWorkerPool[i] = null
yield()
} else {
ongoingTasks = true
}
}
} }
} while (ongoingTasks) }
} worker.stop()
}
private companion object { }
const val TASK_CAPACITY = 8
// Message codes handled by MetadataWorker.handleMessage.
// Sent by start() and re-sent by the handler itself every CHECK_INTERVAL_MS.
private const val MESSAGE_CHECK_JOBS = 0
// Sent from MediaPeriodCallback.onContinueLoadingRequested; msg.obj is the MetadataJob.
private const val MESSAGE_CONTINUE_LOADING = 1
// Sent from MediaPeriodCallback.onPrepared to release one job's period/source; msg.obj is the
// MetadataJob.
private const val MESSAGE_RELEASE = 2
// Sent by stop(): releases every remaining job and quits the worker thread.
private const val MESSAGE_RELEASE_ALL = 3
// Delay, in milliseconds, between two MESSAGE_CHECK_JOBS passes.
private const val CHECK_INTERVAL_MS = 100
/**
* Patched version of Media3's MetadataRetriever that extracts metadata from several tracks at
* once on one thread. This is generally more efficient than stacking several threads at once.
*
* @author Media3 Team, Alexander Capehart (OxygenCobalt)
*/
private class MetadataWorker(
private val mediaSourceFactory: Factory,
private val tagInterpreter: TagInterpreter
) : Handler.Callback {
private val mediaSourceThread = HandlerThread("Auxio:ChunkedMetadataRetriever")
private val mediaSourceHandler: HandlerWrapper
private val jobs = Array<MetadataJob?>(8) { null }
/**
 * Book-keeping for one song that is being extracted.
 *
 * [rawSong], [mediaItem] and [future] are fixed at creation. The media source, its caller and
 * the media period start out as whatever the creator passes (null in [push]) and are filled in
 * later while messages are handled.
 */
private class MetadataJob(
    val rawSong: RawSong,
    val mediaItem: MediaItem,
    val future: SettableFuture<TrackGroupArray>,
    var mediaSource: MediaSource?,
    var mediaPeriod: MediaPeriod?,
    var mediaSourceCaller: MediaSourceCaller?
)
// The thread is started before its looper is requested. This instance is registered as the
// Handler.Callback, so messages sent through mediaSourceHandler arrive in handleMessage.
init {
mediaSourceThread.start()
mediaSourceHandler = Clock.DEFAULT.createHandler(mediaSourceThread.looper, this)
}
/** Kick off the periodic job check by posting the first MESSAGE_CHECK_JOBS. */
fun start() {
mediaSourceHandler.sendEmptyMessage(MESSAGE_CHECK_JOBS)
}
/** @return true when no slot in [jobs] currently holds a job. */
fun idle(): Boolean = jobs.none { it != null }
/**
 * Ask the worker thread to shut down by posting MESSAGE_RELEASE_ALL. The release of remaining
 * jobs and the thread quit happen in handleMessage, not in this call.
 */
fun stop() {
mediaSourceHandler.sendEmptyMessage(MESSAGE_RELEASE_ALL)
}
/**
 * Try to enqueue [rawSong] for extraction.
 *
 * @param rawSong The song to extract metadata for. Its mediaStoreId must be non-null.
 * @return true if a free slot was found and the song was queued, false if every slot is taken.
 * @throws IllegalArgumentException if a slot is free but [rawSong] has no mediaStoreId.
 */
fun push(rawSong: RawSong): Boolean {
    val freeSlot = jobs.indexOfFirst { it == null }
    if (freeSlot < 0) {
        // Every slot is occupied; the caller has to pull a finished song first.
        return false
    }
    val id = requireNotNull(rawSong.mediaStoreId) { "Invalid raw: No URI" }
    val item = MediaItem.fromUri(id.toAudioUri())
    val pending = SettableFuture.create<TrackGroupArray>()
    // The source, period and caller are attached later by the handler.
    jobs[freeSlot] = MetadataJob(rawSong, item, pending, null, null, null)
    return true
}
/**
 * Take the first finished job out of [jobs], if there is one.
 *
 * The job's track groups are handed to [tagInterpreter]. If extraction or interpretation threw,
 * the error is logged and the song is still returned as-is.
 *
 * @return The [RawSong] of the first job whose future is done, or null if none is done yet.
 */
fun pull(): RawSong? {
    for ((slot, candidate) in jobs.withIndex()) {
        if (candidate == null || !candidate.future.isDone) continue
        try {
            val trackGroups = candidate.future.get()
            tagInterpreter.interpret(candidate.rawSong, trackGroups)
        } catch (e: Exception) {
            logE("Failed to extract metadata")
            logE(e.stackTraceToString())
        }
        // Free the slot whether or not interpretation succeeded.
        jobs[slot] = null
        return candidate.rawSong
    }
    return null
}
/**
 * Handle a message delivered through [mediaSourceHandler].
 * - MESSAGE_CHECK_JOBS: create and prepare a media source for every job that has none, surface
 *   any pending source/period error into the job's future, then re-schedule itself.
 * - MESSAGE_CONTINUE_LOADING: let the job's media period continue loading.
 * - MESSAGE_RELEASE: release the period and source of the job in msg.obj.
 * - MESSAGE_RELEASE_ALL: release every remaining job, drop pending messages, quit the thread.
 *
 * @param msg The message to handle; msg.obj is a MetadataJob for the per-job messages.
 * @return true if the message code was recognized, false otherwise.
 */
override fun handleMessage(msg: Message): Boolean {
    when (msg.what) {
        MESSAGE_CHECK_JOBS -> {
            for (job in jobs) {
                // Skip empty slots. Also skip jobs whose future already resolved but which have
                // not been pulled yet: their period/source is either released already (failure
                // path below) or has a MESSAGE_RELEASE queued (onPrepared). Polling them again
                // would touch released objects and could release them a second time.
                if (job == null || job.future.isDone) continue

                val currentMediaSource = job.mediaSource
                val currentMediaSourceCaller = job.mediaSourceCaller
                val mediaSource: MediaSource
                val mediaSourceCaller: MediaSourceCaller
                if (currentMediaSource != null && currentMediaSourceCaller != null) {
                    mediaSource = currentMediaSource
                    mediaSourceCaller = currentMediaSourceCaller
                } else {
                    // First time this job is seen: create and prepare its source.
                    logD("new media source yahoo")
                    mediaSource = mediaSourceFactory.createMediaSource(job.mediaItem)
                    mediaSourceCaller = MediaSourceCaller(job)
                    mediaSource.prepareSource(
                        mediaSourceCaller, /* mediaTransferListener= */ null, PlayerId.UNSET)
                    job.mediaSource = mediaSource
                    job.mediaSourceCaller = mediaSourceCaller
                }

                try {
                    // Until a period exists, errors come from the source; afterwards from the
                    // period.
                    val mediaPeriod = job.mediaPeriod
                    if (mediaPeriod == null) {
                        mediaSource.maybeThrowSourceInfoRefreshError()
                    } else {
                        mediaPeriod.maybeThrowPrepareError()
                    }
                } catch (e: Exception) {
                    logE("Failed to extract MediaSource")
                    logE(e.stackTraceToString())
                    job.mediaPeriod?.let(mediaSource::releasePeriod)
                    mediaSource.releaseSource(mediaSourceCaller)
                    // Resolving the future marks the job as finished for pull().
                    job.future.setException(e)
                }
            }

            mediaSourceHandler.sendEmptyMessageDelayed(
                MESSAGE_CHECK_JOBS, /* delayMs= */ CHECK_INTERVAL_MS)
            return true
        }
        MESSAGE_CONTINUE_LOADING -> {
            checkNotNull((msg.obj as MetadataJob).mediaPeriod)
                .continueLoading(LoadingInfo.Builder().setPlaybackPositionUs(0).build())
            return true
        }
        MESSAGE_RELEASE -> {
            releaseJob(msg.obj as MetadataJob)
            return true
        }
        MESSAGE_RELEASE_ALL -> {
            for (job in jobs) {
                if (job == null) continue
                releaseJob(job)
            }
            mediaSourceHandler.removeCallbacksAndMessages(/* token= */ null)
            mediaSourceThread.quit()
            return true
        }
        else -> return false
    }
}

/** Release the media period (if any) and then the media source (if any) held by [job]. */
private fun releaseJob(job: MetadataJob) {
    job.mediaPeriod?.let { job.mediaSource?.releasePeriod(it) }
    job.mediaSourceCaller?.let { job.mediaSource?.releaseSource(it) }
}
private inner class MediaSourceCaller(private val job: MetadataJob) :
MediaSource.MediaSourceCaller {
private val mediaPeriodCallback: MediaPeriodCallback = MediaPeriodCallback(job)
private val allocator: Allocator =
DefaultAllocator(
/* trimOnReset= */ true,
/* individualAllocationSize= */ C.DEFAULT_BUFFER_SEGMENT_SIZE)
private var mediaPeriodCreated = false
/**
 * Called when the source reports its timeline. On the first call, a period for the first entry
 * of [timeline] is created, stored on the job and prepared; later calls are ignored.
 */
override fun onSourceInfoRefreshed(source: MediaSource, timeline: Timeline) {
    // Ignore dynamic updates.
    if (mediaPeriodCreated) return

    logD("yay source created")
    mediaPeriodCreated = true

    val firstPeriodId =
        MediaSource.MediaPeriodId(timeline.getUidOfPeriod(/* periodIndex= */ 0))
    val createdPeriod = source.createPeriod(firstPeriodId, allocator, /* startPositionUs= */ 0)
    // Store the period on the job before preparing it, so message handling can see it.
    job.mediaPeriod = createdPeriod
    createdPeriod.prepare(mediaPeriodCallback, /* positionUs= */ 0)
}
/**
 * Receives preparation events for the media period of a single [MetadataJob] and forwards the
 * follow-up work to [mediaSourceHandler].
 */
private inner class MediaPeriodCallback(private val job: MetadataJob) :
    MediaPeriod.Callback {
    /** Publish the period's track groups through the job's future, then request its release. */
    override fun onPrepared(mediaPeriod: MediaPeriod) {
        job.future.set(mediaPeriod.trackGroups)
        mediaSourceHandler.obtainMessage(MESSAGE_RELEASE, job).sendToTarget()
    }

    /** Ask the handler to call continueLoading on this job's period. */
    override fun onContinueLoadingRequested(source: MediaPeriod) {
        mediaSourceHandler.obtainMessage(MESSAGE_CONTINUE_LOADING, job).sendToTarget()
    }
}
} }
} }

View file

@ -1,6 +1,6 @@
/* /*
* Copyright (c) 2023 Auxio Project * Copyright (c) 2023 Auxio Project
* TagWorker.kt is part of Auxio. * TagInterpreter.kt is part of Auxio.
* *
* This program is free software: you can redistribute it and/or modify * This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
@ -19,19 +19,14 @@
package org.oxycblt.auxio.music.metadata package org.oxycblt.auxio.music.metadata
import androidx.core.text.isDigitsOnly import androidx.core.text.isDigitsOnly
import androidx.media3.common.MediaItem
import androidx.media3.exoplayer.MetadataRetriever import androidx.media3.exoplayer.MetadataRetriever
import androidx.media3.exoplayer.source.MediaSource
import androidx.media3.exoplayer.source.TrackGroupArray import androidx.media3.exoplayer.source.TrackGroupArray
import java.util.concurrent.Future
import javax.inject.Inject import javax.inject.Inject
import kotlin.math.min import kotlin.math.min
import org.oxycblt.auxio.image.extractor.CoverExtractor import org.oxycblt.auxio.image.extractor.CoverExtractor
import org.oxycblt.auxio.music.device.RawSong import org.oxycblt.auxio.music.device.RawSong
import org.oxycblt.auxio.music.fs.toAudioUri
import org.oxycblt.auxio.music.info.Date import org.oxycblt.auxio.music.info.Date
import org.oxycblt.auxio.util.logD import org.oxycblt.auxio.util.logD
import org.oxycblt.auxio.util.logW
import org.oxycblt.auxio.util.nonZeroOrNull import org.oxycblt.auxio.util.nonZeroOrNull
/** /**
@ -40,70 +35,24 @@ import org.oxycblt.auxio.util.nonZeroOrNull
* *
* @author Alexander Capehart (OxygenCobalt) * @author Alexander Capehart (OxygenCobalt)
*/ */
interface TagWorker { interface TagInterpreter {
/** /**
* Poll to see if this worker is done processing. * Poll to see if this worker is done processing.
* *
* @return A completed [RawSong] if done, null otherwise. * @return A completed [RawSong] if done, null otherwise.
*/ */
fun poll(): RawSong? fun interpret(rawSong: RawSong, trackGroupArray: TrackGroupArray)
/** Factory for new [TagWorker] jobs. */
interface Factory {
/**
* Create a new [TagWorker] to complete the given [RawSong].
*
* @param rawSong The [RawSong] to assign a new [TagWorker] to.
* @return A new [TagWorker] wrapping the given [RawSong].
*/
fun create(rawSong: RawSong): TagWorker
}
} }
class TagWorkerFactoryImpl class TagInterpreterImpl @Inject constructor(private val coverExtractor: CoverExtractor) :
@Inject TagInterpreter {
constructor( override fun interpret(rawSong: RawSong, trackGroupArray: TrackGroupArray) {
private val mediaSourceFactory: MediaSource.Factory, val format = trackGroupArray.get(0).getFormat(0)
private val coverExtractor: CoverExtractor
) : TagWorker.Factory {
override fun create(rawSong: RawSong): TagWorker =
// Note that we do not leverage future callbacks. This is because errors in the
// (highly fallible) extraction process will not bubble up to Indexer when a
// listener is used, instead crashing the app entirely.
TagWorkerImpl(
rawSong,
MetadataRetriever.retrieveMetadata(
mediaSourceFactory,
MediaItem.fromUri(
requireNotNull(rawSong.mediaStoreId) { "Invalid raw: No id" }.toAudioUri())),
coverExtractor)
}
private class TagWorkerImpl(
private val rawSong: RawSong,
private val future: Future<TrackGroupArray>,
private val coverExtractor: CoverExtractor
) : TagWorker {
override fun poll(): RawSong? {
if (!future.isDone) {
// Not done yet, nothing to do.
return null
}
val format =
try {
future.get()[0].getFormat(0)
} catch (e: Exception) {
logW("Unable to extract metadata for ${rawSong.name}")
logW(e.stackTraceToString())
return rawSong
}
val metadata = format.metadata val metadata = format.metadata
if (metadata != null) { if (metadata != null) {
val textTags = TextTags(metadata) val textTags = TextTags(metadata)
populateWithId3v2(textTags.id3v2) populateWithId3v2(rawSong, textTags.id3v2)
populateWithVorbis(textTags.vorbis) populateWithVorbis(rawSong, textTags.vorbis)
coverExtractor.findCoverDataInMetadata(metadata)?.use { coverExtractor.findCoverDataInMetadata(metadata)?.use {
val available = it.available() val available = it.available()
@ -147,11 +96,9 @@ private class TagWorkerImpl(
} else { } else {
logD("No metadata could be extracted for ${rawSong.name}") logD("No metadata could be extracted for ${rawSong.name}")
} }
return rawSong
} }
private fun populateWithId3v2(textFrames: Map<String, List<String>>) { private fun populateWithId3v2(rawSong: RawSong, textFrames: Map<String, List<String>>) {
// Song // Song
(textFrames["TXXX:musicbrainz release track id"] (textFrames["TXXX:musicbrainz release track id"]
?: textFrames["TXXX:musicbrainz_releasetrackid"]) ?: textFrames["TXXX:musicbrainz_releasetrackid"])
@ -278,7 +225,7 @@ private class TagWorkerImpl(
} }
} }
private fun populateWithVorbis(comments: Map<String, List<String>>) { private fun populateWithVorbis(rawSong: RawSong, comments: Map<String, List<String>>) {
// Song // Song
(comments["musicbrainz_releasetrackid"] ?: comments["musicbrainz release track id"])?.let { (comments["musicbrainz_releasetrackid"] ?: comments["musicbrainz release track id"])?.let {
rawSong.musicBrainzId = it.first() rawSong.musicBrainzId = it.first()