Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import org.scalajs.linker.interface.ModuleSplitStyle

import scala.sys.process.*

lazy val projectVersion = "2.4.2"
lazy val projectVersion = "2.4.3"
lazy val organizationName = "ru.trett"
lazy val scala3Version = "3.7.4"
lazy val circeVersion = "0.14.15"
Expand Down Expand Up @@ -120,6 +120,7 @@ lazy val server = project
).map(_ % doobieVersion),
libraryDependencies += "org.jsoup" % "jsoup" % "1.21.2",
libraryDependencies += "com.github.blemale" %% "scaffeine" % "5.3.0",
libraryDependencies += "io.circe" %% "circe-fs2" % "0.14.1",
libraryDependencies += "org.flywaydb" % "flyway-database-postgresql" % "11.17.2" % "runtime",
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.19" % Test,
libraryDependencies += "org.scalamock" %% "scalamock" % "7.5.2" % Test,
Expand Down
18 changes: 18 additions & 0 deletions client/src/main/scala/client/Models.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,24 @@ object Decoders:
}
given Decoder[SummaryResponse] = deriveDecoder

import SummaryEvent.*
given Decoder[Content] = deriveDecoder
given Decoder[Metadata] = deriveDecoder
given Decoder[FunFact] = deriveDecoder
given Decoder[Error] = deriveDecoder

given Decoder[SummaryEvent] = Decoder.instance { cursor =>
cursor.downField("type").as[String].flatMap {
case "content" => cursor.as[Content]
case "metadata" => cursor.as[Metadata]
case "funFact" => cursor.as[FunFact]
case "error" => cursor.as[Error]
case "done" => Right(Done)
case other =>
Left(io.circe.DecodingFailure(s"Unknown SummaryEvent type: $other", cursor.history))
}
}

final class Model:
val feedVar: Var[FeedItemList] = Var(List())
val channelVar: Var[ChannelList] = Var(List())
Expand Down
16 changes: 16 additions & 0 deletions client/src/main/scala/client/NetworkUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import scala.util.Failure
import scala.util.Success
import scala.util.Try
import ru.trett.rss.models.UserSettings
import ru.trett.rss.models.SummaryEvent

object NetworkUtils {

Expand Down Expand Up @@ -68,4 +69,19 @@ object NetworkUtils {

def logout(): EventStream[Unit] =
FetchStream.post("/api/logout", _.body("")).mapTo(())

def streamSummary(url: String): (EventStream[Try[SummaryEvent]], () => Unit) =
val bus = new EventBus[Try[SummaryEvent]]
val source = new dom.EventSource(url)

source.onmessage = msg =>
decode[SummaryEvent](msg.data.toString) match
case Right(event) => bus.emit(Success(event))
case Left(err) => bus.emit(Failure(err))

source.onerror = _ =>
bus.emit(Failure(new RuntimeException("Stream error")))
source.close()

(bus.events, () => source.close())
}
97 changes: 57 additions & 40 deletions client/src/main/scala/client/SummaryPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@ import be.doeraene.webcomponents.ui5.*
import be.doeraene.webcomponents.ui5.configkeys.*
import client.NetworkUtils.*
import com.raquo.laminar.api.L.*
import ru.trett.rss.models.{SummaryResponse, SummarySuccess, SummaryError}
import ru.trett.rss.models.SummaryEvent

import scala.util.{Failure, Success, Try}
import scala.util.{Failure, Success}

object SummaryPage:

import Decoders.given

private val model = AppState.model

private case class PageState(
Expand All @@ -27,52 +25,71 @@ object SummaryPage:
private val stateSignal = stateVar.signal
private val loadMoreBus: EventBus[Unit] = new EventBus

private var currentSubscription: Option[Subscription] = None
private var currentClose: Option[() => Unit] = None

private def resetState(): Unit = stateVar.set(PageState())

private def fetchSummaryBatch(): EventStream[Try[Option[SummaryResponse]]] =
FetchStream
.withDecoder(responseDecoder[SummaryResponse])
.get("/api/summarize")

private val batchObserver: Observer[Try[Option[SummaryResponse]]] = Observer {
case Success(Some(resp)) if resp.funFact.isDefined =>
stateVar.update(_.copy(isLoading = false, hasMore = false, funFact = resp.funFact))

case Success(Some(resp)) if resp.feedsProcessed > 0 =>
val (newContent, isError) = resp.result match
case SummarySuccess(html) => (html, false)
case SummaryError(message) => (message, true)
stateVar.update(s =>
s.copy(
isLoading = false,
summaries = s.summaries :+ newContent,
hasError = isError,
totalProcessed = s.totalProcessed + resp.feedsProcessed,
hasMore = resp.hasMore
)
private def cleanup(): Unit =
currentSubscription.foreach(_.kill())
currentClose.foreach(_())
currentSubscription = None
currentClose = None

private def startStreaming(offset: Int): Unit =
cleanup()

stateVar.update(s =>
s.copy(
isLoading = true,
hasError = false,
summaries = if offset > 0 then s.summaries :+ "" else s.summaries
)
Home.refreshUnreadCountBus.emit(())
)

val (stream, close) = NetworkUtils.streamSummary(s"/api/summarize?offset=$offset")
currentClose = Some(close)

currentSubscription = Some(stream.foreach {
case Success(SummaryEvent.Content(text)) =>
stateVar.update(s =>
val newSummaries =
if s.summaries.isEmpty then List(text)
else s.summaries.init :+ (s.summaries.last + text)
s.copy(summaries = newSummaries)
)

case Success(SummaryEvent.Metadata(processed, remaining, more)) =>
stateVar.update(s =>
s.copy(totalProcessed = s.totalProcessed + processed, hasMore = more)
)
Home.refreshUnreadCountBus.emit(())

case Success(SummaryEvent.FunFact(text)) =>
stateVar.update(_.copy(funFact = Some(text), isLoading = false))

case Success(SummaryEvent.Error(msg)) =>
stateVar.update(_.copy(hasError = true, isLoading = false))
client.NotifyComponent.errorMessage(new RuntimeException(msg))

case Success(_) =>
stateVar.update(_.copy(isLoading = false, hasError = true))
case Success(SummaryEvent.Done) =>
stateVar.update(_.copy(isLoading = false))
cleanup()

case Failure(err) =>
stateVar.update(_.copy(isLoading = false, hasError = true))
handleError(err)
}
case Failure(err) =>
stateVar.update(_.copy(hasError = true, isLoading = false))
cleanup()
handleError(err)
}(unsafeWindowOwner))

def render: Element =
resetState()
val initialFetch = fetchSummaryBatch()
div(
cls := "main-content",
initialFetch --> batchObserver,
onMountBind { ctx =>
loadMoreBus.events.flatMapSwitch { _ =>
stateVar.update(_.copy(isLoading = true))
fetchSummaryBatch()
} --> batchObserver
},
onMountUnmountCallback(mount = _ => startStreaming(0), unmount = _ => cleanup()),
loadMoreBus.events.map(_ => stateVar.now().totalProcessed) --> (offset =>
startStreaming(offset)
),
Card(
_.slots.header := CardHeader(
_.titleText := "AI Summary",
Expand Down
2 changes: 1 addition & 1 deletion scripts/local-docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ services:
- host.docker.internal:host-gateway

server:
image: server:2.4.2
image: server:2.4.3
container_name: rss_server
restart: always
depends_on:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ package ru.trett.rss.server.codecs
import io.circe.{Decoder, Encoder}
import io.circe.generic.semiauto.*
import io.circe.syntax.*
import ru.trett.rss.models.{SummaryResult, SummarySuccess, SummaryError, SummaryResponse}
import ru.trett.rss.models.{
SummaryResult,
SummarySuccess,
SummaryError,
SummaryResponse,
SummaryEvent
}

object SummaryCodecs:
given Encoder[SummarySuccess] = deriveEncoder
Expand Down Expand Up @@ -34,3 +40,17 @@ object SummaryCodecs:

given Encoder[SummaryResponse] = deriveEncoder
given Decoder[SummaryResponse] = deriveDecoder

import SummaryEvent.*
given Encoder[Content] = deriveEncoder
given Encoder[Metadata] = deriveEncoder
given Encoder[FunFact] = deriveEncoder
given Encoder[Error] = deriveEncoder

given Encoder[SummaryEvent] = Encoder.instance {
case c: Content => c.asJson.mapObject(_.add("type", "content".asJson))
case m: Metadata => m.asJson.mapObject(_.add("type", "metadata".asJson))
case f: FunFact => f.asJson.mapObject(_.add("type", "funFact".asJson))
case e: Error => e.asJson.mapObject(_.add("type", "error".asJson))
case Done => io.circe.Json.obj("type" -> "done".asJson)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package ru.trett.rss.server.controllers

import cats.effect.IO
import org.http4s.AuthedRoutes
import org.http4s.circe.CirceEntityEncoder.*
import org.http4s.ServerSentEvent
import org.http4s.dsl.io.*
import io.circe.syntax.*
import ru.trett.rss.server.models.User
import ru.trett.rss.server.services.SummarizeService
import ru.trett.rss.server.codecs.SummaryCodecs.given
Expand All @@ -15,8 +16,8 @@ object SummarizeController:
def routes(summarizeService: SummarizeService): AuthedRoutes[User, IO] =
AuthedRoutes.of[User, IO] {
case GET -> Root / "api" / "summarize" :? OffsetQueryParamMatcher(offset) as user =>
for
summary <- summarizeService.getSummary(user, offset.getOrElse(0))
response <- Ok(summary)
yield response
val stream = summarizeService
.streamSummary(user, offset.getOrElse(0))
.map(event => ServerSentEvent(data = Some(event.asJson.noSpaces)))
Ok(stream)
}
Loading