diff --git a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt index f9fe7f612..8d001efec 100644 --- a/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt +++ b/musikr/src/main/java/org/oxycblt/musikr/pipeline/ExtractStep.kt @@ -78,50 +78,70 @@ private class ExtractStepImpl( val audioNodes = filterFlow.right val playlistNodes = filterFlow.left.map { ExtractedMusic.Valid.Playlist(it) } - // Distribute audio nodes for parallel processing - val processingDistributedFlow = audioNodes.distribute(8) + // First distribute audio nodes for parallel cache reading + val readDistributedFlow = audioNodes.distribute(8) + val cacheResults = + readDistributedFlow.flows + .map { flow -> + flow + .map { wrap(it) { file -> cache.read(file, storedCovers) } } + .flowOn(Dispatchers.IO) + .buffer(Channel.UNLIMITED) + } + .flattenMerge() + .buffer(Channel.UNLIMITED) + + // Divert cache hits and misses + val cacheFlow = + cacheResults.divert { + when (it) { + is CacheResult.Hit -> Divert.Left(it.song) + is CacheResult.Miss -> Divert.Right(it.file) + } + } + + // Cache hits can be directly converted to valid songs + val cachedSongs = cacheFlow.left.map { ExtractedMusic.Valid.Song(it) } - // Process each audio file in parallel flows + // Process uncached files in parallel + val uncachedFiles = cacheFlow.right + val processingDistributedFlow = uncachedFiles.distribute(8) + + // Process each uncached file in parallel flows val processedSongs = processingDistributedFlow.flows .map { flow -> flow .mapNotNull { file -> - // First try to read from cache wrap(file) { f -> - when (val result = cache.read(f, storedCovers)) { - is CacheResult.Hit -> ExtractedMusic.Valid.Song(result.song) - is CacheResult.Miss -> { - // If not in cache, process the file - val fd = withContext(Dispatchers.IO) { - context.contentResolver.openFileDescriptor(f.uri, "r") - } ?: return@wrap null + // Open file descriptor + val fd = withContext(Dispatchers.IO) { + context.contentResolver.openFileDescriptor(f.uri, "r") + } ?: return@wrap null + + try { + // Extract metadata + val extractedMetadata = metadataExtractor.extract(f, fd) + + if (extractedMetadata != null) { + // Parse tags + val tags = tagParser.parse(extractedMetadata) - try { - // Extract metadata - val extractedMetadata = metadataExtractor.extract(f, fd) - - if (extractedMetadata != null) { - // Parse tags - val tags = tagParser.parse(extractedMetadata) - - // Store cover if present - val cover = extractedMetadata.cover?.let { - storedCovers.write(it) - } - - // Create and write the raw song to cache - val rawSong = RawSong(f, extractedMetadata.properties, tags, cover, addingMs) - wrap(rawSong, cache::write) - - ExtractedMusic.Valid.Song(rawSong) - } else { - ExtractedMusic.Invalid - } - } finally { - withContext(Dispatchers.IO) { fd.close() } + // Store cover if present + val cover = extractedMetadata.cover?.let { + storedCovers.write(it) } + + // Create and write the raw song to cache + val rawSong = RawSong(f, extractedMetadata.properties, tags, cover, addingMs) + wrap(rawSong, cache::write) + + ExtractedMusic.Valid.Song(rawSong) + } else { + ExtractedMusic.Invalid } + } finally { + withContext(Dispatchers.IO) { fd.close() } } } } @@ -131,24 +151,27 @@ private class ExtractStepImpl( .flattenMerge() .buffer(Channel.UNLIMITED) - // Separate valid songs from invalid ones + // Separate valid processed songs from invalid ones val processedFlow = processedSongs.divert { when (it) { is ExtractedMusic.Valid.Song -> Divert.Left(it) is ExtractedMusic.Invalid -> Divert.Right(it) - else -> Divert.Right(ExtractedMusic.Invalid) // Should never happen + else -> Divert.Right(ExtractedMusic.Invalid) } } - val validSongs = processedFlow.left + val processedValidSongs = processedFlow.left val invalidSongs = processedFlow.right val merged = merge( filterFlow.manager, + readDistributedFlow.manager, + cacheFlow.manager, processingDistributedFlow.manager, processedFlow.manager, - validSongs, + cachedSongs, + processedValidSongs, invalidSongs, playlistNodes)