From e72b6d586f02b98d0ced4a0d11799ac6e258b6d7 Mon Sep 17 00:00:00 2001 From: trett Date: Thu, 12 Feb 2026 19:53:15 +0100 Subject: [PATCH 1/8] streaming response --- build.sbt | 3 +- client/src/main/scala/client/Models.scala | 18 ++ .../src/main/scala/client/NetworkUtils.scala | 16 ++ .../src/main/scala/client/SummaryPage.scala | 97 +++++---- scripts/local-docker/docker-compose.yml | 2 +- .../rss/server/codecs/SummaryCodecs.scala | 22 ++- .../controllers/SummarizeController.scala | 11 +- .../server/services/SummarizeService.scala | 184 +++++++++--------- .../ru/trett/rss/models/SummaryEvent.scala | 12 ++ 9 files changed, 224 insertions(+), 141 deletions(-) create mode 100644 shared/src/main/scala/ru/trett/rss/models/SummaryEvent.scala diff --git a/build.sbt b/build.sbt index 07d74db..011c9bb 100644 --- a/build.sbt +++ b/build.sbt @@ -4,7 +4,7 @@ import org.scalajs.linker.interface.ModuleSplitStyle import scala.sys.process.* -lazy val projectVersion = "2.4.2" +lazy val projectVersion = "2.4.3" lazy val organizationName = "ru.trett" lazy val scala3Version = "3.7.4" lazy val circeVersion = "0.14.15" @@ -120,6 +120,7 @@ lazy val server = project ).map(_ % doobieVersion), libraryDependencies += "org.jsoup" % "jsoup" % "1.21.2", libraryDependencies += "com.github.blemale" %% "scaffeine" % "5.3.0", + libraryDependencies += "io.circe" %% "circe-fs2" % "0.14.1", libraryDependencies += "org.flywaydb" % "flyway-database-postgresql" % "11.17.2" % "runtime", libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.19" % Test, libraryDependencies += "org.scalamock" %% "scalamock" % "7.5.2" % Test, diff --git a/client/src/main/scala/client/Models.scala b/client/src/main/scala/client/Models.scala index 8232cf2..f980498 100644 --- a/client/src/main/scala/client/Models.scala +++ b/client/src/main/scala/client/Models.scala @@ -24,6 +24,24 @@ object Decoders: } given Decoder[SummaryResponse] = deriveDecoder + import SummaryEvent.* + given Decoder[Content] = deriveDecoder + given Decoder[Metadata] = deriveDecoder + given Decoder[FunFact] = deriveDecoder + given Decoder[Error] = deriveDecoder + + given Decoder[SummaryEvent] = Decoder.instance { cursor => + cursor.downField("type").as[String].flatMap { + case "content" => cursor.as[Content] + case "metadata" => cursor.as[Metadata] + case "funFact" => cursor.as[FunFact] + case "error" => cursor.as[Error] + case "done" => Right(Done) + case other => + Left(io.circe.DecodingFailure(s"Unknown SummaryEvent type: $other", cursor.history)) + } + } + final class Model: val feedVar: Var[FeedItemList] = Var(List()) val channelVar: Var[ChannelList] = Var(List()) diff --git a/client/src/main/scala/client/NetworkUtils.scala b/client/src/main/scala/client/NetworkUtils.scala index 1ea0474..66935d3 100644 --- a/client/src/main/scala/client/NetworkUtils.scala +++ b/client/src/main/scala/client/NetworkUtils.scala @@ -15,6 +15,7 @@ import scala.util.Failure import scala.util.Success import scala.util.Try import ru.trett.rss.models.UserSettings +import ru.trett.rss.models.SummaryEvent object NetworkUtils { @@ -68,4 +69,19 @@ object NetworkUtils { def logout(): EventStream[Unit] = FetchStream.post("/api/logout", _.body("")).mapTo(()) + + def streamSummary(url: String): (EventStream[Try[SummaryEvent]], () => Unit) = + val bus = new EventBus[Try[SummaryEvent]] + val source = new dom.EventSource(url) + + source.onmessage = msg => + decode[SummaryEvent](msg.data.toString) match + case Right(event) => bus.emit(Success(event)) + case Left(err) => bus.emit(Failure(err)) + + source.onerror = _ => + bus.emit(Failure(new RuntimeException("Stream error"))) + source.close() + + (bus.events, () => source.close()) } diff --git a/client/src/main/scala/client/SummaryPage.scala b/client/src/main/scala/client/SummaryPage.scala index 6f16c54..674a805 100644 --- a/client/src/main/scala/client/SummaryPage.scala +++ b/client/src/main/scala/client/SummaryPage.scala @@ -4,14 +4,12 @@ import be.doeraene.webcomponents.ui5.* import be.doeraene.webcomponents.ui5.configkeys.* import client.NetworkUtils.* import com.raquo.laminar.api.L.* -import ru.trett.rss.models.{SummaryResponse, SummarySuccess, SummaryError} +import ru.trett.rss.models.SummaryEvent -import scala.util.{Failure, Success, Try} +import scala.util.{Failure, Success} object SummaryPage: - import Decoders.given - private val model = AppState.model private case class PageState( @@ -27,52 +25,71 @@ object SummaryPage: private val stateSignal = stateVar.signal private val loadMoreBus: EventBus[Unit] = new EventBus + private var currentSubscription: Option[Subscription] = None + private var currentClose: Option[() => Unit] = None + private def resetState(): Unit = stateVar.set(PageState()) - private def fetchSummaryBatch(): EventStream[Try[Option[SummaryResponse]]] = - FetchStream - .withDecoder(responseDecoder[SummaryResponse]) - .get("/api/summarize") - - private val batchObserver: Observer[Try[Option[SummaryResponse]]] = Observer { - case Success(Some(resp)) if resp.funFact.isDefined => - stateVar.update(_.copy(isLoading = false, hasMore = false, funFact = resp.funFact)) - - case Success(Some(resp)) if resp.feedsProcessed > 0 => - val (newContent, isError) = resp.result match - case SummarySuccess(html) => (html, false) - case SummaryError(message) => (message, true) - stateVar.update(s => - s.copy( - isLoading = false, - summaries = s.summaries :+ newContent, - hasError = isError, - totalProcessed = s.totalProcessed + resp.feedsProcessed, - hasMore = resp.hasMore - ) + private def cleanup(): Unit = + currentSubscription.foreach(_.kill()) + currentClose.foreach(_()) + currentSubscription = None + currentClose = None + + private def startStreaming(offset: Int): Unit = + cleanup() + + stateVar.update(s => + s.copy( + isLoading = true, + hasError = false, + summaries = if offset > 0 then s.summaries :+ "" else s.summaries ) - Home.refreshUnreadCountBus.emit(()) + ) + + val (stream, close) = NetworkUtils.streamSummary(s"/api/summarize?offset=$offset") + currentClose = Some(close) + + currentSubscription = Some(stream.foreach { + case Success(SummaryEvent.Content(text)) => + stateVar.update(s => + val newSummaries = + if s.summaries.isEmpty then List(text) + else s.summaries.init :+ (s.summaries.last + text) + s.copy(summaries = newSummaries) + ) + + case Success(SummaryEvent.Metadata(processed, remaining, more)) => + stateVar.update(s => + s.copy(totalProcessed = s.totalProcessed + processed, hasMore = more) + ) + Home.refreshUnreadCountBus.emit(()) + + case Success(SummaryEvent.FunFact(text)) => + stateVar.update(_.copy(funFact = Some(text), isLoading = false)) + + case Success(SummaryEvent.Error(msg)) => + stateVar.update(_.copy(hasError = true, isLoading = false)) + client.NotifyComponent.errorMessage(new RuntimeException(msg)) - case Success(_) => - stateVar.update(_.copy(isLoading = false, hasError = true)) + case Success(SummaryEvent.Done) => + stateVar.update(_.copy(isLoading = false)) + cleanup() - case Failure(err) => - stateVar.update(_.copy(isLoading = false, hasError = true)) - handleError(err) - } + case Failure(err) => + stateVar.update(_.copy(hasError = true, isLoading = false)) + cleanup() + handleError(err) + }(unsafeWindowOwner)) def render: Element = resetState() - val initialFetch = fetchSummaryBatch() div( cls := "main-content", - initialFetch --> batchObserver, - onMountBind { ctx => - loadMoreBus.events.flatMapSwitch { _ => - stateVar.update(_.copy(isLoading = true)) - fetchSummaryBatch() - } --> batchObserver - }, + onMountUnmountCallback(mount = _ => startStreaming(0), unmount = _ => cleanup()), + loadMoreBus.events.map(_ => stateVar.now().totalProcessed) --> (offset => + startStreaming(offset) + ), Card( _.slots.header := CardHeader( _.titleText := "AI Summary", diff --git a/scripts/local-docker/docker-compose.yml b/scripts/local-docker/docker-compose.yml index decda0c..5bd2bc7 100644 --- a/scripts/local-docker/docker-compose.yml +++ b/scripts/local-docker/docker-compose.yml @@ -24,7 +24,7 @@ services: - host.docker.internal:host-gateway server: - image: server:2.4.2 + image: server:2.4.3 container_name: rss_server restart: always depends_on: diff --git a/server/src/main/scala/ru/trett/rss/server/codecs/SummaryCodecs.scala b/server/src/main/scala/ru/trett/rss/server/codecs/SummaryCodecs.scala index 192e78e..2bbab72 100644 --- a/server/src/main/scala/ru/trett/rss/server/codecs/SummaryCodecs.scala +++ b/server/src/main/scala/ru/trett/rss/server/codecs/SummaryCodecs.scala @@ -3,7 +3,13 @@ package ru.trett.rss.server.codecs import io.circe.{Decoder, Encoder} import io.circe.generic.semiauto.* import io.circe.syntax.* -import ru.trett.rss.models.{SummaryResult, SummarySuccess, SummaryError, SummaryResponse} +import ru.trett.rss.models.{ + SummaryResult, + SummarySuccess, + SummaryError, + SummaryResponse, + SummaryEvent +} object SummaryCodecs: given Encoder[SummarySuccess] = deriveEncoder @@ -34,3 +40,17 @@ object SummaryCodecs: given Encoder[SummaryResponse] = deriveEncoder given Decoder[SummaryResponse] = deriveDecoder + + import SummaryEvent.* + given Encoder[Content] = deriveEncoder + given Encoder[Metadata] = deriveEncoder + given Encoder[FunFact] = deriveEncoder + given Encoder[Error] = deriveEncoder + + given Encoder[SummaryEvent] = Encoder.instance { + case c: Content => c.asJson.mapObject(_.add("type", "content".asJson)) + case m: Metadata => m.asJson.mapObject(_.add("type", "metadata".asJson)) + case f: FunFact => f.asJson.mapObject(_.add("type", "funFact".asJson)) + case e: Error => e.asJson.mapObject(_.add("type", "error".asJson)) + case Done => io.circe.Json.obj("type" -> "done".asJson) + } diff --git a/server/src/main/scala/ru/trett/rss/server/controllers/SummarizeController.scala b/server/src/main/scala/ru/trett/rss/server/controllers/SummarizeController.scala index d3c3935..3b900fb 100644 --- a/server/src/main/scala/ru/trett/rss/server/controllers/SummarizeController.scala +++ b/server/src/main/scala/ru/trett/rss/server/controllers/SummarizeController.scala @@ -2,8 +2,9 @@ package ru.trett.rss.server.controllers import cats.effect.IO import org.http4s.AuthedRoutes -import org.http4s.circe.CirceEntityEncoder.* +import org.http4s.ServerSentEvent import org.http4s.dsl.io.* +import io.circe.syntax.* import ru.trett.rss.server.models.User import ru.trett.rss.server.services.SummarizeService import ru.trett.rss.server.codecs.SummaryCodecs.given @@ -15,8 +16,8 @@ object SummarizeController: def routes(summarizeService: SummarizeService): AuthedRoutes[User, IO] = AuthedRoutes.of[User, IO] { case GET -> Root / "api" / "summarize" :? OffsetQueryParamMatcher(offset) as user => - for - summary <- summarizeService.getSummary(user, offset.getOrElse(0)) - response <- Ok(summary) - yield response + val stream = summarizeService + .streamSummary(user, offset.getOrElse(0)) + .map(event => ServerSentEvent(data = Some(event.asJson.noSpaces))) + Ok(stream) } diff --git a/server/src/main/scala/ru/trett/rss/server/services/SummarizeService.scala b/server/src/main/scala/ru/trett/rss/server/services/SummarizeService.scala index 05f84d8..8b92029 100644 --- a/server/src/main/scala/ru/trett/rss/server/services/SummarizeService.scala +++ b/server/src/main/scala/ru/trett/rss/server/services/SummarizeService.scala @@ -1,6 +1,7 @@ package ru.trett.rss.server.services import cats.effect.IO +import fs2.Stream import io.circe.Decoder import io.circe.Json import io.circe.generic.auto.* @@ -15,14 +16,7 @@ import org.http4s.client.Client import org.typelevel.ci.* import org.typelevel.log4cats.Logger import org.typelevel.log4cats.LoggerFactory -import ru.trett.rss.models.{ - SummaryLanguage, - SummaryModel, - SummaryResponse, - SummaryResult, - SummarySuccess, - SummaryError -} +import ru.trett.rss.models.{SummaryLanguage, SummaryModel, SummaryEvent} import ru.trett.rss.server.models.User import ru.trett.rss.server.repositories.FeedRepository import org.jsoup.Jsoup @@ -41,15 +35,17 @@ class SummarizeService(feedRepository: FeedRepository, client: Client[IO], apiKe private val logger: Logger[IO] = LoggerFactory[IO].getLogger private val batchSize = 30 - private def getEndpoint(modelId: String): Uri = + private def getEndpoint(modelId: String, stream: Boolean = false): Uri = + val method = if stream then "streamGenerateContent" else "generateContent" Uri.unsafeFromString( - s"https://generativelanguage.googleapis.com/v1beta/models/$modelId:generateContent" + s"https://generativelanguage.googleapis.com/v1beta/models/$modelId:$method" ) private def buildGeminiRequest( modelId: String, prompt: String, - temperature: Option[Double] = None + temperature: Option[Double] = None, + stream: Boolean = false ): IO[Request[IO]] = val baseConfig = Json.obj( "contents" -> Json @@ -79,7 +75,7 @@ class SummarizeService(feedRepository: FeedRepository, client: Client[IO], apiKe IO.pure( Request[IO]( method = Method.POST, - uri = getEndpoint(modelId), + uri = getEndpoint(modelId, stream), headers = Headers( Header.Raw(ci"X-goog-api-key", apiKey), Header.Raw(ci"Content-Type", "application/json") @@ -87,63 +83,56 @@ class SummarizeService(feedRepository: FeedRepository, client: Client[IO], apiKe ).withEntity(config) ) - def getSummary(user: User, offset: Int): IO[SummaryResponse] = + def streamSummary(user: User, offset: Int): Stream[IO, SummaryEvent] = val selectedModel = user.settings.summaryModel .flatMap(SummaryModel.fromString) .getOrElse(SummaryModel.default) - for - totalUnread <- feedRepository.getTotalUnreadCount(user.id) - feeds <- feedRepository.getUnreadFeeds(user, batchSize, offset) - response <- - if feeds.isEmpty && offset == 0 then - // No feeds at all - generate fun fact - generateFunFact(user, selectedModel.modelId).map(funFact => - SummaryResponse( - result = SummarySuccess(""), - hasMore = false, - feedsProcessed = 0, - totalRemaining = 0, - funFact = Some(funFact) + Stream + .eval(feedRepository.getTotalUnreadCount(user.id)) + .flatMap { totalUnread => + Stream.eval(feedRepository.getUnreadFeeds(user, batchSize, offset)).flatMap { + feeds => + val remainingAfterThis = totalUnread - offset - feeds.size + val metadata = SummaryEvent.Metadata( + feedsProcessed = feeds.size, + totalRemaining = Math.max(0, remainingAfterThis), + hasMore = remainingAfterThis > 0 ) - ) - else if feeds.isEmpty then - // No more feeds (reached end of pagination) - IO.pure( - SummaryResponse( - result = SummarySuccess(""), - hasMore = false, - feedsProcessed = 0, - totalRemaining = 0, - funFact = None - ) - ) - else - val text = feeds.map(_.description).mkString("\n") - val strippedText = Jsoup.parse(text).text() - val validatedLanguage = user.settings.summaryLanguage - .flatMap(SummaryLanguage.fromString) - .getOrElse(SummaryLanguage.English) - - for - summaryResult <- summarize( - strippedText, - validatedLanguage.displayName, - selectedModel.modelId + + Stream.emit(metadata) ++ ( + if feeds.isEmpty && offset == 0 then + Stream + .eval(generateFunFact(user, selectedModel.modelId)) + .map(SummaryEvent.FunFact(_)) ++ Stream.emit(SummaryEvent.Done) + else if feeds.isEmpty then Stream.emit(SummaryEvent.Done) + else + val text = feeds.map(_.description).mkString("\n") + val strippedText = Jsoup.parse(text).text() + val validatedLanguage = user.settings.summaryLanguage + .flatMap(SummaryLanguage.fromString) + .getOrElse(SummaryLanguage.English) + + Stream + .eval( + if user.settings.isAiMode then + feedRepository.markFeedAsRead(feeds.map(_.link), user) + else IO.unit + ) + .drain ++ summarizeStream( + strippedText, + validatedLanguage.displayName, + selectedModel.modelId + ) ++ Stream.emit(SummaryEvent.Done) ) - _ <- summaryResult match - case _: SummarySuccess if user.settings.isAiMode => - feedRepository.markFeedAsRead(feeds.map(_.link), user) - case _ => IO.unit - remainingAfterThis = totalUnread - offset - feeds.size - yield SummaryResponse( - result = summaryResult, - hasMore = remainingAfterThis > 0, - feedsProcessed = feeds.size, - totalRemaining = Math.max(0, remainingAfterThis), - funFact = None - ) - yield response + } + } + .handleErrorWith { error => + Stream.eval(logger.error(error)("Error in streamSummary")).drain ++ + Stream.emit( + SummaryEvent.Error("Error generating summary: " + error.getMessage) + ) ++ Stream.emit(SummaryEvent.Done) + } private def generateFunFact(user: User, modelId: String): IO[String] = val validatedLanguage = user.settings.summaryLanguage @@ -185,7 +174,11 @@ class SummarizeService(feedRepository: FeedRepository, client: Client[IO], apiKe } } - private def summarize(text: String, language: String, modelId: String): IO[SummaryResult] = + private def summarizeStream( + text: String, + language: String, + modelId: String + ): Stream[IO, SummaryEvent] = val prompt = s"""You must follow these rules for your response: |1. Provide only the raw text of the code. |2. Do NOT use any markdown formatting. @@ -202,42 +195,47 @@ class SummarizeService(feedRepository: FeedRepository, client: Client[IO], apiKe |13. For each topic, list the key stories with brief summaries. |Now, following these rules exactly summarize the following text. Answer in $language: $text.""".stripMargin - buildGeminiRequest(modelId, prompt).flatMap { request => - client - .run(request) - .use { response => + Stream + .eval(buildGeminiRequest(modelId, prompt, stream = true)) + .flatMap { request => + client.stream(request).flatMap { response => if response.status.isSuccess then - response - .as[GeminiResponse] + response.body + .through(fs2.text.utf8.decode) + .through(io.circe.fs2.stringArrayParser) + .through(io.circe.fs2.decoder[IO, GeminiResponse]) .map { geminiResp => geminiResp.candidates.headOption .flatMap(_.content.parts.flatMap(_.headOption)) .map(_.text) .map { text => if text.startsWith("```html") then - text.stripPrefix("```html").stripSuffix("```").trim - else text.trim - } match - case Some(html) if html.nonEmpty => SummarySuccess(html) - case _ => - SummaryError("Could not extract summary from response.") + text.stripPrefix("```html").stripSuffix("```") + else text + } + .getOrElse("") } + .map(SummaryEvent.Content(_)) else - response.bodyText.compile.string.flatMap { body => - logger.error( - s"Gemini API error: status=${response.status}, body=$body" - ) *> - IO.pure(SummaryError(s"API error: ${response.status.reason}")) - } - } - .handleErrorWith { error => - val errorMessage = error match - case _: TimeoutException => - "Summary request timed out. The AI service is taking too long to respond. Please try again with fewer feeds." - case _ => - "Error communicating with the summary API." - logger.error(error)(s"Error summarizing text: ${error.getMessage}") *> IO.pure( - SummaryError(errorMessage) - ) + Stream + .eval(response.bodyText.compile.string.flatMap { body => + logger.error( + s"Gemini API stream error: status=${response.status}, body=$body" + ) + }) + .drain ++ Stream.emit( + SummaryEvent.Error(s"API error: ${response.status.reason}") + ) } - } + } + .handleErrorWith { error => + val errorMessage = error match + case _: TimeoutException => + "Summary request timed out." + case _ => + "Error communicating with the summary API." + Stream + .eval(logger.error(error)(s"Error summarizing text: ${error.getMessage}")) + .drain ++ + Stream.emit(SummaryEvent.Error(errorMessage)) + } diff --git a/shared/src/main/scala/ru/trett/rss/models/SummaryEvent.scala b/shared/src/main/scala/ru/trett/rss/models/SummaryEvent.scala new file mode 100644 index 0000000..468d587 --- /dev/null +++ b/shared/src/main/scala/ru/trett/rss/models/SummaryEvent.scala @@ -0,0 +1,12 @@ +package ru.trett.rss.models + +sealed trait SummaryEvent + +object SummaryEvent { + case class Content(text: String) extends SummaryEvent + case class Metadata(feedsProcessed: Int, totalRemaining: Int, hasMore: Boolean) + extends SummaryEvent + case class FunFact(text: String) extends SummaryEvent + case class Error(message: String) extends SummaryEvent + case object Done extends SummaryEvent +} From 55e66750282853a673da7b4cc8bdc3b608945922 Mon Sep 17 00:00:00 2001 From: trett Date: Sat, 14 Feb 2026 19:02:20 +0100 Subject: [PATCH 2/8] Refactor SummaryPage using Scala.js and Laminar best practices --- .../src/main/scala/client/SummaryPage.scala | 261 ++++++++++-------- 1 file changed, 140 insertions(+), 121 deletions(-) diff --git a/client/src/main/scala/client/SummaryPage.scala b/client/src/main/scala/client/SummaryPage.scala index 674a805..acea406 100644 --- a/client/src/main/scala/client/SummaryPage.scala +++ b/client/src/main/scala/client/SummaryPage.scala @@ -2,55 +2,32 @@ package client import be.doeraene.webcomponents.ui5.* import be.doeraene.webcomponents.ui5.configkeys.* -import client.NetworkUtils.* import com.raquo.laminar.api.L.* import ru.trett.rss.models.SummaryEvent - -import scala.util.{Failure, Success} +import client.NetworkUtils.unsafeParseToHtmlFragment +import scala.util.{Failure, Success, Try} object SummaryPage: - private val model = AppState.model - - private case class PageState( - summaries: List[String] = List(), - isLoading: Boolean = true, + private case class State( + summaries: List[String] = List.empty, + isLoading: Boolean = false, totalProcessed: Int = 0, hasMore: Boolean = false, funFact: Option[String] = None, - hasError: Boolean = false + error: Option[String] = None ) - private val stateVar: Var[PageState] = Var(PageState()) - private val stateSignal = stateVar.signal - private val loadMoreBus: EventBus[Unit] = new EventBus - - private var currentSubscription: Option[Subscription] = None - private var currentClose: Option[() => Unit] = None + def render: HtmlElement = + val stateVar = Var(State(isLoading = true)) + val loadMoreBus = new EventBus[Int] + var currentClose: Option[() => Unit] = None - private def resetState(): Unit = stateVar.set(PageState()) + def cleanup(): Unit = + currentClose.foreach(_()) + currentClose = None - private def cleanup(): Unit = - currentSubscription.foreach(_.kill()) - currentClose.foreach(_()) - currentSubscription = None - currentClose = None - - private def startStreaming(offset: Int): Unit = - cleanup() - - stateVar.update(s => - s.copy( - isLoading = true, - hasError = false, - summaries = if offset > 0 then s.summaries :+ "" else s.summaries - ) - ) - - val (stream, close) = NetworkUtils.streamSummary(s"/api/summarize?offset=$offset") - currentClose = Some(close) - - currentSubscription = Some(stream.foreach { + val streamObserver: Observer[Try[SummaryEvent]] = Observer { case Success(SummaryEvent.Content(text)) => stateVar.update(s => val newSummaries = @@ -69,7 +46,7 @@ object SummaryPage: stateVar.update(_.copy(funFact = Some(text), isLoading = false)) case Success(SummaryEvent.Error(msg)) => - stateVar.update(_.copy(hasError = true, isLoading = false)) + stateVar.update(_.copy(error = Some(msg), isLoading = false)) client.NotifyComponent.errorMessage(new RuntimeException(msg)) case Success(SummaryEvent.Done) => @@ -77,18 +54,34 @@ object SummaryPage: cleanup() case Failure(err) => - stateVar.update(_.copy(hasError = true, isLoading = false)) + stateVar.update(_.copy(error = Some(err.getMessage), isLoading = false)) cleanup() - handleError(err) - }(unsafeWindowOwner)) + NetworkUtils.handleError(err) + } + + def startStreaming(offset: Int)(using owner: Owner): Unit = + cleanup() + stateVar.update(s => + s.copy( + isLoading = true, + error = None, + // If loading more, append a placeholder for the new summary to avoid overwriting the last one + summaries = if offset > 0 then s.summaries :+ "" else s.summaries + ) + ) + + val (stream, close) = NetworkUtils.streamSummary(s"/api/summarize?offset=$offset") + currentClose = Some(close) + stream.addObserver(streamObserver)(owner) - def render: Element = - resetState() div( cls := "main-content", - onMountUnmountCallback(mount = _ => startStreaming(0), unmount = _ => cleanup()), - loadMoreBus.events.map(_ => stateVar.now().totalProcessed) --> (offset => - startStreaming(offset) + onMountUnmountCallback( + mount = ctx => + loadMoreBus.events + .startWith(0) + .foreach(offset => startStreaming(offset)(using ctx.owner))(ctx.owner), + unmount = _ => cleanup() ), Card( _.slots.header := CardHeader( @@ -101,65 +94,27 @@ object SummaryPage: fontSize.px := 15, color := "var(--sapContent_LabelColor)", lineHeight := "1.5", - child <-- stateSignal.map { state => + // Empty / Loading State + child <-- stateVar.signal.map { state => if state.isLoading && state.summaries.isEmpty then - div( - display.flex, - flexDirection.column, - alignItems.center, - justifyContent.center, - padding.px := 60, - BusyIndicator(_.active := true, _.size := BusyIndicatorSize.L), - p( - marginTop.px := 20, - color := "var(--sapContent_LabelColor)", - fontSize := "var(--sapFontSize)", - "Brewing your news digest..." - ) - ) + renderLoading("Brewing your news digest...") else emptyNode }, - child <-- stateSignal.map { state => - state.funFact match - case Some(fact) if fact.nonEmpty => - div( - padding.px := 40, - textAlign.center, - Title(_.level := TitleLevel.H3, "All caught up!"), - p( - marginTop.px := 10, - marginBottom.px := 20, - color := "var(--sapContent_LabelColor)", - "You have no unread feeds." - ), - div( - marginTop.px := 20, - padding.px := 20, - backgroundColor := "var(--sapBackgroundColor)", - borderRadius.px := 8, - border := "1px solid var(--sapContent_ForegroundBorderColor)", - Title(_.level := TitleLevel.H5, "Did you know?"), - p(marginTop.px := 10, fact) - ) - ) - case _ => emptyNode + // Fun Fact / Done State + child <-- stateVar.signal.map(_.funFact).map { + case Some(fact) => renderFunFact(fact) + case None => emptyNode }, - div(children <-- stateSignal.map { state => - state.summaries.zipWithIndex.map { case (html, index) => - div( - unsafeParseToHtmlFragment(html), - if index < state.summaries.length - 1 then - hr( - marginTop.px := 20, - marginBottom.px := 20, - border := "none", - borderTop := "1px solid var(--sapContent_ForegroundBorderColor)" - ) - else emptyNode + // Summaries List + div( + children <-- stateVar.signal + .map(_.summaries) + .splitByIndex((index, _, textSignal) => + renderSummaryItem(index, textSignal, stateVar.signal) ) - } - }), - child <-- stateSignal.map { state => + ), + // Loading More Indicator + child <-- stateVar.signal.map { state => if state.isLoading && state.summaries.nonEmpty then div( display.flex, @@ -167,35 +122,99 @@ object SummaryPage: justifyContent.center, padding.px := 20, gap.px := 10, - BusyIndicator(_.active := true, _.size := BusyIndicatorSize.S), + BusyIndicator( + _.active := true, + _.size := BusyIndicatorSize.S + ), span("Loading more stories...") ) else emptyNode }, - child <-- stateSignal.map { state => + // Load More Button + child <-- stateVar.signal.map { state => if !state.isLoading && state.summaries.nonEmpty && state.funFact.isEmpty then - div( - paddingTop.px := 20, - display.flex, - flexDirection.column, - alignItems.center, - gap.px := 16, - Text( - s"${state.totalProcessed} feeds summarized", - color := "var(--sapContent_LabelColor)" - ), - if state.hasMore && !state.hasError then - Button( - _.design := ButtonDesign.Emphasized, - _.icon := IconName.download, - "Load more news", - _.events.onClick.mapTo(()) --> loadMoreBus.writer - ) - else emptyNode - ) + renderLoadMore(state, loadMoreBus) else emptyNode } ) ) ) + + private def renderLoading(text: String): HtmlElement = + div( + display.flex, + flexDirection.column, + alignItems.center, + justifyContent.center, + padding.px := 60, + BusyIndicator(_.active := true, _.size := BusyIndicatorSize.L), + p( + marginTop.px := 20, + color := "var(--sapContent_LabelColor)", + fontSize := "var(--sapFontSize)", + text + ) + ) + + private def renderFunFact(fact: String): HtmlElement = + div( + padding.px := 40, + textAlign.center, + Title(_.level := TitleLevel.H3, "All caught up!"), + p( + marginTop.px := 10, + marginBottom.px := 20, + color := "var(--sapContent_LabelColor)", + "You have no unread feeds." + ), + div( + marginTop.px := 20, + padding.px := 20, + backgroundColor := "var(--sapBackgroundColor)", + borderRadius.px := 8, + border := "1px solid var(--sapContent_ForegroundBorderColor)", + Title(_.level := TitleLevel.H5, "Did you know?"), + p(marginTop.px := 10, fact) + ) + ) + + private def renderSummaryItem( + index: Int, + textSignal: Signal[String], + stateSignal: Signal[State] + ): HtmlElement = + div( + child <-- textSignal.map(unsafeParseToHtmlFragment), + child <-- stateSignal.map { state => + if index < state.summaries.length - 1 then + hr( + marginTop.px := 20, + marginBottom.px := 20, + border := "none", + borderTop := "1px solid var(--sapContent_ForegroundBorderColor)" + ) + else emptyNode + } + ) + + private def renderLoadMore(state: State, bus: EventBus[Int]): HtmlElement = + div( + paddingTop.px := 20, + display.flex, + flexDirection.column, + alignItems.center, + gap.px := 16, + Text( + s"${state.totalProcessed} feeds summarized", + color := "var(--sapContent_LabelColor)" + ), + if state.hasMore && state.error.isEmpty then + Button( + _.design := ButtonDesign.Emphasized, + _.icon := IconName.download, + "Load more news", + _.events.onClick.mapTo(state.totalProcessed) --> bus.writer + ) + else emptyNode + ) From 8ae64572f4f66b06f44a5be9de4070f917443b24 Mon Sep 17 00:00:00 2001 From: trett Date: Sat, 14 Feb 2026 19:06:04 +0100 Subject: [PATCH 3/8] Fix robust HTML stripping for streaming response --- .../ru/trett/rss/server/services/SummarizeService.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/server/src/main/scala/ru/trett/rss/server/services/SummarizeService.scala b/server/src/main/scala/ru/trett/rss/server/services/SummarizeService.scala index 8b92029..348a164 100644 --- a/server/src/main/scala/ru/trett/rss/server/services/SummarizeService.scala +++ b/server/src/main/scala/ru/trett/rss/server/services/SummarizeService.scala @@ -207,12 +207,7 @@ class SummarizeService(feedRepository: FeedRepository, client: Client[IO], apiKe .map { geminiResp => geminiResp.candidates.headOption .flatMap(_.content.parts.flatMap(_.headOption)) - .map(_.text) - .map { text => - if text.startsWith("```html") then - text.stripPrefix("```html").stripSuffix("```") - else text - } + .map(_.text.replace("```html", "").replace("```", "")) .getOrElse("") } .map(SummaryEvent.Content(_)) From d744a78c08d4ce70553e9281c56bc4462fde1fce Mon Sep 17 00:00:00 2001 From: trett Date: Sat, 14 Feb 2026 19:09:39 +0100 Subject: [PATCH 4/8] Make summarizeStream self-contained by ensuring Done event on all paths --- .../ru/trett/rss/server/services/SummarizeService.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/scala/ru/trett/rss/server/services/SummarizeService.scala b/server/src/main/scala/ru/trett/rss/server/services/SummarizeService.scala index 348a164..2322a8b 100644 --- a/server/src/main/scala/ru/trett/rss/server/services/SummarizeService.scala +++ b/server/src/main/scala/ru/trett/rss/server/services/SummarizeService.scala @@ -123,7 +123,7 @@ class SummarizeService(feedRepository: FeedRepository, client: Client[IO], apiKe strippedText, validatedLanguage.displayName, selectedModel.modelId - ) ++ Stream.emit(SummaryEvent.Done) + ) ) } } @@ -233,4 +233,5 @@ class SummarizeService(feedRepository: FeedRepository, client: Client[IO], apiKe .eval(logger.error(error)(s"Error summarizing text: ${error.getMessage}")) .drain ++ Stream.emit(SummaryEvent.Error(errorMessage)) - } + } ++ Stream.emit(SummaryEvent.Done) + From 79e628cda1fe24e8bc5464d1a2efcb9db27e54c3 Mon Sep 17 00:00:00 2001 From: trett Date: Sat, 14 Feb 2026 19:40:24 +0100 Subject: [PATCH 5/8] fix: Improve markdown stripping and filter empty chunks in SummarizeService --- .../trett/rss/server/services/SummarizeService.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/server/src/main/scala/ru/trett/rss/server/services/SummarizeService.scala b/server/src/main/scala/ru/trett/rss/server/services/SummarizeService.scala index 2322a8b..039a236 100644 --- a/server/src/main/scala/ru/trett/rss/server/services/SummarizeService.scala +++ b/server/src/main/scala/ru/trett/rss/server/services/SummarizeService.scala @@ -207,9 +207,17 @@ class SummarizeService(feedRepository: FeedRepository, client: Client[IO], apiKe .map { geminiResp => geminiResp.candidates.headOption .flatMap(_.content.parts.flatMap(_.headOption)) - .map(_.text.replace("```html", "").replace("```", "")) + .map(_.text) .getOrElse("") } + .map { text => + if (text.startsWith("```html")) { + text.stripPrefix("```html").stripSuffix("```").trim + } else { + text.trim + } + } + .filter(_.nonEmpty) .map(SummaryEvent.Content(_)) else Stream From 925dcfd960b321942435291dcd089c0c10204d3a Mon Sep 17 00:00:00 2001 From: trett Date: Sat, 14 Feb 2026 21:06:12 +0100 Subject: [PATCH 6/8] fix(client): ensure SummaryPage sends request and properly cleans up EventSource - Replace onMountCallback with EventStream.merge to guarantee initial request in SummaryPage - Use ListBuffer in NetworkUtils to properly track and close EventSource connections - Update AGENTS.md to enforce scalafix/scalafmt usage --- AGENTS.md | 2 +- .../src/main/scala/client/NetworkUtils.scala | 40 +++++++++------ .../src/main/scala/client/SummaryPage.scala | 50 ++++++------------- 3 files changed, 43 insertions(+), 49 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 5ecd919..366b6f6 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -55,7 +55,7 @@ The project is a multi-module sbt project with the following key directories: - **Testing**: Tests are run using `sbt test`. - **Running Locally**: The `scripts` directory contains `docker-compose.yml` files to run the application stack (backend, frontend, database) locally. - **CI/CD**: GitHub Actions are configured to build and test the application on push and pull requests. -- **Code style**: Run `scalafixAll` and `scalafmtAll` every time before make a commit to make the style consistently. +- **Code style**: You MUST run `sbt scalafixAll` and `sbt scalafmtAll` before every commit and before creating a Pull Request to ensure code style consistency. ## Repository diff --git a/client/src/main/scala/client/NetworkUtils.scala b/client/src/main/scala/client/NetworkUtils.scala index 66935d3..508b26b 100644 --- a/client/src/main/scala/client/NetworkUtils.scala +++ b/client/src/main/scala/client/NetworkUtils.scala @@ -17,6 +17,8 @@ import scala.util.Try import ru.trett.rss.models.UserSettings import ru.trett.rss.models.SummaryEvent +import scala.collection.mutable.ListBuffer + object NetworkUtils { val JSON_ACCEPT: (String, String) = "Accept" -> "application/json" @@ -70,18 +72,28 @@ object NetworkUtils { def logout(): EventStream[Unit] = FetchStream.post("/api/logout", _.body("")).mapTo(()) - def streamSummary(url: String): (EventStream[Try[SummaryEvent]], () => Unit) = - val bus = new EventBus[Try[SummaryEvent]] - val source = new dom.EventSource(url) - - source.onmessage = msg => - decode[SummaryEvent](msg.data.toString) match - case Right(event) => bus.emit(Success(event)) - case Left(err) => bus.emit(Failure(err)) - - source.onerror = _ => - bus.emit(Failure(new RuntimeException("Stream error"))) - source.close() - - (bus.events, () => source.close()) + def streamSummary(url: String): EventStream[Try[SummaryEvent]] = + val source = ListBuffer.empty[dom.EventSource] + EventStream.fromCustomSource[Try[SummaryEvent]]( + shouldStart = _ => true, + start = (fireValue, fireError, getStartIndex, getIsStarted) => { + val s = new dom.EventSource(url) + source += s + + s.onmessage = msg => + if getIsStarted() then + decode[SummaryEvent](msg.data.toString) match + case Right(SummaryEvent.Done) => + fireValue(Success(SummaryEvent.Done)) + s.close() + case Right(event) => fireValue(Success(event)) + case Left(err) => fireValue(Failure(err)) + + s.onerror = _ => + if getIsStarted() then + fireValue(Failure(new RuntimeException("Stream error"))) + s.close() + }, + stop = _ => source.foreach(_.close()) + ) } diff --git a/client/src/main/scala/client/SummaryPage.scala b/client/src/main/scala/client/SummaryPage.scala index acea406..ab4a408 100644 --- a/client/src/main/scala/client/SummaryPage.scala +++ b/client/src/main/scala/client/SummaryPage.scala @@ -21,11 +21,6 @@ object SummaryPage: def render: HtmlElement = val stateVar = Var(State(isLoading = true)) val loadMoreBus = new EventBus[Int] - var currentClose: Option[() => Unit] = None - - def cleanup(): Unit = - currentClose.foreach(_()) - currentClose = None val streamObserver: Observer[Try[SummaryEvent]] = Observer { case Success(SummaryEvent.Content(text)) => @@ -51,38 +46,29 @@ object SummaryPage: case Success(SummaryEvent.Done) => stateVar.update(_.copy(isLoading = false)) - cleanup() case Failure(err) => stateVar.update(_.copy(error = Some(err.getMessage), isLoading = false)) - cleanup() NetworkUtils.handleError(err) } - def startStreaming(offset: Int)(using owner: Owner): Unit = - cleanup() - stateVar.update(s => - s.copy( - isLoading = true, - error = None, - // If loading more, append a placeholder for the new summary to avoid overwriting the last one - summaries = if offset > 0 then s.summaries :+ "" else s.summaries - ) - ) - - val (stream, close) = NetworkUtils.streamSummary(s"/api/summarize?offset=$offset") - currentClose = Some(close) - stream.addObserver(streamObserver)(owner) + val eventsStream = + EventStream.merge(EventStream.fromValue(0), loadMoreBus.events).flatMapSwitch { + offset => + stateVar.update(s => + s.copy( + isLoading = true, + error = None, + // If loading more, append a placeholder for the new summary to avoid overwriting the last one + summaries = if offset > 0 then s.summaries :+ "" else s.summaries + ) + ) + NetworkUtils.streamSummary(s"/api/summarize?offset=$offset") + } div( cls := "main-content", - onMountUnmountCallback( - mount = ctx => - loadMoreBus.events - .startWith(0) - .foreach(offset => startStreaming(offset)(using ctx.owner))(ctx.owner), - unmount = _ => cleanup() - ), + eventsStream --> streamObserver, Card( _.slots.header := CardHeader( _.titleText := "AI Summary", @@ -122,10 +108,7 @@ object SummaryPage: justifyContent.center, padding.px := 20, gap.px := 10, - BusyIndicator( - _.active := true, - _.size := BusyIndicatorSize.S - ), + BusyIndicator(_.active := true, _.size := BusyIndicatorSize.S), span("Loading more stories...") ) else emptyNode @@ -133,8 +116,7 @@ object SummaryPage: // Load More Button child <-- stateVar.signal.map { state => if !state.isLoading && state.summaries.nonEmpty && state.funFact.isEmpty - then - renderLoadMore(state, loadMoreBus) + then renderLoadMore(state, loadMoreBus) else emptyNode } ) From 13c48e4c9f6238fba0d0c317c23087546059c067 Mon Sep 17 00:00:00 2001 From: trett Date: Sat, 14 Feb 2026 21:06:24 +0100 Subject: [PATCH 7/8] style: apply scalafmt formatting to SummarizeService --- .../scala/ru/trett/rss/server/services/SummarizeService.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/scala/ru/trett/rss/server/services/SummarizeService.scala b/server/src/main/scala/ru/trett/rss/server/services/SummarizeService.scala index 039a236..1b1ab1e 100644 --- a/server/src/main/scala/ru/trett/rss/server/services/SummarizeService.scala +++ b/server/src/main/scala/ru/trett/rss/server/services/SummarizeService.scala @@ -242,4 +242,3 @@ class SummarizeService(feedRepository: FeedRepository, client: Client[IO], apiKe .drain ++ Stream.emit(SummaryEvent.Error(errorMessage)) } ++ Stream.emit(SummaryEvent.Done) - From 8b5a35acf30a1ce79e665f17f8c4a57e71c74f16 Mon Sep 17 00:00:00 2001 From: trett Date: Sun, 15 Feb 2026 12:57:04 +0100 Subject: [PATCH 8/8] clear source --- client/src/main/scala/client/NetworkUtils.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/client/src/main/scala/client/NetworkUtils.scala b/client/src/main/scala/client/NetworkUtils.scala index 508b26b..188d30e 100644 --- a/client/src/main/scala/client/NetworkUtils.scala +++ b/client/src/main/scala/client/NetworkUtils.scala @@ -94,6 +94,8 @@ object NetworkUtils { fireValue(Failure(new RuntimeException("Stream error"))) s.close() }, - stop = _ => source.foreach(_.close()) + stop = _ => + source.foreach(_.close()) + source.clear() ) }