diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt index 1d204fe5a..f9fe7f612 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt @@ -78,101 +78,78 @@ private class ExtractStepImpl( val audioNodes = filterFlow.right val playlistNodes = filterFlow.left.map { ExtractedMusic.Valid.Playlist(it) } - val readDistributedFlow = audioNodes.distribute(8) - val cacheResults = - readDistributedFlow.flows + // Distribute audio nodes for parallel processing + val processingDistributedFlow = audioNodes.distribute(8) + + // Process each audio file in parallel flows + val processedSongs = + processingDistributedFlow.flows .map { flow -> flow - .map { wrap(it) { file -> cache.read(file, storedCovers) } } - .flowOn(Dispatchers.IO) - .buffer(Channel.UNLIMITED) - } - .flattenMerge() - .buffer(Channel.UNLIMITED) - val cacheFlow = - cacheResults.divert { - when (it) { - is CacheResult.Hit -> Divert.Left(it.song) - is CacheResult.Miss -> Divert.Right(it.file) - } - } - val cachedSongs = cacheFlow.left.map { ExtractedMusic.Valid.Song(it) } - val uncachedSongs = cacheFlow.right - - val fds = - uncachedSongs - .mapNotNull { - wrap(it) { file -> - withContext(Dispatchers.IO) { - context.contentResolver.openFileDescriptor(file.uri, "r")?.let { fd -> - FileWith(file, fd) + .mapNotNull { file -> + // First try to read from cache + wrap(file) { f -> + when (val result = cache.read(f, storedCovers)) { + is CacheResult.Hit -> ExtractedMusic.Valid.Song(result.song) + is CacheResult.Miss -> { + // If not in cache, process the file + val fd = withContext(Dispatchers.IO) { + context.contentResolver.openFileDescriptor(f.uri, "r") + } ?: return@wrap null + + try { + // Extract metadata + val extractedMetadata = metadataExtractor.extract(f, fd) + + if (extractedMetadata != null) { + // Parse tags + val tags = tagParser.parse(extractedMetadata) + + // Store cover if present + val cover = extractedMetadata.cover?.let { + storedCovers.write(it) + } + + // Create and write the raw song to cache + val rawSong = RawSong(f, extractedMetadata.properties, tags, cover, addingMs) + wrap(rawSong, cache::write) + + ExtractedMusic.Valid.Song(rawSong) + } else { + ExtractedMusic.Invalid + } + } finally { + withContext(Dispatchers.IO) { fd.close() } + } + } + } } } - } - } - .flowOn(Dispatchers.IO) - .buffer(Channel.UNLIMITED) - - val metadata = - fds.mapNotNull { fileWith -> - wrap(fileWith.file) { _ -> - metadataExtractor - .extract(fileWith.file, fileWith.with) - .let { FileWith(fileWith.file, it) } - .also { withContext(Dispatchers.IO) { fileWith.with.close() } } - } - } - .flowOn(Dispatchers.IO) - // Covers are pretty big, so cap the amount of parsed metadata in-memory to at most - // 8 to minimize GCs. - .buffer(8) - - val extractedSongs = - metadata - .map { fileWith -> - if (fileWith.with != null) { - val tags = tagParser.parse(fileWith.with) - val cover = fileWith.with.cover?.let { storedCovers.write(it) } - RawSong(fileWith.file, fileWith.with.properties, tags, cover, addingMs) - } else { - null - } - } - .flowOn(Dispatchers.IO) - .buffer(Channel.UNLIMITED) - - val extractedFilter = - extractedSongs.divert { - if (it != null) Divert.Left(it) else Divert.Right(ExtractedMusic.Invalid) - } - - val write = extractedFilter.left - val invalid = extractedFilter.right - - val writeDistributedFlow = write.distribute(8) - val writtenSongs = - writeDistributedFlow.flows - .map { flow -> - flow - .map { - wrap(it, cache::write) - ExtractedMusic.Valid.Song(it) - } .flowOn(Dispatchers.IO) .buffer(Channel.UNLIMITED) } .flattenMerge() + .buffer(Channel.UNLIMITED) + + // Separate valid songs from invalid ones + val processedFlow = processedSongs.divert { + when (it) { + is ExtractedMusic.Valid.Song -> Divert.Left(it) + is ExtractedMusic.Invalid -> Divert.Right(it) + else -> Divert.Right(ExtractedMusic.Invalid) // Should never happen + } + } + + val validSongs = processedFlow.left + val invalidSongs = processedFlow.right val merged = merge( filterFlow.manager, - readDistributedFlow.manager, - cacheFlow.manager, - cachedSongs, - extractedFilter.manager, - writeDistributedFlow.manager, - writtenSongs, - invalid, + processingDistributedFlow.manager, + processedFlow.manager, + validSongs, + invalidSongs, playlistNodes) return merged.onCompletion { cache.finalize() }