refactor: Simplify ExtractStep with unified parallel processing flow

This commit is contained in:
Alexander Capehart (aider) 2025-02-25 16:09:04 -07:00
parent 94f8457d69
commit 0387400a4a
No known key found for this signature in database
GPG key ID: 37DBE3621FE9AD47

View file

@ -78,101 +78,78 @@ private class ExtractStepImpl(
val audioNodes = filterFlow.right val audioNodes = filterFlow.right
val playlistNodes = filterFlow.left.map { ExtractedMusic.Valid.Playlist(it) } val playlistNodes = filterFlow.left.map { ExtractedMusic.Valid.Playlist(it) }
val readDistributedFlow = audioNodes.distribute(8) // Distribute audio nodes for parallel processing
val cacheResults = val processingDistributedFlow = audioNodes.distribute(8)
readDistributedFlow.flows
// Process each audio file in parallel flows
val processedSongs =
processingDistributedFlow.flows
.map { flow -> .map { flow ->
flow flow
.map { wrap(it) { file -> cache.read(file, storedCovers) } } .mapNotNull { file ->
.flowOn(Dispatchers.IO) // First try to read from cache
.buffer(Channel.UNLIMITED) wrap(file) { f ->
} when (val result = cache.read(f, storedCovers)) {
.flattenMerge() is CacheResult.Hit -> ExtractedMusic.Valid.Song(result.song)
.buffer(Channel.UNLIMITED) is CacheResult.Miss -> {
val cacheFlow = // If not in cache, process the file
cacheResults.divert { val fd = withContext(Dispatchers.IO) {
when (it) { context.contentResolver.openFileDescriptor(f.uri, "r")
is CacheResult.Hit -> Divert.Left(it.song) } ?: return@wrap null
is CacheResult.Miss -> Divert.Right(it.file)
}
}
val cachedSongs = cacheFlow.left.map { ExtractedMusic.Valid.Song(it) }
val uncachedSongs = cacheFlow.right
val fds = try {
uncachedSongs // Extract metadata
.mapNotNull { val extractedMetadata = metadataExtractor.extract(f, fd)
wrap(it) { file ->
withContext(Dispatchers.IO) {
context.contentResolver.openFileDescriptor(file.uri, "r")?.let { fd ->
FileWith(file, fd)
}
}
}
}
.flowOn(Dispatchers.IO)
.buffer(Channel.UNLIMITED)
val metadata = if (extractedMetadata != null) {
fds.mapNotNull { fileWith -> // Parse tags
wrap(fileWith.file) { _ -> val tags = tagParser.parse(extractedMetadata)
metadataExtractor
.extract(fileWith.file, fileWith.with)
.let { FileWith(fileWith.file, it) }
.also { withContext(Dispatchers.IO) { fileWith.with.close() } }
}
}
.flowOn(Dispatchers.IO)
// Covers are pretty big, so cap the amount of parsed metadata in-memory to at most
// 8 to minimize GCs.
.buffer(8)
val extractedSongs = // Store cover if present
metadata val cover = extractedMetadata.cover?.let {
.map { fileWith -> storedCovers.write(it)
if (fileWith.with != null) { }
val tags = tagParser.parse(fileWith.with)
val cover = fileWith.with.cover?.let { storedCovers.write(it) } // Create and write the raw song to cache
RawSong(fileWith.file, fileWith.with.properties, tags, cover, addingMs) val rawSong = RawSong(f, extractedMetadata.properties, tags, cover, addingMs)
wrap(rawSong, cache::write)
ExtractedMusic.Valid.Song(rawSong)
} else { } else {
null ExtractedMusic.Invalid
}
} finally {
withContext(Dispatchers.IO) { fd.close() }
}
} }
} }
.flowOn(Dispatchers.IO)
.buffer(Channel.UNLIMITED)
val extractedFilter =
extractedSongs.divert {
if (it != null) Divert.Left(it) else Divert.Right(ExtractedMusic.Invalid)
} }
val write = extractedFilter.left
val invalid = extractedFilter.right
val writeDistributedFlow = write.distribute(8)
val writtenSongs =
writeDistributedFlow.flows
.map { flow ->
flow
.map {
wrap(it, cache::write)
ExtractedMusic.Valid.Song(it)
} }
.flowOn(Dispatchers.IO) .flowOn(Dispatchers.IO)
.buffer(Channel.UNLIMITED) .buffer(Channel.UNLIMITED)
} }
.flattenMerge() .flattenMerge()
.buffer(Channel.UNLIMITED)
// Separate valid songs from invalid ones
val processedFlow = processedSongs.divert {
when (it) {
is ExtractedMusic.Valid.Song -> Divert.Left(it)
is ExtractedMusic.Invalid -> Divert.Right(it)
else -> Divert.Right(ExtractedMusic.Invalid) // Should never happen
}
}
val validSongs = processedFlow.left
val invalidSongs = processedFlow.right
val merged = val merged =
merge( merge(
filterFlow.manager, filterFlow.manager,
readDistributedFlow.manager, processingDistributedFlow.manager,
cacheFlow.manager, processedFlow.manager,
cachedSongs, validSongs,
extractedFilter.manager, invalidSongs,
writeDistributedFlow.manager,
writtenSongs,
invalid,
playlistNodes) playlistNodes)
return merged.onCompletion { cache.finalize() } return merged.onCompletion { cache.finalize() }