diff --git a/cmd/daemon/api_server.go b/cmd/daemon/api_server.go index 0ed43401..dc5e439a 100644 --- a/cmd/daemon/api_server.go +++ b/cmd/daemon/api_server.go @@ -202,12 +202,16 @@ func (p *AppPlayer) newApiResponseStatusTrack(media *librespot.Media, position i albumCoverId = getBestImageIdForSize(track.Album.CoverGroup.Image, p.app.cfg.Server.ImageSize) } + var trackCoverUrl *string + if p.prodInfo != nil { + trackCoverUrl = p.prodInfo.ImageUrl(albumCoverId) + } return &ApiResponseStatusTrack{ Uri: librespot.SpotifyIdFromGid(librespot.SpotifyIdTypeTrack, track.Gid).Uri(), Name: *track.Name, ArtistNames: artists, AlbumName: *track.Album.Name, - AlbumCoverUrl: p.prodInfo.ImageUrl(albumCoverId), + AlbumCoverUrl: trackCoverUrl, Position: position, Duration: int(*track.Duration), ReleaseDate: track.Album.Date.String(), @@ -219,12 +223,16 @@ func (p *AppPlayer) newApiResponseStatusTrack(media *librespot.Media, position i albumCoverId := getBestImageIdForSize(episode.CoverImage.Image, p.app.cfg.Server.ImageSize) + var episodeCoverUrl *string + if p.prodInfo != nil { + episodeCoverUrl = p.prodInfo.ImageUrl(albumCoverId) + } return &ApiResponseStatusTrack{ Uri: librespot.SpotifyIdFromGid(librespot.SpotifyIdTypeEpisode, episode.Gid).Uri(), Name: *episode.Name, ArtistNames: []string{*episode.Show.Name}, AlbumName: *episode.Show.Name, - AlbumCoverUrl: p.prodInfo.ImageUrl(albumCoverId), + AlbumCoverUrl: episodeCoverUrl, Position: position, Duration: int(*episode.Duration), ReleaseDate: "", diff --git a/cmd/daemon/controls.go b/cmd/daemon/controls.go index 4b9600e6..c7e8164b 100644 --- a/cmd/daemon/controls.go +++ b/cmd/daemon/controls.go @@ -17,6 +17,7 @@ import ( "github.com/devgianlu/go-librespot/player" connectpb "github.com/devgianlu/go-librespot/proto/spotify/connectstate" playerpb "github.com/devgianlu/go-librespot/proto/spotify/player" + "github.com/devgianlu/go-librespot/spclient" "github.com/devgianlu/go-librespot/tracks" "google.golang.org/protobuf/proto" ) @@ -182,6 +183,17 @@ func (p *AppPlayer) handlePlayerEvent(ctx context.Context, ev *player.Event) { }, }) case player.EventTypeNotPlaying: + // If a DJ narration just finished, load the music track immediately + // instead of advancing the context queue. + if p.djPendingMusicId != nil { + pendingId := p.djPendingMusicId + p.djPendingMusicId = nil + if err := p.loadDJPendingMusic(ctx, pendingId); err != nil { + p.app.log.WithError(err).Error("failed loading DJ music after narration") + } + return + } + p.sess.Events().OnPlayerEnd(p.primaryStream, p.state.trackPosition()) p.app.server.Emit(&ApiEvent{ @@ -225,8 +237,116 @@ type skipToFunc func(*connectpb.ContextTrack) bool func (p *AppPlayer) loadContext(ctx context.Context, spotCtx *connectpb.Context, skipTo skipToFunc, paused, drop bool) error { ctxTracks, err := tracks.NewTrackListFromContext(ctx, p.app.log, p.sess.Spclient(), spotCtx) + p.app.log.Debugf("loadContext %s: resolve err=%v (featureId=%s)", spotCtx.Uri, err, func() string { + if p.state.player.PlayOrigin != nil { + return p.state.player.PlayOrigin.FeatureIdentifier + } + return "" + }()) if err != nil { - return fmt.Errorf("failed creating track list: %w", err) + // Dynamic contexts (e.g. Spotify DJ) return empty pages from spclient. + // Use whatever tracks Spotify sent in the play command's context pages. + var staticTracks []*connectpb.ContextTrack + for _, page := range spotCtx.Pages { + staticTracks = append(staticTracks, page.Tracks...) + } + if len(staticTracks) == 0 { + // DJ contexts send no tracks in the play command payload. + p.app.log.WithError(err).Warnf("no tracks in play command payload for %s", spotCtx.Uri) + p.app.log.Debugf("djAwaitingLoad: PlayOrigin.FeatureIdentifier=%q stateActive=%t prevTrack=%v prevContextUri=%q", + func() string { + if p.state.player.PlayOrigin != nil { + return p.state.player.PlayOrigin.FeatureIdentifier + } + return "" + }(), + p.state.active, + func() string { + if p.state.player.Track != nil { + return p.state.player.Track.Uri + } + return "" + }(), + p.state.player.ContextUri, + ) + p.state.player.ContextUri = spotCtx.Uri + p.state.player.ContextUrl = spotCtx.Url + p.state.player.ContextRestrictions = spotCtx.Restrictions + p.app.djCachedContextUri = spotCtx.Uri + // Keep djSectionBuffer — sections buffered from a previous handover are still + // valid for a fresh start of the same DJ context. Clearing them would leave + // fresh starts with an empty buffer in interactive mode (where the server only + // pushes a handful of playlists per session, not the full 50). + + if p.state.player.ContextMetadata == nil { + p.state.player.ContextMetadata = map[string]string{} + } + + // Always use state_restore to get full session metadata (playlist_volatile_context_id, + // lexicon_current_time, session_control_display, etc.) — the phone requires these + // fields to activate "Switch it up". + p.app.log.Debugf("lexicon: fresh DJ start reason=state_restore") + + lexCtx, lexErr := p.sess.Spclient().LexiconContextResolve(ctx, spotCtx.Uri, "state_restore") + if lexErr == nil { + for _, page := range lexCtx.GetPages() { + for _, t := range page.GetTracks() { + if t.Uri != "spotify:delimiter" && t.Uri != "" { + staticTracks = append(staticTracks, t) + } + } + } + if len(staticTracks) > 0 { + p.app.log.Infof("lexicon: pre-fetched %d DJ tracks for %s (volatile_id=%s lexicon_time=%s)", + len(staticTracks), spotCtx.Uri, + lexCtx.Metadata["playlist_volatile_context_id"], + lexCtx.Metadata["lexicon_current_time"]) + for k, v := range lexCtx.Metadata { + p.state.player.ContextMetadata[k] = v + } + // Log track metadata to find vibe section playlist URIs. + for i, t := range staticTracks { + su := t.Metadata["station_uri"] + sc := t.Metadata["source.components"] + jid := t.Metadata["narration.jump.commentary_id"] + if su != "" || jid != "" { + p.app.log.Debugf("lexicon track[%d] %s station_uri=%q source=%q jump=%q", i, t.Uri, su, sc, jid) + } + } + p.app.djCachedNextTracks = staticTracks + p.app.djCacheIsOurs = true + } + } else { + p.app.log.Debugf("lexicon: resolve failed (%v), will wait for cluster", lexErr) + } + p.state.player.ContextMetadata["dj.interactivity_enabled"] = "true" + + // Send IsPlaying=false + full metadata first. + // This signals Spotify to register a fresh DJ session server-side, + // which causes it to eventually broadcast a ClusterUpdate that enables + // "Switch it up" on the phone. + p.player.Stop() + p.primaryStream = nil + p.secondaryStream = nil + p.state.player.NextTracks = nil + p.state.player.PrevTracks = nil + p.state.player.PositionAsOfTimestamp = 0 + p.state.player.IsPlaying = false + p.state.player.IsBuffering = false + p.updateState(ctx) + + if len(staticTracks) == 0 { + // Lexicon failed — wait for poll to get tracks. + p.djPollAttempts = 0 + p.djPollTimer.Reset(5 * time.Second) + p.djAwaitingLoad = true + return nil + } + // Lexicon succeeded — fall through to static resolver and start playing immediately. + } + p.app.log.WithError(err).Warnf("context resolution failed, building static track list for %s (%d tracks)", spotCtx.Uri, len(staticTracks)) + resolver := spclient.NewStaticContextResolver(p.app.log, spotCtx.Uri, staticTracks) + ctxTracks = tracks.NewTrackListFromResolver(p.app.log, resolver) } p.state.setPaused(paused) @@ -305,13 +425,77 @@ func (p *AppPlayer) loadCurrentTrack(ctx context.Context, paused, drop bool) err p.primaryStream = nil } - spotId, err := librespot.SpotifyIdFromUri(p.state.player.Track.Uri) + // Skip delimiter tracks (used as queue separators in DJ mode). + if p.state.player.Track.Uri == "spotify:delimiter" { + return librespot.ErrMediaRestricted + } + // Normalize spotify:media: → spotify:track: (used in some DJ queue pushes). + trackUri := strings.ReplaceAll(p.state.player.Track.Uri, "spotify:media:", "spotify:track:") + spotId, err := librespot.SpotifyIdFromUri(trackUri) if err != nil { return fmt.Errorf("failed parsing uri: %w", err) } else if spotId.Type() != librespot.SpotifyIdTypeTrack && spotId.Type() != librespot.SpotifyIdTypeEpisode { return fmt.Errorf("unsupported spotify type: %s", spotId.Type()) } + // Clear any stale DJ narration state. + p.djPendingMusicId = nil + + // If this is a DJ track with narration, load the narration clip as the primary + // stream and remember the music track to play after it. + // Try intro (session start), then jump (between tracks), then outro. + if player.IsDJTrack(p.state.player.Track) { + var narrKeys []string + for k := range p.state.player.Track.Metadata { + if strings.HasPrefix(k, "narration.") { + narrKeys = append(narrKeys, k) + } + } + introId := p.state.player.Track.Metadata["narration.intro.commentary_id"] + jumpId := p.state.player.Track.Metadata["narration.jump.commentary_id"] + p.app.log.Debugf("DJ track narration: keys=%d intro_id=%q jump_id=%q", len(narrKeys), introId, jumpId) + var narr *player.DJNarration + for _, narrType := range []string{"intro", "jump", "outro"} { + if n := player.NarrationForTrack(p.state.player.Track, narrType); n != nil { + narr = n + break + } + } + if narr != nil { + narrId, err := player.NarrationSpotifyId(narr.CommentaryId) + if err != nil { + p.app.log.WithError(err).Warn("failed parsing DJ narration id, skipping to music") + } else { + narrStream, err := p.player.NewNarrationStream(ctx, p.app.client, narrId, 160, 0) + if err != nil { + p.app.log.WithError(err).Warn("failed loading DJ narration stream, skipping to music") + } else { + p.app.log.WithField("commentary_id", narr.CommentaryId). + Infof("playing DJ intro narration before %s", spotId.Uri()) + + p.primaryStream = narrStream + p.djPendingMusicId = spotId + + if err := p.player.SetPrimaryStream(narrStream.Source, paused, drop); err != nil { + return fmt.Errorf("failed setting DJ narration stream: %w", err) + } + + p.sess.Events().PostPrimaryStreamLoad(narrStream, paused) + + p.state.updateTimestamp() + p.state.player.PlaybackId = hex.EncodeToString(narrStream.PlaybackId) + p.state.player.Duration = int64(narrStream.Media.Duration()) + p.state.player.IsPlaying = true + p.state.player.IsBuffering = false + p.state.setPaused(paused) + p.updateState(ctx) + + return nil + } + } + } + } + trackPosition := p.state.trackPosition() p.app.log.WithField("uri", spotId.Uri()). Debugf("loading %s (paused: %t, position: %dms)", spotId.Type(), paused, trackPosition) @@ -343,6 +527,12 @@ func (p *AppPlayer) loadCurrentTrack(ctx context.Context, paused, drop bool) err var err error p.primaryStream, err = p.player.NewStream(ctx, p.app.client, *spotId, p.app.cfg.Bitrate, trackPosition) + if err != nil && trackPosition > 0 { + p.app.log.WithError(err).Warnf("failed creating stream at %dms for %s, retrying from 0", trackPosition, spotId) + p.state.player.PositionAsOfTimestamp = 0 + trackPosition = 0 + p.primaryStream, err = p.player.NewStream(ctx, p.app.client, *spotId, p.app.cfg.Bitrate, 0) + } if err != nil { return fmt.Errorf("failed creating stream for %s: %w", spotId, err) } @@ -375,6 +565,50 @@ func (p *AppPlayer) loadCurrentTrack(ctx context.Context, paused, drop bool) err return nil } +// loadDJPendingMusic loads the music track that follows a DJ narration clip. +// Called from handlePlayerEvent when EventTypeNotPlaying fires while +// djPendingMusicId is set (i.e. the narration just finished). +func (p *AppPlayer) loadDJPendingMusic(ctx context.Context, spotId *librespot.SpotifyId) error { + p.app.log.WithField("uri", spotId.Uri()).Info("narration finished, loading DJ music track") + + if p.primaryStream != nil { + p.sess.Events().OnPrimaryStreamUnload(p.primaryStream, p.player.PositionMs()) + p.primaryStream = nil + } + p.secondaryStream = nil + + stream, err := p.player.NewStream(ctx, p.app.client, *spotId, p.app.cfg.Bitrate, 0) + if err != nil { + return fmt.Errorf("failed creating DJ music stream for %s: %w", spotId, err) + } + + p.primaryStream = stream + if err := p.player.SetPrimaryStream(stream.Source, false, true); err != nil { + return fmt.Errorf("failed setting DJ music stream: %w", err) + } + + p.sess.Events().PostPrimaryStreamLoad(stream, false) + + p.app.log.WithField("uri", spotId.Uri()). + Infof("loaded DJ music %s (duration: %dms)", strconv.QuoteToGraphic(stream.Media.Name()), stream.Media.Duration()) + + p.state.updateTimestamp() + p.state.player.PlaybackId = hex.EncodeToString(stream.PlaybackId) + p.state.player.Duration = int64(stream.Media.Duration()) + p.state.player.IsPlaying = true + p.state.player.IsBuffering = false + p.state.setPaused(false) + p.updateState(ctx) + p.schedulePrefetchNext() + + p.app.server.Emit(&ApiEvent{ + Type: ApiEventTypeMetadata, + Data: ApiEventDataMetadata(*p.newApiResponseStatusTrack(stream.Media, 0)), + }) + + return nil +} + func (p *AppPlayer) setOptions(ctx context.Context, repeatingContext *bool, repeatingTrack *bool, shufflingContext *bool) { var requiresUpdate bool if repeatingContext != nil && *repeatingContext != p.state.player.Options.RepeatingContext { @@ -450,11 +684,20 @@ func (p *AppPlayer) addToQueue(ctx context.Context, track *connectpb.ContextTrac } func (p *AppPlayer) setQueue(ctx context.Context, prev []*connectpb.ContextTrack, next []*connectpb.ContextTrack) { + p.app.log.Debugf("set_queue received: prev=%d next=%d (djAwaitingLoad=%t)", len(prev), len(next), p.djAwaitingLoad) + if p.state.tracks == nil { p.app.log.Warnf("cannot set queue without a context") return } + // If Spotify delivers DJ tracks via set_queue (rather than cluster nextTracks), + // cache them so the pendingDJ path can build a real track list from them. + if len(next) > 0 && p.app.djCachedContextUri != "" && p.state.player.ContextUri == p.app.djCachedContextUri { + p.app.log.Debugf("caching %d DJ tracks from set_queue for %s", len(next), p.app.djCachedContextUri) + p.app.djCachedNextTracks = next + } + p.state.tracks.SetQueue(prev, next) p.state.player.PrevTracks = p.state.tracks.PrevTracks() p.state.player.NextTracks = p.state.tracks.NextTracks(ctx, next) @@ -584,7 +827,16 @@ func (p *AppPlayer) skipNext(ctx context.Context, track *connectpb.ContextTrack) if track != nil { contextSpotType := librespot.InferSpotifyIdTypeFromContextUri(p.state.player.ContextUri) - if err := p.state.tracks.TrySeek(ctx, tracks.ContextTrackComparator(contextSpotType, track)); err != nil { + if err := p.state.tracks.Seek(ctx, tracks.ContextTrackComparator(contextSpotType, track)); err != nil { + // Track not found in our list. For DJ mode, load it directly from + // the hint Spotify sent rather than silently restarting from track 0. + if player.IsDJTrack(p.state.player.Track) { + p.app.log.Warnf("DJ skip target %s not in track list, loading directly", track.Uri) + p.state.player.Timestamp = time.Now().UnixMilli() + p.state.player.PositionAsOfTimestamp = 0 + p.state.player.Track = librespot.ContextTrackToProvidedTrack(contextSpotType, track) + return p.loadCurrentTrack(ctx, p.state.player.IsPaused, true) + } return err } @@ -596,7 +848,31 @@ func (p *AppPlayer) skipNext(ctx context.Context, track *connectpb.ContextTrack) p.state.player.NextTracks = p.state.tracks.NextTracks(ctx, nil) p.state.player.Index = p.state.tracks.Index() + // Proactive DJ queue refresh on targeted skip (Switch it up). The + // normal low-queue check lives in advanceNext which is NOT called when + // skip_next carries a target track, so we mirror it here. + isDJSkip := p.state.player.PlayOrigin != nil && p.state.player.PlayOrigin.FeatureIdentifier == "dynamic-sessions" + if isDJSkip && !p.djAwaitingLoad && len(p.state.player.NextTracks) < 8 { + p.app.log.Infof("skipNext: DJ queue low (%d tracks), scheduling lexicon refresh", len(p.state.player.NextTracks)) + p.djPollAttempts = 0 + if !p.djPollTimer.Stop() { + select { + case <-p.djPollTimer.C: + default: + } + } + p.djPollTimer.Reset(3 * time.Second) + } + if err := p.loadCurrentTrack(ctx, p.state.player.IsPaused, true); err != nil { + // In DJ mode, narration/media clips appear in the queue as spotify:track: but + // return 404 when fetched — skip past them automatically. + isDJ := p.state.player.PlayOrigin != nil && p.state.player.PlayOrigin.FeatureIdentifier == "dynamic-sessions" + if isDJ { + p.app.log.WithError(err).Warnf("DJ track %s failed to load, auto-advancing to next", p.state.player.Track.GetUri()) + _, advErr := p.advanceNext(ctx, false, true) + return advErr + } return err } return nil @@ -628,14 +904,41 @@ func (p *AppPlayer) advanceNext(ctx context.Context, forceNext, drop bool) (bool hasNextTrack = true p.state.player.IsPaused = false } else { + // If we are still waiting for the initial DJ cluster update, the track + // list is the old (pre-DJ) context. Advancing it would play a playlist + // song. Signal the server and wait for the cluster push instead. + if p.djAwaitingLoad { + if p.state.player.ContextUri == p.app.djCachedContextUri { + p.app.log.Debugf("advanceNext: djAwaitingLoad=true, keeping stream alive (context=%s)", p.state.player.ContextUri) + p.updateState(ctx) + return false, nil + } + // Stale flag from a previous DJ session; clear it so normal + // playlists are not blocked. + p.app.log.Debugf("advanceNext: clearing stale djAwaitingLoad (context=%s, cachedDJ=%s)", p.state.player.ContextUri, p.app.djCachedContextUri) + p.djAwaitingLoad = false + } + // try to get the next track hasNextTrack = p.state.tracks.GoNext(ctx) // if we could not get the next track we probably ended the context if !hasNextTrack { + // DJ contexts manage their queue externally via ClusterUpdate/set_queue — + // do not loop back to track 0 or attempt autoplay when exhausted. + // Use PlayOrigin.FeatureIdentifier so this only fires when we are + // actually in an active DJ session, not whenever any playlist whose + // URI was previously used as a DJ seed reaches its end. + isDJ := p.state.player.PlayOrigin != nil && p.state.player.PlayOrigin.FeatureIdentifier == "dynamic-sessions" + if isDJ { + p.app.log.Debugf("advanceNext: isDJ no next tracks, setting djAwaitingLoad and triggering poll") + p.djAwaitingLoad = true + p.djPollAttempts = 0 + p.djPollTimer.Reset(5 * time.Second) + p.updateState(ctx) + return false, nil + } hasNextTrack = p.state.tracks.GoStart(ctx) - - // if repeating is disabled move to the first track, but do not start it if !p.state.player.Options.RepeatingContext { hasNextTrack = false } @@ -649,6 +952,23 @@ func (p *AppPlayer) advanceNext(ctx context.Context, forceNext, drop bool) (bool p.state.player.NextTracks = p.state.tracks.NextTracks(ctx, nil) p.state.player.Index = p.state.tracks.Index() + // Proactive DJ queue refresh: when nextTracks drops below 8, schedule a + // lexicon poll to fetch 15 fresh tracks with new jump points. We do NOT set + // IsPlaying=false here — that disrupts the phone's DJ state and greys out + // "Switch it up" even after the queue is refreshed. + isDJActive := p.state.player.PlayOrigin != nil && p.state.player.PlayOrigin.FeatureIdentifier == "dynamic-sessions" + if isDJActive && !p.djAwaitingLoad && len(p.state.player.NextTracks) < 8 { + p.app.log.Infof("advanceNext: DJ queue low (%d tracks), scheduling lexicon refresh", len(p.state.player.NextTracks)) + p.djPollAttempts = 0 + if !p.djPollTimer.Stop() { + select { + case <-p.djPollTimer.C: + default: + } + } + p.djPollTimer.Reset(3 * time.Second) + } + uri = p.state.player.Track.Uri } @@ -697,9 +1017,23 @@ func (p *AppPlayer) advanceNext(ctx context.Context, forceNext, drop bool) (bool } // load current track into stream - if err := p.loadCurrentTrack(ctx, !hasNextTrack, drop); errors.Is(err, librespot.ErrMediaRestricted) || errors.Is(err, librespot.ErrNoSupportedFormats) { + isDJSession := p.state.player.PlayOrigin != nil && p.state.player.PlayOrigin.FeatureIdentifier == "dynamic-sessions" + if err := p.loadCurrentTrack(ctx, !hasNextTrack, drop); errors.Is(err, librespot.ErrMediaRestricted) || errors.Is(err, librespot.ErrNoSupportedFormats) || (isDJSession && err != nil && !forceNext) { p.app.log.WithError(err).Infof("skipping unplayable media: %s", uri) if forceNext { + if isDJSession { + // Two consecutive unplayable DJ tracks (e.g. back-to-back narration clips). + // Signal Spotify for a fresh queue rather than failing hard. + p.app.log.WithError(err).Warnf("DJ: consecutive unplayable tracks, signaling Spotify for more") + p.player.Stop() + p.primaryStream = nil + p.secondaryStream = nil + p.state.player.IsPlaying = false + p.state.player.IsBuffering = false + p.djAwaitingLoad = true + p.updateState(ctx) + return false, nil + } // we failed in finding another track to play, just stop return false, err } diff --git a/cmd/daemon/main.go b/cmd/daemon/main.go index 048cbccb..bb1ac4b9 100644 --- a/cmd/daemon/main.go +++ b/cmd/daemon/main.go @@ -18,6 +18,7 @@ import ( "github.com/devgianlu/go-librespot/apresolve" "github.com/devgianlu/go-librespot/player" + connectpb "github.com/devgianlu/go-librespot/proto/spotify/connectstate" devicespb "github.com/devgianlu/go-librespot/proto/spotify/connectstate/devices" "github.com/devgianlu/go-librespot/session" "github.com/devgianlu/go-librespot/zeroconf" @@ -52,6 +53,18 @@ type App struct { server ApiServer mpris mpris.Server logoutCh chan *AppPlayer + + // DJ cache persists across zeroconf reconnects so that a transfer command + // arriving on a new session can still use the queue from the last cluster push. + djCachedContextUri string + djCachedNextTracks []*connectpb.ContextTrack + djCacheIsOurs bool // true when cache was populated while we were the active device + + // djSectionBuffer holds vibe-section playlist tracks received via hm://playlist/ pushes. + // Each entry is one section (from a different vibe playlist). When the lexicon 15-track + // window is exhausted, djPoll pops the next section from here to keep playback going + // without looping the same 15 tracks. + djSectionBuffer [][]*connectpb.ContextTrack } func parseDeviceType(val string) (devicespb.DeviceType, error) { @@ -132,6 +145,10 @@ func (app *App) newAppPlayer(ctx context.Context, creds any) (_ *AppPlayer, err appPlayer.prefetchTimer = time.NewTimer(math.MaxInt64) appPlayer.prefetchTimer.Stop() + appPlayer.djPollTimer = time.NewTimer(math.MaxInt64) + appPlayer.djPollTimer.Stop() + appPlayer.djPollAttempts = 0 + if appPlayer.sess, err = session.NewSessionFromOptions(ctx, &session.Options{ Log: app.log, DeviceType: app.deviceType, @@ -149,6 +166,7 @@ func (app *App) newAppPlayer(ctx context.Context, creds any) (_ *AppPlayer, err if appPlayer.player, err = player.NewPlayer(&player.Options{ Spclient: appPlayer.sess.Spclient(), + Mercury: appPlayer.sess.Mercury(), AudioKey: appPlayer.sess.AudioKey(), Events: appPlayer.sess.Events(), Log: app.log, diff --git a/cmd/daemon/player.go b/cmd/daemon/player.go index 4839e4e8..ce8f6bcb 100644 --- a/cmd/daemon/player.go +++ b/cmd/daemon/player.go @@ -25,6 +25,7 @@ import ( "github.com/devgianlu/go-librespot/player" connectpb "github.com/devgianlu/go-librespot/proto/spotify/connectstate" "github.com/devgianlu/go-librespot/session" + "github.com/devgianlu/go-librespot/spclient" "github.com/devgianlu/go-librespot/tracks" ) @@ -49,6 +50,22 @@ type AppPlayer struct { secondaryStream *player.Stream prefetchTimer *time.Timer + + // djPollTimer periodically retries ContextResolve for the DJ playlist after a transfer + // with no tracks, in case the playlist becomes available before Spotify sends the ~53s push. + djPollTimer *time.Timer + djPollAttempts int + + // djPendingMusicId is set when a DJ narration clip is playing as the + // primary stream. It holds the SpotifyId of the actual music track that + // should start once the narration finishes (EventTypeNotPlaying). + djPendingMusicId *librespot.SpotifyId + + // djAwaitingLoad is set when a DJ play command was accepted but context resolution + // returned no tracks (empty spclient pages). Cleared once the first DJ track loads. + // This lets the ClusterUpdate handler distinguish "transitioning into DJ" (should load) + // from "already playing music within DJ" (should not reload). + djAwaitingLoad bool } func (p *AppPlayer) handleAccesspointPacket(pktType ap.PacketType, payload []byte) error { @@ -109,6 +126,79 @@ func (p *AppPlayer) handleDealerMessage(ctx context.Context, msg dealer.Message) p.app.log.WithField("username", librespot.ObfuscateUsername(p.sess.Username())). Debugf("requested logout out") p.logout <- p + } else if strings.HasPrefix(msg.Uri, "hm://playlist/v2/playlist/") { + // Spotify responds to djAwaitingLoad (IsPlaying=false + DJ context) by updating a + // companion playlist with the next batch of DJ tracks and pushing this notification. + // Extract the playlist ID, fetch its content, and use the tracks to resume playback. + playlistId := strings.TrimPrefix(msg.Uri, "hm://playlist/v2/playlist/") + if idx := strings.IndexByte(playlistId, '/'); idx >= 0 { + playlistId = playlistId[:idx] + } + p.app.log.Debugf("playlist update notification: %s (payloadLen=%d, djAwaitingLoad=%t, djContextUri=%q)", + playlistId, len(msg.Payload), p.djAwaitingLoad, p.app.djCachedContextUri) + + // Only process if we are in a known DJ context — otherwise this is an unrelated update. + if p.app.djCachedContextUri == "" { + return nil + } + + playlistUri := "spotify:playlist:" + playlistId + spotCtx, err := p.sess.Spclient().ContextResolve(ctx, playlistUri) + if err != nil { + p.app.log.WithError(err).Debugf("failed resolving playlist update %s", playlistUri) + return nil + } + + // Collect all tracks from the resolved context. + var newTracks []*connectpb.ContextTrack + for _, page := range spotCtx.Pages { + for _, track := range page.Tracks { + if track.Uri != "spotify:delimiter" { + newTracks = append(newTracks, track) + } + } + } + + if len(newTracks) == 0 { + p.app.log.Debugf("playlist update %s resolved with 0 tracks (ignoring)", playlistUri) + return nil + } + + p.app.log.Infof("DJ playlist update %s: %d tracks (djAwaitingLoad=%t)", playlistUri, len(newTracks), p.djAwaitingLoad) + p.app.djCachedNextTracks = newTracks + p.app.djCacheIsOurs = true + p.djPollTimer.Stop() // cancel any in-progress poll — we got the tracks via push + + if p.djAwaitingLoad { + // Load the first track immediately — same as the pendingDJ path in the cluster handler. + currentTrack := newTracks[0] + ctxTracks := make([]*connectpb.ContextTrack, len(newTracks)) + copy(ctxTracks, newTracks) + resolver := spclient.NewStaticContextResolver(p.app.log, p.app.djCachedContextUri, ctxTracks) + newList := tracks.NewTrackListFromResolver(p.app.log, resolver) + ctxType := librespot.InferSpotifyIdTypeFromContextUri(p.app.djCachedContextUri) + _ = newList.TrySeek(ctx, tracks.ContextTrackComparator(ctxType, currentTrack)) + + p.state.tracks = newList + p.state.player.Track = p.state.tracks.CurrentTrack() + p.state.player.NextTracks = p.state.tracks.NextTracks(ctx, nil) + p.state.player.PositionAsOfTimestamp = 0 + + p.djAwaitingLoad = false + p.app.log.Infof("loading DJ track from playlist update (%d next tracks)", len(newTracks)-1) + if err := p.loadCurrentTrack(ctx, false, true); err != nil { + p.app.log.WithError(err).Warn("failed loading DJ track from playlist update, reverting to djAwaitingLoad") + p.djAwaitingLoad = true + } + } else { + // Buffer this section for later use when the queue runs low. We buffer + // unconditionally here (not gated on ContextUri) because the push can + // arrive while a temporary regular-playlist context is active (e.g. + // during a Switch-it-up transition), and we must not silently drop it. + p.app.djSectionBuffer = append(p.app.djSectionBuffer, newTracks) + p.app.log.Debugf("buffered DJ section %d (%d tracks) from playlist update", len(p.app.djSectionBuffer), len(newTracks)) + } + return nil } else if strings.HasPrefix(msg.Uri, "hm://connect-state/v1/cluster") { var clusterUpdate connectpb.ClusterUpdate if err := proto.Unmarshal(msg.Payload, &clusterUpdate); err != nil { @@ -116,9 +206,163 @@ func (p *AppPlayer) handleDealerMessage(ctx context.Context, msg dealer.Message) } stopBeingActive := p.state.active && clusterUpdate.Cluster.ActiveDeviceId != p.app.deviceId && clusterUpdate.Cluster.PlayerState.Timestamp > p.state.lastTransferTimestamp + p.app.log.Debugf("cluster decision: activeDeviceId=%q ourDeviceId=%q clusterPlayerTs=%d lastTransferTs=%d stateActive=%t stopBeingActive=%t djAwaitingLoad=%t stateContextUri=%q", + clusterUpdate.Cluster.ActiveDeviceId, p.app.deviceId, + clusterUpdate.Cluster.PlayerState.Timestamp, p.state.lastTransferTimestamp, + p.state.active, stopBeingActive, p.djAwaitingLoad, p.state.player.ContextUri) // We are still the active device, do not quit if !stopBeingActive { + clusterState := clusterUpdate.Cluster.GetPlayerState() + nextCount := 0 + if clusterState != nil { + nextCount = len(clusterState.NextTracks) + } + isDJCluster := false + if clusterState != nil { + // Primary check: PlayOrigin.FeatureIdentifier == "dynamic-sessions". + // This is reliably set by the server for DJ sessions on both desktop + // and speaker/zeroconf devices. + if clusterState.PlayOrigin != nil && clusterState.PlayOrigin.FeatureIdentifier == "dynamic-sessions" { + isDJCluster = true + } + // Fallback: check source.components on individual tracks (populated + // on desktop/interactive clients but often absent on speaker devices). + if !isDJCluster { + for _, t := range clusterState.NextTracks { + if player.IsDJTrack(t) { + isDJCluster = true + break + } + } + } + // Third check: context URI matches our known DJ playlist URI. + // Spotify sometimes sends featureId="home" when DJ is pressed from the + // home/browse screen rather than the now-playing DJ button. The cluster + // still carries the DJ playlist URI and next tracks — treat it as DJ. + clusterCtxUri := clusterState.ContextUri + if clusterCtxUri == "" { + clusterCtxUri = p.state.player.ContextUri + } + if !isDJCluster && p.app.djCachedContextUri != "" && clusterCtxUri == p.app.djCachedContextUri { + isDJCluster = true + } + // Fourth check: we are explicitly waiting for a DJ cluster (djAwaitingLoad=true) + // and the cluster's context URI matches the DJ context URI we accepted in the + // play command. This catches fresh-start DJ sessions where featureId="home" is + // sent and djCachedContextUri is not yet populated (empty on first boot/restart). + if !isDJCluster && p.djAwaitingLoad && clusterCtxUri != "" && clusterCtxUri == p.state.player.ContextUri { + isDJCluster = true + } + } + p.app.log.Debugf("cluster update received (active=%t, nextTracks=%d, djCluster=%t, featureId=%s)", + p.state.active, nextCount, isDJCluster, func() string { + if clusterState != nil && clusterState.PlayOrigin != nil { + return clusterState.PlayOrigin.FeatureIdentifier + } + return "" + }()) + + // Log what the server echoes back about our device's capabilities. + if ourDevice := clusterUpdate.Cluster.Device[p.app.deviceId]; ourDevice != nil && ourDevice.Capabilities != nil { + caps := ourDevice.Capabilities + p.app.log.Debugf("server-reflected caps: SupportsDj=%t IsVoiceEnabled=%t", caps.SupportsDj, caps.IsVoiceEnabled) + } + + if isDJCluster { + // Cache the DJ next tracks for use when a transfer command arrives shortly after. + contextUri := clusterState.ContextUri + if contextUri == "" { + contextUri = p.state.player.ContextUri + } + if nextCount > 0 { + p.app.djCachedContextUri = contextUri + p.app.djCachedNextTracks = make([]*connectpb.ContextTrack, 0, nextCount) + for _, t := range clusterState.NextTracks { + if t.Uri != "spotify:delimiter" { + p.app.djCachedNextTracks = append(p.app.djCachedNextTracks, librespot.ProvidedTrackToContextTrack(t)) + } + } + p.app.djCacheIsOurs = clusterUpdate.Cluster.ActiveDeviceId == p.app.deviceId + p.app.log.Debugf("cached DJ next tracks from cluster push (%d tracks for %s, ours=%t)", nextCount, contextUri, p.app.djCacheIsOurs) + } else { + p.app.log.Debugf("skipping DJ cache update for %s — cluster has 0 next tracks (keeping %d cached)", contextUri, len(p.app.djCachedNextTracks)) + } + + // Update the live track list if we are the active player with a DJ context. + // This covers two cases: + // (a) Already playing a DJ track — refresh queue in place. + // (b) Accepted a DJ play command but have no tracks yet (ContextUri set, + // no track loaded) — start playing from the cluster's current track. + // Use djCachedContextUri to detect active DJ sessions. IsDJTrack() is + // unreliable because regular music tracks in a DJ queue don't carry + // YourDJ source metadata, and transferred tracks never do. + // Guard with !djAwaitingLoad so this path doesn't fire during the initial + // DJ selection (when we're still waiting for the first track from the + // cluster) — that case is handled by pendingDJ below. + alreadyDJ := p.state.active && p.state.player.Track != nil && contextUri == p.app.djCachedContextUri && !p.djAwaitingLoad + pendingDJ := p.djAwaitingLoad && p.state.player.ContextUri == contextUri + p.app.log.Debugf("DJ path eval: alreadyDJ=%t pendingDJ=%t djAwaitingLoad=%t stateContextUri=%q clusterContextUri=%q clusterTrack=%v nextTracks=%d", + alreadyDJ, pendingDJ, p.djAwaitingLoad, p.state.player.ContextUri, contextUri, + func() string { + if clusterState.Track != nil { + return clusterState.Track.Uri + } + return "" + }(), nextCount) + if alreadyDJ || pendingDJ { + // If this is a pendingDJ activation cluster but it has no next tracks, + // Spotify sent a lightweight heartbeat instead of the full queue. + // Stay in djAwaitingLoad and wait for the real cluster with tracks. + if nextCount == 0 { + p.app.log.Debugf("DJ cluster has 0 next tracks (pendingDJ=%t alreadyDJ=%t) — ignoring, keeping %d cached tracks", pendingDJ, alreadyDJ, len(p.app.djCachedNextTracks)) + } else { + currentTrack := func() *connectpb.ContextTrack { + if alreadyDJ { + return librespot.ProvidedTrackToContextTrack(p.state.player.Track) + } + // For pendingDJ (cold start): Spotify sets clusterState.Track to whatever + // was already playing - not a new DJ track. Use djCachedNextTracks[0] + // so we start on the actual first DJ track, same as the cache path. + if len(p.app.djCachedNextTracks) > 0 { + return p.app.djCachedNextTracks[0] + } + if clusterState.Track != nil { + return librespot.ProvidedTrackToContextTrack(clusterState.Track) + } + return nil + }() + ctxTracks := make([]*connectpb.ContextTrack, 0, nextCount+1) + ctxTracks = append(ctxTracks, currentTrack) + if alreadyDJ { + ctxTracks = append(ctxTracks, p.app.djCachedNextTracks...) + } else if len(p.app.djCachedNextTracks) > 1 { + ctxTracks = append(ctxTracks, p.app.djCachedNextTracks[1:]...) + } + resolver := spclient.NewStaticContextResolver(p.app.log, contextUri, ctxTracks) + newList := tracks.NewTrackListFromResolver(p.app.log, resolver) + ctxType := librespot.InferSpotifyIdTypeFromContextUri(contextUri) + _ = newList.TrySeek(ctx, tracks.ContextTrackComparator(ctxType, currentTrack)) + p.state.tracks = newList + p.state.player.Track = p.state.tracks.CurrentTrack() + p.state.player.NextTracks = p.state.tracks.NextTracks(ctx, nil) + p.app.log.Debugf("updated active DJ track list (%d next tracks, pendingDJ=%t)", nextCount, pendingDJ) + + // If we were waiting for the first DJ track (pendingDJ), load it now. + // Also restart if stuck (alreadyDJ but no primary stream — e.g. after back-to-back narration clips). + stuckDJ := alreadyDJ && p.primaryStream == nil + if pendingDJ || stuckDJ { + p.djAwaitingLoad = false + p.state.player.ContextUri = contextUri + p.state.player.PositionAsOfTimestamp = 0 + p.app.log.Debugf("loading DJ track from cluster (pendingDJ=%t stuckDJ=%t)", pendingDJ, stuckDJ) + if err := p.loadCurrentTrack(ctx, false, true); err != nil { + p.app.log.WithError(err).Warn("failed loading DJ track from cluster push") + } + } + } // end else (nextCount > 0) + } + } return nil } @@ -155,9 +399,64 @@ func (p *AppPlayer) handlePlayerCommand(ctx context.Context, req dealer.RequestP } p.state.lastTransferTimestamp = transferState.Playback.Timestamp + // Log transfer context metadata and options modes for DJ debugging + p.app.log.Debugf("transfer context metadata: %v", transferState.CurrentSession.Context.Metadata) + if transferState.Options != nil { + p.app.log.Debugf("transfer options modes: %v", transferState.Options.Modes) + } + ctxTracks, err := tracks.NewTrackListFromContext(ctx, p.app.log, p.sess.Spclient(), transferState.CurrentSession.Context) if err != nil { - return fmt.Errorf("failed creating track list: %w", err) + // Dynamic contexts (e.g. Spotify DJ) return empty pages from spclient. + // Use cached DJ next tracks from a recent ClusterUpdate if available, + // otherwise fall back to the current track + queue from the transfer state. + contextUri := transferState.CurrentSession.Context.Uri + staticTracks := []*connectpb.ContextTrack{transferState.Playback.CurrentTrack} + if len(p.app.djCachedNextTracks) > 0 && p.app.djCachedContextUri == contextUri { + staticTracks = append(staticTracks, p.app.djCachedNextTracks...) + p.app.log.WithError(err).Warnf("context resolution failed, using cached DJ queue for %s (%d tracks)", contextUri, len(staticTracks)) + } else { + // Try lexicon-session-provider to get the full DJ queue immediately. + lexCtx, lexErr := p.sess.Spclient().LexiconContextResolve(ctx, contextUri, "state_restore") + if lexErr == nil { + var lexTracks []*connectpb.ContextTrack + for _, page := range lexCtx.GetPages() { + for _, t := range page.GetTracks() { + if t.Uri != "spotify:delimiter" && t.Uri != "" { + lexTracks = append(lexTracks, t) + } + } + } + if len(lexTracks) > 0 { + p.app.log.Infof("lexicon: got %d DJ tracks for transfer %s", len(lexTracks), contextUri) + staticTracks = append(staticTracks, lexTracks...) + p.app.djCachedNextTracks = lexTracks + p.app.djCacheIsOurs = true + // Apply DJ context metadata from lexicon response. + if transferState.CurrentSession.Context.Metadata == nil { + transferState.CurrentSession.Context.Metadata = map[string]string{} + } + for k, v := range lexCtx.Metadata { + transferState.CurrentSession.Context.Metadata[k] = v + } + } else { + p.app.log.Debugf("lexicon: 0 tracks for transfer %s, falling back to djPoll", contextUri) + staticTracks = append(staticTracks, transferState.Queue.Tracks...) + } + } else { + p.app.log.Debugf("lexicon: transfer resolve failed (%v), falling back to djPoll", lexErr) + staticTracks = append(staticTracks, transferState.Queue.Tracks...) + } + p.app.log.WithError(err).Warnf("context resolution failed, building static track list for %s (tracks=%d)", contextUri, len(staticTracks)) + p.app.djCachedContextUri = contextUri + if len(staticTracks) <= 1 { + // Lexicon failed — fall back to polling. + p.djPollAttempts = 0 + p.djPollTimer.Reset(3 * time.Second) + } + } + resolver := spclient.NewStaticContextResolver(p.app.log, contextUri, staticTracks) + ctxTracks = tracks.NewTrackListFromResolver(p.app.log, resolver) } if sessId := transferState.CurrentSession.OriginalSessionId; sessId != nil { @@ -178,13 +477,17 @@ func (p *AppPlayer) handlePlayerCommand(ctx context.Context, req dealer.RequestP // playback // Note: this sets playback speed to 0 or 1 because that's all we're // capable of, depending on whether the playback is paused or not. - p.state.player.Timestamp = transferState.Playback.Timestamp + // Pin Timestamp to now so updateTimestamp() doesn't advance position by + // stale elapsed time. The raw PositionAsOfTimestamp from the transfer is + // the position the phone was at when it sent the command; we start from there. + p.state.player.Timestamp = time.Now().UnixMilli() p.state.player.PositionAsOfTimestamp = int64(transferState.Playback.PositionAsOfTimestamp) p.state.setPaused(pause) // current session p.state.player.PlayOrigin = transferState.CurrentSession.PlayOrigin p.state.player.PlayOrigin.DeviceIdentifier = req.SentByDeviceId + p.app.log.Debugf("transfer PlayOrigin.FeatureIdentifier=%q", p.state.player.PlayOrigin.FeatureIdentifier) p.state.player.ContextUri = transferState.CurrentSession.Context.Uri p.state.player.ContextUrl = transferState.CurrentSession.Context.Url p.state.player.ContextRestrictions = transferState.CurrentSession.Context.Restrictions @@ -255,6 +558,8 @@ func (p *AppPlayer) handlePlayerCommand(ctx context.Context, req dealer.RequestP p.state.player.PlayOrigin = req.Command.PlayOrigin p.state.player.PlayOrigin.DeviceIdentifier = req.SentByDeviceId p.state.player.Suppressions = req.Command.Options.Suppressions + p.app.log.Debugf("play command: contextUri=%s featureId=%s prevContextUri=%s", + req.Command.Context.GetUri(), req.Command.PlayOrigin.GetFeatureIdentifier(), p.state.player.ContextUri) // apply overrides if req.Command.Options.PlayerOptionsOverride != nil { @@ -287,6 +592,13 @@ func (p *AppPlayer) handlePlayerCommand(ctx context.Context, req dealer.RequestP case "pause": return p.pause(ctx) case "resume": + // If we are waiting for DJ tracks, a resume cannot succeed (no stream yet). + // Acknowledge it silently — the state we already sent shows IsPlaying=false, + // which should stop the phone from retrying indefinitely. + if p.djAwaitingLoad { + p.app.log.Debugf("resume while djAwaitingLoad — ignoring (waiting for playlist update)") + return nil + } return p.play(ctx) case "seek_to": var position int64 @@ -357,6 +669,113 @@ func (p *AppPlayer) handlePlayerCommand(ctx context.Context, req dealer.RequestP } } +// djPollContextResolve is called from the Run() select loop when djPollTimer fires. +// It retries ContextResolve for the current DJ playlist to see if tracks are available +// before Spotify sends the ~53s background push notification. +func (p *AppPlayer) djPollContextResolve(ctx context.Context) { + const maxAttempts = 20 + const pollInterval = 3 * time.Second + + if p.app.djCachedContextUri == "" { + return // nothing to poll + } + + p.djPollAttempts++ + p.app.log.Debugf("djPoll: attempt %d for %s", p.djPollAttempts, p.app.djCachedContextUri) + + lexCtx, err := p.sess.Spclient().LexiconContextResolve(ctx, p.app.djCachedContextUri, "state_restore") + if err != nil || lexCtx == nil { + if p.djPollAttempts < maxAttempts { + p.djPollTimer.Reset(pollInterval) + } else { + p.app.log.Debugf("djPoll: giving up after %d attempts for %s", p.djPollAttempts, p.app.djCachedContextUri) + } + return + } + + var newTracks []*connectpb.ContextTrack + for _, page := range lexCtx.GetPages() { + for _, track := range page.GetTracks() { + if track.Uri != "spotify:delimiter" && track.Uri != "" { + newTracks = append(newTracks, track) + } + } + } + + if len(newTracks) == 0 { + if p.djPollAttempts < maxAttempts { + p.djPollTimer.Reset(pollInterval) + } + return + } + + // Merge DJ metadata from lexicon response (includes interactivity fields) + if lexCtx.Metadata != nil { + if p.state.player.ContextMetadata == nil { + p.state.player.ContextMetadata = map[string]string{} + } + for k, v := range lexCtx.Metadata { + p.state.player.ContextMetadata[k] = v + } + } + p.state.player.ContextMetadata["dj.interactivity_enabled"] = "true" + + p.app.log.Infof("djPoll: resolved %d tracks for %s (volatile_id=%s)", len(newTracks), p.app.djCachedContextUri, lexCtx.Metadata["playlist_volatile_context_id"]) + p.app.djCachedNextTracks = newTracks + p.app.djCacheIsOurs = true + + if p.djAwaitingLoad { + currentTrack := newTracks[0] + ctxTracks := make([]*connectpb.ContextTrack, len(newTracks)) + copy(ctxTracks, newTracks) + resolver := spclient.NewStaticContextResolver(p.app.log, p.app.djCachedContextUri, ctxTracks) + newList := tracks.NewTrackListFromResolver(p.app.log, resolver) + ctxType := librespot.InferSpotifyIdTypeFromContextUri(p.app.djCachedContextUri) + _ = newList.TrySeek(ctx, tracks.ContextTrackComparator(ctxType, currentTrack)) + p.state.tracks = newList + p.state.player.Track = p.state.tracks.CurrentTrack() + p.state.player.NextTracks = p.state.tracks.NextTracks(ctx, nil) + p.state.player.PositionAsOfTimestamp = 0 + p.djAwaitingLoad = false + if err := p.loadCurrentTrack(ctx, false, true); err != nil { + p.app.log.WithError(err).Warn("djPoll: failed loading first DJ track") + p.djAwaitingLoad = true + } + } else if p.state.active && p.state.player.ContextUri == p.app.djCachedContextUri { + // Prefer a buffered vibe section over repeating the same lexicon tracks. + // djSectionBuffer is populated by the hm://playlist/ push handler at DJ startup; + // each entry is one section's worth of different tracks. Using it here prevents + // the same 15 lexicon tracks from looping. + var combined []*connectpb.ContextTrack + if p.state.player.Track != nil { + combined = append(combined, &connectpb.ContextTrack{Uri: p.state.player.Track.Uri}) + } + + if len(p.app.djSectionBuffer) > 0 { + // Pop the oldest section and use its tracks. + sectionTracks := p.app.djSectionBuffer[0] + p.app.djSectionBuffer = p.app.djSectionBuffer[1:] + combined = append(combined, sectionTracks...) + p.app.log.Infof("djPoll: using buffered section (%d tracks, %d sections remaining)", len(sectionTracks), len(p.app.djSectionBuffer)) + } else { + // Buffer exhausted — fall back to repeating the lexicon tracks. + combined = append(combined, newTracks...) + p.app.log.Infof("djPoll: section buffer empty, repeating lexicon tracks (%d tracks)", len(newTracks)) + } + + resolver := spclient.NewStaticContextResolver(p.app.log, p.app.djCachedContextUri, combined) + newList := tracks.NewTrackListFromResolver(p.app.log, resolver) + if p.state.player.Track != nil { + ctxType := librespot.InferSpotifyIdTypeFromContextUri(p.app.djCachedContextUri) + _ = newList.TrySeek(ctx, tracks.ContextTrackComparator(ctxType, librespot.ProvidedTrackToContextTrack(p.state.player.Track))) + } + p.state.tracks = newList + p.state.player.NextTracks = p.state.tracks.NextTracks(ctx, nil) + p.updateState(ctx) + p.app.log.Debugf("djPoll: refreshed queue (%d next tracks ahead)", len(p.state.player.NextTracks)) + } +} + func (p *AppPlayer) handleDealerRequest(ctx context.Context, req dealer.Request) error { // Limit ourselves to 30 seconds for handling dealer requests ctx, cancel := context.WithTimeout(ctx, 30*time.Second) @@ -650,8 +1069,12 @@ func (p *AppPlayer) Run(ctx context.Context, apiRecv <-chan ApiRequest, mprisRec } apRecv := p.sess.Accesspoint().Receive(ap.PacketTypeProductInfo, ap.PacketTypeCountryCode) - msgRecv := p.sess.Dealer().ReceiveMessage("hm://pusher/v1/connections/", "hm://connect-state/v1/") + msgRecv := p.sess.Dealer().ReceiveMessage("hm://pusher/v1/connections/", "hm://connect-state/v1/", "hm://playlist/v2/playlist/") reqRecv := p.sess.Dealer().ReceiveRequest("hm://connect-state/v1/player/command") + // Also receive playlist pushes that arrive via the Mercury AP event channel + // (PacketTypeMercuryEvent). These are the vibe-section playlists the server + // sends during an active DJ session — they outnumber the dealer pushes ~10:1. + mercuryPlaylistRecv := p.sess.Mercury().SubscribeEvent("hm://playlist/v2/playlist/") playerRecv := p.player.Receive() volumeTimer := time.NewTimer(time.Minute) @@ -677,6 +1100,11 @@ func (p *AppPlayer) Run(ctx context.Context, apiRecv <-chan ApiRequest, mprisRec if err := p.handleDealerMessage(ctx, msg); err != nil { p.app.log.WithError(err).Warn("failed handling dealer message") } + case evMsg := <-mercuryPlaylistRecv: + // Playlist push via the Mercury AP event channel — same handling as dealer. + if err := p.handleDealerMessage(ctx, dealer.Message{Uri: evMsg.Uri, Payload: evMsg.Payload}); err != nil { + p.app.log.WithError(err).Warn("failed handling mercury playlist event") + } case req, ok := <-reqRecv: if !ok { continue @@ -720,6 +1148,8 @@ func (p *AppPlayer) Run(ctx context.Context, apiRecv <-chan ApiRequest, mprisRec p.handlePlayerEvent(ctx, &ev) case <-p.prefetchTimer.C: p.prefetchNext(ctx) + case <-p.djPollTimer.C: + p.djPollContextResolve(ctx) case volume := <-p.volumeUpdate: // Received a new volume: from Spotify Connect, from the REST API, // or from the system volume mixer. diff --git a/cmd/daemon/state.go b/cmd/daemon/state.go index 2faa2ff5..6c1a1a56 100644 --- a/cmd/daemon/state.go +++ b/cmd/daemon/state.go @@ -144,12 +144,13 @@ func (p *AppPlayer) initState() { SupportsSetBackendMetadata: true, SupportsTransferCommand: true, SupportsCommandRequest: true, - IsVoiceEnabled: false, + IsVoiceEnabled: true, NeedsFullPlayerState: false, SupportsGzipPushes: true, SupportsSetOptionsCommand: true, SupportsHifi: nil, // TODO: nice to have? ConnectCapabilities: "", + SupportsDj: true, }, }, } @@ -191,6 +192,15 @@ func (p *AppPlayer) putConnectState(ctx context.Context, reason connectpb.PutSta putStateReq.LastCommandSentByDeviceId = p.state.lastCommand.SentByDeviceId } + // Debug: log DJ-relevant fields in player state + if p.state.player.PlayOrigin != nil && p.state.player.PlayOrigin.FeatureIdentifier == "dynamic-sessions" { + djEnabled := p.state.player.ContextMetadata["dj.interactivity_enabled"] + jumpBtn := p.state.player.ContextMetadata["dj.interactivity.localized_jump_button"] + p.app.log.Debugf("putConnectState DJ: reason=%v nextTracks=%d isPlaying=%t isBuffering=%t djEnabled=%q jumpBtn=%q contextUri=%s", + reason, len(p.state.player.NextTracks), p.state.player.IsPlaying, p.state.player.IsBuffering, + djEnabled, jumpBtn, p.state.player.ContextUri) + } + // finally send the state update return p.sess.Spclient().PutConnectState(ctx, p.spotConnId, putStateReq) } diff --git a/ids.go b/ids.go index e08de6b5..18d52291 100644 --- a/ids.go +++ b/ids.go @@ -50,6 +50,16 @@ func ContextTrackToProvidedTrack(typ SpotifyIdType, track *connectpb.ContextTrac } } +// ProvidedTrackToContextTrack converts a ProvidedTrack back to a ContextTrack. +// Used to rebuild a ContextResolver from NextTracks received in a ClusterUpdate. +func ProvidedTrackToContextTrack(track *connectpb.ProvidedTrack) *connectpb.ContextTrack { + return &connectpb.ContextTrack{ + Uri: track.Uri, + Uid: track.Uid, + Metadata: track.Metadata, + } +} + type SpotifyIdType string const ( diff --git a/mercury/client.go b/mercury/client.go index 88bef259..6e037135 100644 --- a/mercury/client.go +++ b/mercury/client.go @@ -9,6 +9,7 @@ import ( "github.com/devgianlu/go-librespot/ap" spotifypb "github.com/devgianlu/go-librespot/proto/spotify" "google.golang.org/protobuf/proto" + "strings" "sync" "time" ) @@ -27,6 +28,19 @@ type hermesResponse struct { err error } +// eventSubscriber receives Mercury AP push events (PacketTypeMercuryEvent) whose +// URI matches one of the registered prefixes. Used for playlist section pushes. +type eventSubscriber struct { + uriPrefixes []string + c chan eventMessage +} + +// EventMessage carries the URI and raw payload of a Mercury AP push event. +type eventMessage struct { + Uri string + Payload []byte +} + type Client struct { log librespot.Logger ap *ap.Accesspoint @@ -35,12 +49,18 @@ type Client struct { reqChan chan hermesRequest stopChan chan struct{} + + eventSubsLock sync.RWMutex + eventSubs []eventSubscriber } func NewClient(log librespot.Logger, accesspoint *ap.Accesspoint) *Client { c := &Client{log: log, ap: accesspoint} c.reqChan = make(chan hermesRequest) c.stopChan = make(chan struct{}, 1) + // Start receiving immediately so MercuryEvent packets that arrive right after + // AP authentication (before any Request() is called) are not dropped. + c.startReceiving() return c } @@ -60,7 +80,74 @@ func (c *Client) recvLoop() { c.stopChan <- struct{}{} return case pkt := <-ch: - if pkt.Type != ap.PacketTypeMercuryReq { + if pkt.Type == ap.PacketTypeMercuryEvent { + // Decode and log the event so we can inspect what URI/payload arrives + // immediately after a DJ transfer (these come via the AP Mercury channel). + evResp := bytes.NewReader(pkt.Payload) + var evSeqLen uint16 + _ = binary.Read(evResp, binary.BigEndian, &evSeqLen) + var evSeq uint64 + switch evSeqLen { + case 8: + _ = binary.Read(evResp, binary.BigEndian, &evSeq) + case 4: + var s uint32 + _ = binary.Read(evResp, binary.BigEndian, &s) + evSeq = uint64(s) + case 2: + var s uint16 + _ = binary.Read(evResp, binary.BigEndian, &s) + evSeq = uint64(s) + } + var evFlags uint8 + _ = binary.Read(evResp, binary.BigEndian, &evFlags) + var evPartsCount uint16 + _ = binary.Read(evResp, binary.BigEndian, &evPartsCount) + evParts := make([][]byte, evPartsCount) + for i := uint16(0); i < evPartsCount; i++ { + var partLen uint16 + _ = binary.Read(evResp, binary.BigEndian, &partLen) + part := make([]byte, partLen) + _, _ = evResp.Read(part) + evParts[i] = part + } + if len(evParts) > 0 { + var evHeader spotifypb.MercuryHeader + if err := proto.Unmarshal(evParts[0], &evHeader); err == nil { + uri := evHeader.GetUri() + var payload []byte + if len(evParts) > 1 { + payload = evParts[1] + } + payloadLen := 0 + for _, p := range evParts[1:] { + payloadLen += len(p) + } + c.log.Debugf("mercury event: seq=%d flags=%d uri=%s statusCode=%v parts=%d payloadLen=%d", + evSeq, evFlags, uri, evHeader.StatusCode, len(evParts), payloadLen) + // Route to any registered event subscribers. + c.eventSubsLock.RLock() + for _, sub := range c.eventSubs { + for _, prefix := range sub.uriPrefixes { + if strings.HasPrefix(uri, prefix) { + select { + case sub.c <- eventMessage{Uri: uri, Payload: payload}: + default: + c.log.Debugf("mercury event subscriber full, dropping %s", uri) + } + break + } + } + } + c.eventSubsLock.RUnlock() + } else { + c.log.Debugf("mercury event: seq=%d flags=%d totalPayload=%d (header parse err: %v)", evSeq, evFlags, len(pkt.Payload), err) + } + } else { + c.log.Debugf("mercury event: seq=%d flags=%d totalPayload=%d (no parts)", evSeq, evFlags, len(pkt.Payload)) + } + continue + } else if pkt.Type != ap.PacketTypeMercuryReq { c.log.Warnf("skipping mercury packet with type: %s", pkt.Type.String()) continue } @@ -215,3 +302,14 @@ func (c *Client) Close() { c.stopChan <- struct{}{} <-c.stopChan } + +// SubscribeEvent returns a channel that receives Mercury AP push events (PacketTypeMercuryEvent) +// whose URI starts with one of the given prefixes. The channel is buffered to avoid blocking +// the receive loop when the caller is temporarily busy. +func (c *Client) SubscribeEvent(uriPrefixes ...string) <-chan eventMessage { + ch := make(chan eventMessage, 64) + c.eventSubsLock.Lock() + c.eventSubs = append(c.eventSubs, eventSubscriber{uriPrefixes: uriPrefixes, c: ch}) + c.eventSubsLock.Unlock() + return ch +} diff --git a/player/dj.go b/player/dj.go new file mode 100644 index 00000000..e627190a --- /dev/null +++ b/player/dj.go @@ -0,0 +1,63 @@ +package player + +import ( + "encoding/hex" + "strconv" + "strings" + + librespot "github.com/devgianlu/go-librespot" + connectpb "github.com/devgianlu/go-librespot/proto/spotify/connectstate" +) + +// IsDJTrack reports whether the provided track is part of a Spotify DJ session. +func IsDJTrack(track *connectpb.ProvidedTrack) bool { + return strings.HasPrefix(track.Metadata["source.components"], "YourDJ,") +} + +// DJNarration holds the metadata for a single TTS narration clip. +type DJNarration struct { + CommentaryId string // UUID string — is a Spotify GID (16 bytes) + SSML string + Voice string + Loudness string + SampleRate string + TtsProvider string + DecisionId string + Image string +} + +// NarrationForTrack extracts the DJ narration for the given type ("intro", "jump", "outro"). +// Returns nil if no narration is present for this track/type. +func NarrationForTrack(track *connectpb.ProvidedTrack, typ string) *DJNarration { + id := track.Metadata["narration."+typ+".commentary_id"] + if id == "" { + return nil + } + return &DJNarration{ + CommentaryId: id, + SSML: track.Metadata["narration."+typ+".ssml"], + Voice: track.Metadata["narration."+typ+".voice"], + Loudness: track.Metadata["narration."+typ+".loudness"], + SampleRate: track.Metadata["narration."+typ+".sample_rate"], + TtsProvider: track.Metadata["narration."+typ+".tts_provider"], + DecisionId: track.Metadata["narration."+typ+".decision_id"], + Image: track.Metadata["narration."+typ+".image"], + } +} + +// AutomixCueMs returns the fade-out cue point position in milliseconds for the track. +// Returns 0 if not present. +func AutomixCueMs(track *connectpb.ProvidedTrack) int64 { + ms, _ := strconv.ParseInt(track.Metadata["automix.fade_out_cuepoint.position"], 10, 64) + return ms +} + +// NarrationSpotifyId converts a commentary_id UUID string to a SpotifyId (GID). +func NarrationSpotifyId(commentaryId string) (librespot.SpotifyId, error) { + hexStr := strings.ReplaceAll(commentaryId, "-", "") + gid, err := hex.DecodeString(hexStr) + if err != nil { + return librespot.SpotifyId{}, err + } + return librespot.SpotifyIdFromGid(librespot.SpotifyIdTypeTrack, gid), nil +} diff --git a/player/player.go b/player/player.go index 8af38587..ff2b7e0b 100644 --- a/player/player.go +++ b/player/player.go @@ -12,6 +12,7 @@ import ( librespot "github.com/devgianlu/go-librespot" "github.com/devgianlu/go-librespot/audio" "github.com/devgianlu/go-librespot/flac" + "github.com/devgianlu/go-librespot/mercury" "github.com/devgianlu/go-librespot/output" "github.com/devgianlu/go-librespot/playplay" downloadpb "github.com/devgianlu/go-librespot/proto/spotify/download" @@ -22,6 +23,7 @@ import ( "github.com/devgianlu/go-librespot/spclient" "github.com/devgianlu/go-librespot/vorbis" "golang.org/x/exp/rand" + "google.golang.org/protobuf/proto" ) const ( @@ -47,6 +49,7 @@ type Player struct { countryCode *string sp *spclient.Spclient + mercury *mercury.Client audioKey *audio.KeyProvider events EventManager @@ -90,6 +93,7 @@ type playerCmdDataSet struct { type Options struct { Spclient *spclient.Spclient + Mercury *mercury.Client AudioKey *audio.KeyProvider Events EventManager @@ -162,6 +166,7 @@ func NewPlayer(opts *Options) (*Player, error) { p := &Player{ log: opts.Log, sp: opts.Spclient, + mercury: opts.Mercury, audioKey: opts.AudioKey, events: opts.Events, cdnQuarantine: make(map[string]time.Time), @@ -720,3 +725,90 @@ func (p *Player) NewStream(ctx context.Context, client *http.Client, spotId libr return &Stream{PlaybackId: playbackId, Source: stream, Media: media, File: file}, nil } + +// getMercuryTrack fetches track metadata via the Mercury AP endpoint. +// TTS narration tracks are not available via ExtendedMetadata (returns 404). +func (p *Player) getMercuryTrack(ctx context.Context, spotId librespot.SpotifyId) (*metadatapb.Track, error) { + uri := "hm://metadata/4/track/" + spotId.Base62() + data, err := p.mercury.Request(ctx, "GET", uri, nil, nil) + if err != nil { + return nil, fmt.Errorf("mercury track metadata failed: %w", err) + } + var track metadatapb.Track + if err := proto.Unmarshal(data, &track); err != nil { + return nil, fmt.Errorf("failed unmarshaling mercury track: %w", err) + } + return &track, nil +} + +// NewNarrationStream creates a Stream for a DJ TTS narration clip. +// Uses Mercury for metadata because TTS tracks are absent from ExtendedMetadata. +func (p *Player) NewNarrationStream(ctx context.Context, client *http.Client, spotId librespot.SpotifyId, bitrate int, mediaPosition int64) (*Stream, error) { + log := p.log.WithField("uri", spotId.Uri()) + + playbackId := make([]byte, 16) + _, _ = rand.Read(playbackId) + + p.events.PreStreamLoadNew(playbackId, spotId, mediaPosition) + + trackMeta, err := p.getMercuryTrack(ctx, spotId) + if err != nil { + return nil, err + } + + media := librespot.NewMediaFromTrack(trackMeta) + spotId = media.Id() + + file := selectBestMediaFormat(trackMeta.File, bitrate, false) + if file == nil { + return nil, librespot.ErrNoSupportedFormats + } + + p.events.PostStreamResolveAudioFile(playbackId, int32(bitrate), media, file) + + log.Debugf("selected narration format %s (%x)", file.Format.String(), file.FileId) + + audioKey, err := p.retrieveAudioKey(ctx, spotId, file.FileId) + if err != nil { + return nil, fmt.Errorf("failed retrieving narration audio key: %w", err) + } + + p.events.PostStreamRequestAudioKey(playbackId) + + storageResolve, err := p.sp.ResolveStorageInteractive(ctx, file.FileId, file.Format, false) + if err != nil { + return nil, fmt.Errorf("failed resolving narration storage: %w", err) + } + + p.events.PostStreamResolveStorage(playbackId) + + rawStream, err := p.httpChunkedReaderFromStorageResolve(log, client, storageResolve) + if err != nil { + return nil, fmt.Errorf("failed creating narration chunked reader: %w", err) + } + + p.events.PostStreamInitHttpChunkReader(playbackId, rawStream) + + decryptedStream, err := audio.NewAesAudioDecryptor(rawStream, audioKey) + if err != nil { + return nil, fmt.Errorf("failed initializing narration audio decryptor: %w", err) + } + + audioStream, metaPage, err := vorbis.ExtractMetadataPage(p.log, decryptedStream, rawStream.Size()) + if err != nil { + return nil, fmt.Errorf("failed reading narration metadata page: %w", err) + } + + vorbisStream, err := vorbis.New(log, audioStream, metaPage, 1.0) + if err != nil { + return nil, fmt.Errorf("failed initializing narration vorbis stream: %w", err) + } + + if vorbisStream.SampleRate != SampleRate { + return nil, fmt.Errorf("unsupported narration sample rate: %d", vorbisStream.SampleRate) + } else if vorbisStream.Channels != Channels { + return nil, fmt.Errorf("unsupported narration channels: %d", vorbisStream.Channels) + } + + return &Stream{PlaybackId: playbackId, Source: vorbisStream, Media: media, File: file}, nil +} diff --git a/spclient/context_resolver.go b/spclient/context_resolver.go index 20f7d7b3..08f7fe09 100644 --- a/spclient/context_resolver.go +++ b/spclient/context_resolver.go @@ -48,6 +48,18 @@ func isTracksComplete(ctx *connectpb.Context) bool { return expectedNumberOfTracks == totalLength } +// NewStaticContextResolver creates a ContextResolver backed by a pre-known +// list of tracks. Used for dynamic contexts (e.g. Spotify DJ) where +// spclient returns empty pages. +func NewStaticContextResolver(log librespot.Logger, uri string, contextTracks []*connectpb.ContextTrack) *ContextResolver { + typ := librespot.InferSpotifyIdTypeFromContextUri(uri) + spotCtx := &connectpb.Context{ + Uri: uri, + Pages: []*connectpb.ContextPage{{Tracks: contextTracks}}, + } + return &ContextResolver{log: log, sp: nil, typ: typ, ctx: spotCtx} +} + func NewContextResolver(ctx context.Context, log librespot.Logger, sp *Spclient, spotCtx *connectpb.Context) (_ *ContextResolver, err error) { typ := librespot.InferSpotifyIdTypeFromContextUri(spotCtx.Uri) @@ -69,6 +81,19 @@ func NewContextResolver(ctx context.Context, log librespot.Logger, sp *Spclient, spotCtx = newSpotCtx } + // If every page is empty (no tracks, no PageUrl, no NextPageUrl), the context + // is unusable — callers should fall back to a static resolver. + allPagesEmpty := len(spotCtx.Pages) > 0 + for _, page := range spotCtx.Pages { + if len(page.Tracks) > 0 || len(page.PageUrl) > 0 || len(page.NextPageUrl) > 0 { + allPagesEmpty = false + break + } + } + if allPagesEmpty { + return nil, fmt.Errorf("context %s has only empty pages", spotCtx.Uri) + } + autoplay := strings.HasPrefix(spotCtx.Uri, "spotify:station:") for _, page := range spotCtx.Pages { for _, track := range page.Tracks { diff --git a/spclient/spclient.go b/spclient/spclient.go index 5760b3d5..fd3b48f2 100644 --- a/spclient/spclient.go +++ b/spclient/spclient.go @@ -373,6 +373,35 @@ func (c *Spclient) ContextResolve(ctx context.Context, uri string) (*connectpb.C return &context, nil } +// LexiconContextResolve calls the lexicon-session-provider endpoint which returns the +// full DJ session (tracks + metadata) as JSON in connectpb.Context format. +// reason is "interactive" for fresh user-initiated DJ starts, "state_restore" for resumes. +func (c *Spclient) LexiconContextResolve(ctx context.Context, contextUri, reason string) (*connectpb.Context, error) { + q := url.Values{"contextUri": {contextUri}, "reason": {reason}} + resp, err := c.Request(ctx, "GET", "/lexicon-session-provider/context-resolve/v2/session", q, nil, nil) + if err != nil { + return nil, err + } + + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("invalid status code from lexicon context resolve: %d", resp.StatusCode) + } + + respBytes, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed reading lexicon response body: %w", err) + } + + var context connectpb.Context + if err := json.Unmarshal(respBytes, &context); err != nil { + return nil, fmt.Errorf("failed json unmarshalling lexicon Context: %w", err) + } + + return &context, nil +} + func (c *Spclient) ContextResolveAutoplay(ctx context.Context, reqProto *playerpb.AutoplayContextRequest) (*connectpb.Context, error) { reqBody, err := proto.Marshal(reqProto) if err != nil { diff --git a/tracks/tracks.go b/tracks/tracks.go index a33d0fdb..afa019e1 100644 --- a/tracks/tracks.go +++ b/tracks/tracks.go @@ -27,6 +27,16 @@ type List struct { queue []*connectpb.ContextTrack } +// NewTrackListFromResolver creates a List from an already-built ContextResolver. +// Used when the caller constructs a static resolver (e.g. for Spotify DJ). +func NewTrackListFromResolver(log_ librespot.Logger, resolver *spclient.ContextResolver) *List { + tl := &List{} + tl.ctx = resolver + tl.log = log_.WithField("uri", resolver.Uri()) + tl.tracks = newPagedList[*connectpb.ContextTrack](tl.log, resolver) + return tl +} + func NewTrackListFromContext(ctx context.Context, log_ librespot.Logger, sp *spclient.Spclient, spotCtx *connectpb.Context) (_ *List, err error) { tl := &List{} tl.ctx, err = spclient.NewContextResolver(ctx, log_, sp, spotCtx)