music: re-add event handling
Kinda scuffed, will probably split into low-level events and do the MusicRepository interpret step in Indexer.
This commit is contained in:
parent
ba29905aa6
commit
2f9ced2ac3
4 changed files with 77 additions and 13 deletions
|
@ -34,6 +34,7 @@ import org.oxycblt.auxio.music.metadata.Separators
|
||||||
import org.oxycblt.auxio.music.stack.Indexer
|
import org.oxycblt.auxio.music.stack.Indexer
|
||||||
import org.oxycblt.auxio.music.stack.interpret.Interpretation
|
import org.oxycblt.auxio.music.stack.interpret.Interpretation
|
||||||
import org.oxycblt.auxio.music.stack.interpret.model.MutableLibrary
|
import org.oxycblt.auxio.music.stack.interpret.model.MutableLibrary
|
||||||
|
import kotlin.math.exp
|
||||||
import timber.log.Timber as L
|
import timber.log.Timber as L
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -363,7 +364,27 @@ constructor(private val indexer: Indexer, private val musicSettings: MusicSettin
|
||||||
Name.Known.SimpleFactory
|
Name.Known.SimpleFactory
|
||||||
}
|
}
|
||||||
|
|
||||||
val newLibrary = indexer.run(listOf(), Interpretation(nameFactory, separators))
|
var explored = 0
|
||||||
|
var loaded = 0
|
||||||
|
var interpreted = 0
|
||||||
|
val newLibrary = indexer.run(listOf(), Interpretation(nameFactory, separators)) {
|
||||||
|
when (it) {
|
||||||
|
is Indexer.Event.Discovered -> {
|
||||||
|
explored = it.amount
|
||||||
|
emitIndexingProgress(IndexingProgress.Songs(loaded, explored))
|
||||||
|
}
|
||||||
|
is Indexer.Event.Extracted -> {
|
||||||
|
loaded = it.amount
|
||||||
|
emitIndexingProgress(IndexingProgress.Songs(loaded, explored))
|
||||||
|
}
|
||||||
|
is Indexer.Event.Interpret -> {
|
||||||
|
interpreted = it.amount
|
||||||
|
if (explored == loaded) {
|
||||||
|
emitIndexingProgress(IndexingProgress.Songs(loaded, explored))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// We want to make sure that all reads and writes are synchronized due to the sheer
|
// We want to make sure that all reads and writes are synchronized due to the sheer
|
||||||
// amount of consumers of MusicRepository.
|
// amount of consumers of MusicRepository.
|
||||||
|
|
|
@ -24,22 +24,37 @@ import kotlinx.coroutines.Dispatchers
|
||||||
import kotlinx.coroutines.coroutineScope
|
import kotlinx.coroutines.coroutineScope
|
||||||
import kotlinx.coroutines.flow.buffer
|
import kotlinx.coroutines.flow.buffer
|
||||||
import kotlinx.coroutines.flow.flowOn
|
import kotlinx.coroutines.flow.flowOn
|
||||||
|
import org.oxycblt.auxio.music.stack.Indexer.Event
|
||||||
import org.oxycblt.auxio.music.stack.explore.Explorer
|
import org.oxycblt.auxio.music.stack.explore.Explorer
|
||||||
import org.oxycblt.auxio.music.stack.interpret.Interpretation
|
import org.oxycblt.auxio.music.stack.interpret.Interpretation
|
||||||
import org.oxycblt.auxio.music.stack.interpret.Interpreter
|
import org.oxycblt.auxio.music.stack.interpret.Interpreter
|
||||||
import org.oxycblt.auxio.music.stack.interpret.model.MutableLibrary
|
import org.oxycblt.auxio.music.stack.interpret.model.MutableLibrary
|
||||||
|
|
||||||
interface Indexer {
|
interface Indexer {
|
||||||
suspend fun run(uris: List<Uri>, interpretation: Interpretation): MutableLibrary
|
suspend fun run(uris: List<Uri>, interpretation: Interpretation, eventHandler: suspend (Event) -> Unit = {}): MutableLibrary
|
||||||
|
|
||||||
|
sealed interface Event {
|
||||||
|
data class Discovered(
|
||||||
|
val amount: Int,
|
||||||
|
) : Event
|
||||||
|
|
||||||
|
data class Extracted(
|
||||||
|
val amount: Int
|
||||||
|
) : Event
|
||||||
|
|
||||||
|
data class Interpret(
|
||||||
|
val amount: Int
|
||||||
|
) : Event
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class IndexerImpl
|
class IndexerImpl
|
||||||
@Inject
|
@Inject
|
||||||
constructor(private val explorer: Explorer, private val interpreter: Interpreter) : Indexer {
|
constructor(private val explorer: Explorer, private val interpreter: Interpreter) : Indexer {
|
||||||
override suspend fun run(uris: List<Uri>, interpretation: Interpretation) = coroutineScope {
|
override suspend fun run(uris: List<Uri>, interpretation: Interpretation, eventHandler: suspend (Event) -> Unit) = coroutineScope {
|
||||||
val files = explorer.explore(uris)
|
val files = explorer.explore(uris, eventHandler)
|
||||||
val audioFiles = files.audios.flowOn(Dispatchers.IO).buffer()
|
val audioFiles = files.audios.flowOn(Dispatchers.IO).buffer()
|
||||||
val playlistFiles = files.playlists.flowOn(Dispatchers.IO).buffer()
|
val playlistFiles = files.playlists.flowOn(Dispatchers.IO).buffer()
|
||||||
interpreter.interpret(audioFiles, playlistFiles, interpretation)
|
interpreter.interpret(audioFiles, playlistFiles, interpretation, eventHandler)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,8 +33,10 @@ import kotlinx.coroutines.flow.flattenMerge
|
||||||
import kotlinx.coroutines.flow.flowOn
|
import kotlinx.coroutines.flow.flowOn
|
||||||
import kotlinx.coroutines.flow.map
|
import kotlinx.coroutines.flow.map
|
||||||
import kotlinx.coroutines.flow.merge
|
import kotlinx.coroutines.flow.merge
|
||||||
|
import kotlinx.coroutines.flow.onEach
|
||||||
import kotlinx.coroutines.flow.shareIn
|
import kotlinx.coroutines.flow.shareIn
|
||||||
import kotlinx.coroutines.flow.withIndex
|
import kotlinx.coroutines.flow.withIndex
|
||||||
|
import org.oxycblt.auxio.music.stack.Indexer
|
||||||
import org.oxycblt.auxio.music.stack.explore.cache.CacheResult
|
import org.oxycblt.auxio.music.stack.explore.cache.CacheResult
|
||||||
import org.oxycblt.auxio.music.stack.explore.cache.TagCache
|
import org.oxycblt.auxio.music.stack.explore.cache.TagCache
|
||||||
import org.oxycblt.auxio.music.stack.explore.extractor.TagExtractor
|
import org.oxycblt.auxio.music.stack.explore.extractor.TagExtractor
|
||||||
|
@ -42,7 +44,7 @@ import org.oxycblt.auxio.music.stack.explore.fs.DeviceFiles
|
||||||
import org.oxycblt.auxio.music.stack.explore.playlists.StoredPlaylists
|
import org.oxycblt.auxio.music.stack.explore.playlists.StoredPlaylists
|
||||||
|
|
||||||
interface Explorer {
|
interface Explorer {
|
||||||
fun explore(uris: List<Uri>): Files
|
fun explore(uris: List<Uri>, eventHandler: suspend (Indexer.Event) -> Unit): Files
|
||||||
}
|
}
|
||||||
|
|
||||||
data class Files(val audios: Flow<AudioFile>, val playlists: Flow<PlaylistFile>)
|
data class Files(val audios: Flow<AudioFile>, val playlists: Flow<PlaylistFile>)
|
||||||
|
@ -56,8 +58,15 @@ constructor(
|
||||||
private val storedPlaylists: StoredPlaylists
|
private val storedPlaylists: StoredPlaylists
|
||||||
) : Explorer {
|
) : Explorer {
|
||||||
@OptIn(ExperimentalCoroutinesApi::class)
|
@OptIn(ExperimentalCoroutinesApi::class)
|
||||||
override fun explore(uris: List<Uri>): Files {
|
override fun explore(uris: List<Uri>, eventHandler: suspend (Indexer.Event) -> Unit): Files {
|
||||||
val deviceFiles = deviceFiles.explore(uris.asFlow()).flowOn(Dispatchers.IO).buffer()
|
var discovered = 0
|
||||||
|
val deviceFiles = deviceFiles.explore(uris.asFlow())
|
||||||
|
.onEach {
|
||||||
|
discovered++
|
||||||
|
eventHandler(Indexer.Event.Discovered(discovered))
|
||||||
|
}
|
||||||
|
.flowOn(Dispatchers.IO)
|
||||||
|
.buffer()
|
||||||
val tagRead = tagCache.read(deviceFiles).flowOn(Dispatchers.IO).buffer()
|
val tagRead = tagCache.read(deviceFiles).flowOn(Dispatchers.IO).buffer()
|
||||||
val (uncachedDeviceFiles, cachedAudioFiles) = tagRead.results()
|
val (uncachedDeviceFiles, cachedAudioFiles) = tagRead.results()
|
||||||
val extractedAudioFiles =
|
val extractedAudioFiles =
|
||||||
|
@ -67,8 +76,13 @@ constructor(
|
||||||
.asFlow()
|
.asFlow()
|
||||||
.flattenMerge()
|
.flattenMerge()
|
||||||
val writtenAudioFiles = tagCache.write(extractedAudioFiles).flowOn(Dispatchers.IO).buffer()
|
val writtenAudioFiles = tagCache.write(extractedAudioFiles).flowOn(Dispatchers.IO).buffer()
|
||||||
|
var loaded = 0
|
||||||
|
val audioFiles = merge(cachedAudioFiles, writtenAudioFiles).onEach {
|
||||||
|
loaded++
|
||||||
|
eventHandler(Indexer.Event.Extracted(loaded))
|
||||||
|
}
|
||||||
val playlistFiles = storedPlaylists.read()
|
val playlistFiles = storedPlaylists.read()
|
||||||
return Files(merge(cachedAudioFiles, writtenAudioFiles), playlistFiles)
|
return Files(audioFiles, playlistFiles)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun Flow<CacheResult>.results(): Pair<Flow<DeviceFile>, Flow<AudioFile>> {
|
private fun Flow<CacheResult>.results(): Pair<Flow<DeviceFile>, Flow<AudioFile>> {
|
||||||
|
@ -83,7 +97,8 @@ constructor(
|
||||||
val indexed = withIndex()
|
val indexed = withIndex()
|
||||||
val shared =
|
val shared =
|
||||||
indexed.shareIn(
|
indexed.shareIn(
|
||||||
CoroutineScope(Dispatchers.Main), SharingStarted.WhileSubscribed(), replay = 0)
|
CoroutineScope(Dispatchers.Main), SharingStarted.WhileSubscribed(), replay = 0
|
||||||
|
)
|
||||||
return Array(n) { shared.filter { it.index % n == 0 }.map { it.value } }
|
return Array(n) { shared.filter { it.index % n == 0 }.map { it.value } }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,9 @@ import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.buffer
|
import kotlinx.coroutines.flow.buffer
|
||||||
import kotlinx.coroutines.flow.flowOn
|
import kotlinx.coroutines.flow.flowOn
|
||||||
import kotlinx.coroutines.flow.map
|
import kotlinx.coroutines.flow.map
|
||||||
|
import kotlinx.coroutines.flow.onEach
|
||||||
import kotlinx.coroutines.flow.toList
|
import kotlinx.coroutines.flow.toList
|
||||||
|
import org.oxycblt.auxio.music.stack.Indexer
|
||||||
import org.oxycblt.auxio.music.stack.explore.AudioFile
|
import org.oxycblt.auxio.music.stack.explore.AudioFile
|
||||||
import org.oxycblt.auxio.music.stack.explore.PlaylistFile
|
import org.oxycblt.auxio.music.stack.explore.PlaylistFile
|
||||||
import org.oxycblt.auxio.music.stack.interpret.linker.AlbumLinker
|
import org.oxycblt.auxio.music.stack.interpret.linker.AlbumLinker
|
||||||
|
@ -45,7 +47,8 @@ interface Interpreter {
|
||||||
suspend fun interpret(
|
suspend fun interpret(
|
||||||
audioFiles: Flow<AudioFile>,
|
audioFiles: Flow<AudioFile>,
|
||||||
playlistFiles: Flow<PlaylistFile>,
|
playlistFiles: Flow<PlaylistFile>,
|
||||||
interpretation: Interpretation
|
interpretation: Interpretation,
|
||||||
|
eventHandler: suspend (Indexer.Event) -> Unit
|
||||||
): MutableLibrary
|
): MutableLibrary
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -53,12 +56,15 @@ class InterpreterImpl @Inject constructor(private val preparer: Preparer) : Inte
|
||||||
override suspend fun interpret(
|
override suspend fun interpret(
|
||||||
audioFiles: Flow<AudioFile>,
|
audioFiles: Flow<AudioFile>,
|
||||||
playlistFiles: Flow<PlaylistFile>,
|
playlistFiles: Flow<PlaylistFile>,
|
||||||
interpretation: Interpretation
|
interpretation: Interpretation,
|
||||||
|
eventHandler: suspend (Indexer.Event) -> Unit
|
||||||
): MutableLibrary {
|
): MutableLibrary {
|
||||||
val preSongs =
|
val preSongs =
|
||||||
preparer.prepare(audioFiles, interpretation).flowOn(Dispatchers.Main).buffer()
|
preparer.prepare(audioFiles, interpretation).flowOn(Dispatchers.Main).buffer()
|
||||||
|
|
||||||
val genreLinker = GenreLinker()
|
val genreLinker = GenreLinker()
|
||||||
val genreLinkedSongs = genreLinker.register(preSongs).flowOn(Dispatchers.Main).buffer()
|
val genreLinkedSongs = genreLinker.register(preSongs).flowOn(Dispatchers.Main).buffer()
|
||||||
|
|
||||||
val artistLinker = ArtistLinker()
|
val artistLinker = ArtistLinker()
|
||||||
val artistLinkedSongs =
|
val artistLinkedSongs =
|
||||||
artistLinker.register(genreLinkedSongs).flowOn(Dispatchers.Main).buffer()
|
artistLinker.register(genreLinkedSongs).flowOn(Dispatchers.Main).buffer()
|
||||||
|
@ -67,14 +73,21 @@ class InterpreterImpl @Inject constructor(private val preparer: Preparer) : Inte
|
||||||
// before we go any further.
|
// before we go any further.
|
||||||
val genres = genreLinker.resolve()
|
val genres = genreLinker.resolve()
|
||||||
val artists = artistLinker.resolve()
|
val artists = artistLinker.resolve()
|
||||||
|
|
||||||
|
var interpreted = 0
|
||||||
val albumLinker = AlbumLinker()
|
val albumLinker = AlbumLinker()
|
||||||
val albumLinkedSongs =
|
val albumLinkedSongs =
|
||||||
albumLinker
|
albumLinker
|
||||||
.register(artistLinkedSongs)
|
.register(artistLinkedSongs)
|
||||||
.flowOn(Dispatchers.Main)
|
.flowOn(Dispatchers.Main)
|
||||||
|
.onEach {
|
||||||
|
interpreted++;
|
||||||
|
eventHandler(Indexer.Event.Interpret(interpreted))
|
||||||
|
}
|
||||||
.map { LinkedSongImpl(it) }
|
.map { LinkedSongImpl(it) }
|
||||||
.toList()
|
.toList()
|
||||||
val albums = albumLinker.resolve()
|
val albums = albumLinker.resolve()
|
||||||
|
|
||||||
val songs = albumLinkedSongs.map { SongImpl(it) }
|
val songs = albumLinkedSongs.map { SongImpl(it) }
|
||||||
return LibraryImpl(songs, albums, artists, genres)
|
return LibraryImpl(songs, albums, artists, genres)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue