musikr: parallelize all extraction

This commit is contained in:
Alexander Capehart 2025-02-25 16:11:09 -07:00
parent 0387400a4a
commit 584af83a07
No known key found for this signature in database
GPG key ID: 37DBE3621FE9AD47

View file

@@ -78,50 +78,70 @@ private class ExtractStepImpl(
val audioNodes = filterFlow.right val audioNodes = filterFlow.right
val playlistNodes = filterFlow.left.map { ExtractedMusic.Valid.Playlist(it) } val playlistNodes = filterFlow.left.map { ExtractedMusic.Valid.Playlist(it) }
// Distribute audio nodes for parallel processing // First distribute audio nodes for parallel cache reading
val processingDistributedFlow = audioNodes.distribute(8) val readDistributedFlow = audioNodes.distribute(8)
val cacheResults =
readDistributedFlow.flows
.map { flow ->
flow
.map { wrap(it) { file -> cache.read(file, storedCovers) } }
.flowOn(Dispatchers.IO)
.buffer(Channel.UNLIMITED)
}
.flattenMerge()
.buffer(Channel.UNLIMITED)
// Process each audio file in parallel flows // Divert cache hits and misses
val cacheFlow =
cacheResults.divert {
when (it) {
is CacheResult.Hit -> Divert.Left(it.song)
is CacheResult.Miss -> Divert.Right(it.file)
}
}
// Cache hits can be directly converted to valid songs
val cachedSongs = cacheFlow.left.map { ExtractedMusic.Valid.Song(it) }
// Process uncached files in parallel
val uncachedFiles = cacheFlow.right
val processingDistributedFlow = uncachedFiles.distribute(8)
// Process each uncached file in parallel flows
val processedSongs = val processedSongs =
processingDistributedFlow.flows processingDistributedFlow.flows
.map { flow -> .map { flow ->
flow flow
.mapNotNull { file -> .mapNotNull { file ->
// First try to read from cache
wrap(file) { f -> wrap(file) { f ->
when (val result = cache.read(f, storedCovers)) { // Open file descriptor
is CacheResult.Hit -> ExtractedMusic.Valid.Song(result.song) val fd = withContext(Dispatchers.IO) {
is CacheResult.Miss -> { context.contentResolver.openFileDescriptor(f.uri, "r")
// If not in cache, process the file } ?: return@wrap null
val fd = withContext(Dispatchers.IO) {
context.contentResolver.openFileDescriptor(f.uri, "r")
} ?: return@wrap null
try { try {
// Extract metadata // Extract metadata
val extractedMetadata = metadataExtractor.extract(f, fd) val extractedMetadata = metadataExtractor.extract(f, fd)
if (extractedMetadata != null) { if (extractedMetadata != null) {
// Parse tags // Parse tags
val tags = tagParser.parse(extractedMetadata) val tags = tagParser.parse(extractedMetadata)
// Store cover if present // Store cover if present
val cover = extractedMetadata.cover?.let { val cover = extractedMetadata.cover?.let {
storedCovers.write(it) storedCovers.write(it)
}
// Create and write the raw song to cache
val rawSong = RawSong(f, extractedMetadata.properties, tags, cover, addingMs)
wrap(rawSong, cache::write)
ExtractedMusic.Valid.Song(rawSong)
} else {
ExtractedMusic.Invalid
}
} finally {
withContext(Dispatchers.IO) { fd.close() }
} }
// Create and write the raw song to cache
val rawSong = RawSong(f, extractedMetadata.properties, tags, cover, addingMs)
wrap(rawSong, cache::write)
ExtractedMusic.Valid.Song(rawSong)
} else {
ExtractedMusic.Invalid
} }
} finally {
withContext(Dispatchers.IO) { fd.close() }
} }
} }
} }
@@ -131,24 +151,27 @@ private class ExtractStepImpl(
.flattenMerge() .flattenMerge()
.buffer(Channel.UNLIMITED) .buffer(Channel.UNLIMITED)
// Separate valid songs from invalid ones // Separate valid processed songs from invalid ones
val processedFlow = processedSongs.divert { val processedFlow = processedSongs.divert {
when (it) { when (it) {
is ExtractedMusic.Valid.Song -> Divert.Left(it) is ExtractedMusic.Valid.Song -> Divert.Left(it)
is ExtractedMusic.Invalid -> Divert.Right(it) is ExtractedMusic.Invalid -> Divert.Right(it)
else -> Divert.Right(ExtractedMusic.Invalid) // Should never happen else -> Divert.Right(ExtractedMusic.Invalid)
} }
} }
val validSongs = processedFlow.left val processedValidSongs = processedFlow.left
val invalidSongs = processedFlow.right val invalidSongs = processedFlow.right
val merged = val merged =
merge( merge(
filterFlow.manager, filterFlow.manager,
readDistributedFlow.manager,
cacheFlow.manager,
processingDistributedFlow.manager, processingDistributedFlow.manager,
processedFlow.manager, processedFlow.manager,
validSongs, cachedSongs,
processedValidSongs,
invalidSongs, invalidSongs,
playlistNodes) playlistNodes)