Stream Pagination with SSE
@spotify-effect/core provides stream pagination utilities that integrate seamlessly with Effect’s Stream API. Combined with Server-Sent Events (SSE), you can deliver paginated data to clients incrementally—tracks and statistics render as each page arrives from Spotify.
Why Use Effect Streams?
Effect streams offer several advantages for streaming paginated data:
- Lazy evaluation: pages are fetched on demand, not buffered upfront
- Compositional operators: transform streams with map, filter, take, grouped, and more
- Single-pass processing: aggregate statistics while streaming without storing all items in memory
- Resource safety: streams integrate with Effect’s fiber-based concurrency and error handling
- Type safety: full TypeScript support with proper type inference
Traditional buffered approaches require fetching all pages before sending any data. With streams, you can emit partial results immediately—ideal for live dashboards, progress indicators, and large datasets.
How Stream Pagination Works
The paginateStream function creates a lazy stream that fetches pages on demand:
```typescript
import * as Effect from "effect/Effect";
import * as Stream from "effect/Stream";
import { Library, paginateStream } from "@spotify-effect/core";

const program = Effect.gen(function* () {
  const library = yield* Library;

  // Lazy stream: fetches pages as they are consumed
  const trackStream = paginateStream(
    (offset, limit) => library.getSavedTracks({ offset, limit }),
    50,
  );

  // Process tracks as they arrive
  yield* Stream.runForEach(trackStream, (track) =>
    Effect.sync(() => console.log(track.track.name)),
  );
});
```

The stream terminates automatically when Spotify returns an empty page or no next cursor.
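To make the lazy behavior concrete, here is a minimal sketch of offset-based pagination written as a plain async generator. It is not the library's implementation of paginateStream; the Page shape, paginateLazily, takeFirst, and the fake fetcher are hypothetical names used only for illustration. Each page is requested only when the consumer asks for more items, and iteration ends on an empty page or when next is null.

```typescript
// Hypothetical page shape, loosely modeled on Spotify's paging objects.
interface Page<T> {
  items: T[];
  next: string | null; // URL of the next page, or null on the last page
}

type FetchPage<T> = (offset: number, limit: number) => Promise<Page<T>>;

// Yield items one at a time, fetching the next page only when needed.
async function* paginateLazily<T>(
  fetchPage: FetchPage<T>,
  limit: number,
): AsyncGenerator<T, void, undefined> {
  let offset = 0;
  while (true) {
    const page = await fetchPage(offset, limit);
    if (page.items.length === 0) return; // empty page: nothing left
    yield* page.items;
    if (page.next === null) return; // no next cursor: last page
    offset += page.items.length;
  }
}

// Fake in-memory "API" with 120 items that counts how often it is called.
let fetchCalls = 0;
const fakeFetch: FetchPage<number> = async (offset, limit) => {
  fetchCalls++;
  const items = Array.from(
    { length: Math.max(0, Math.min(limit, 120 - offset)) },
    (_, i) => offset + i,
  );
  const hasMore = offset + limit < 120;
  return { items, next: hasMore ? `?offset=${offset + limit}` : null };
};

// Collect at most `max` items, then stop pulling from the source.
async function takeFirst<T>(source: AsyncIterable<T>, max: number): Promise<T[]> {
  const out: T[] = [];
  if (max <= 0) return out;
  for await (const item of source) {
    out.push(item);
    if (out.length >= max) break; // breaking also finalizes the generator
  }
  return out;
}
```

With this fake source, taking 60 items at a page size of 50 triggers two fetches rather than three, because the third page is never requested.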
Streaming with SSE
SSE provides a simple mechanism to push stream data to clients over HTTP. The server streams Effect output through a ReadableStream while the client consumes events incrementally.
Server Implementation
```typescript
import * as Effect from "effect/Effect";
import * as Stream from "effect/Stream";
import { Library, paginateStream } from "@spotify-effect/core";
import type { RequestHandler } from "./$types";

// makeAccessTokenLayer, formatSSE, and snapshotTopArtists are app-level
// helpers that are not shown here.

const SPOTIFY_MAX_PAGE_SIZE = 50;

export const POST: RequestHandler = async ({ request }) => {
  const body = await request.json();
  const accessToken = body.accessToken;
  const maxTracks = body.maxTracks ?? 200;

  const layer = makeAccessTokenLayer(accessToken);

  const readable = new ReadableStream({
    start(controller) {
      const streamEffect = Effect.gen(function* () {
        const library = yield* Library;
        let stream = paginateStream(
          (offset, limit) => library.getSavedTracks({ offset, limit }),
          SPOTIFY_MAX_PAGE_SIZE,
        );

        // Cap total items if needed
        if (maxTracks !== "unlimited") {
          stream = Stream.take(stream, maxTracks);
        }

        // Re-group individual items into pages
        const grouped = Stream.grouped(stream, SPOTIFY_MAX_PAGE_SIZE);

        // Accumulate stats while streaming
        const withStats = Stream.mapAccum(
          grouped,
          () => ({ artists: new Map<string, number>(), trackCount: 0, page: 0 }),
          (acc, batch) => {
            for (const saved of batch) {
              const artist = saved.track.artists[0]?.name ?? "Unknown";
              acc.artists.set(artist, (acc.artists.get(artist) ?? 0) + 1);
              acc.trackCount++;
            }
            acc.page++;
            return [acc, [{ batch, stats: acc }]];
          },
        );

        // Emit SSE events for each batch
        yield* Stream.runForEach(withStats, ({ batch, stats }) =>
          Effect.sync(() => {
            controller.enqueue(formatSSE("tracks", batch));
            controller.enqueue(formatSSE("stats", snapshotTopArtists(stats.artists)));
          }),
        );
      }).pipe(Effect.provide(layer));

      // Handle completion and errors. The effect has to be run explicitly:
      // building it with matchEffect alone does not execute anything.
      Effect.runFork(
        Effect.matchEffect(streamEffect, {
          onFailure: (error) =>
            Effect.sync(() => {
              controller.enqueue(formatSSE("error", { message: error.message }));
              controller.close();
            }),
          onSuccess: () =>
            Effect.sync(() => {
              controller.enqueue(formatSSE("done", {}));
              controller.close();
            }),
        }),
      );
    },
  });

  return new Response(readable, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
};
```

SSE Event Protocol
```text
event: tracks
data: {"tracks":[{name, artist, album},...], "page": 1}

event: stats
data: {"topArtists":[{name, count},...], "totalTracksProcessed": 50}

event: done
data: {}

event: error
data: {"message": "..."}
```

Each Spotify API page produces a tracks event with the batch data and a stats event with the running aggregation. The done event fires when the stream completes.
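The server code calls a formatSSE helper that this page does not show. The sketch below is one way such a helper might look, assuming JSON payloads and a ReadableStream that expects Uint8Array chunks. The signature and the formatSSEText name are assumptions, not the example app's actual code.

```typescript
const encoder = new TextEncoder();

// Build one SSE frame as text: an `event:` line, a `data:` line, and a
// blank line that terminates the frame.
function formatSSEText(event: string, data: unknown): string {
  // JSON.stringify emits no raw newlines, so the payload fits on one data line.
  return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
}

// Hypothetical helper: encode the frame to bytes for controller.enqueue().
function formatSSE(event: string, data: unknown): Uint8Array {
  return encoder.encode(formatSSEText(event, data));
}

// Example frame for the end-of-stream event.
const doneFrame = formatSSEText("done", {});
```

The trailing blank line matters: it is what tells an SSE consumer that the frame is complete and can be dispatched.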
Client Implementation
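The client code in this section relies on a parseSSE helper that is not shown. One possible sketch follows, assuming "\n" line endings, JSON in the data field as in the event protocol, and the { events, remainder } return shape that the client destructures. The ParsedEvent type is a hypothetical name.

```typescript
interface ParsedEvent {
  type: string;
  data: any; // parsed JSON payload; the shape depends on the event type
}

// Hypothetical helper: split a text buffer into complete SSE frames and
// return whatever trails the last blank line as `remainder`.
function parseSSE(buffer: string): { events: ParsedEvent[]; remainder: string } {
  const frames = buffer.split("\n\n");
  // The final piece is either "" or an incomplete frame; keep it for later.
  const remainder = frames.pop() ?? "";
  const events: ParsedEvent[] = [];

  for (const frame of frames) {
    let type = "message"; // SSE default when no `event:` line is present
    const dataLines: string[] = [];
    for (const line of frame.split("\n")) {
      if (line.startsWith("event:")) type = line.slice(6).trim();
      else if (line.startsWith("data:")) dataLines.push(line.slice(5).trim());
    }
    if (dataLines.length === 0) continue; // comment or keep-alive frame
    events.push({ type, data: JSON.parse(dataLines.join("\n")) });
  }
  return { events, remainder };
}
```

Returning the remainder is what makes the parser safe to call on arbitrary network chunks: a frame that is split across two reads stays in the buffer until its terminating blank line arrives.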
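```svelte
<script lang="ts">
  // Shapes taken from the event protocol above
  type Track = { name: string; artist: string; album: string };
  type ArtistCount = { name: string; count: number };

  // Assumption: the token and the cap are passed in as props
  let { accessToken, maxTracks = 200 } = $props();

  let tracks = $state<Track[]>([]);
  let topArtists = $state<ArtistCount[]>([]);
  let totalTracksProcessed = $state(0);
  let isStreaming = $state(false);
  let isDone = $state(false);
  let abortController = $state<AbortController | null>(null);

  // parseSSE is an app-level helper that is not shown here
  async function startStream() {
    abortController = new AbortController();
    isStreaming = true;
    isDone = false;

    try {
      const response = await fetch('/api/stream-pagination', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ accessToken, maxTracks }),
        signal: abortController.signal,
      });

      const reader = response.body!.pipeThrough(new TextDecoderStream()).getReader();
      let buffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += value;

        const { events, remainder } = parseSSE(buffer);
        buffer = remainder;

        for (const event of events) {
          if (event.type === 'tracks') {
            tracks = [...tracks, ...event.data.tracks];
          } else if (event.type === 'stats') {
            topArtists = event.data.topArtists;
            totalTracksProcessed = event.data.totalTracksProcessed;
          } else if (event.type === 'done') {
            isDone = true;
          }
        }
      }
    } catch (error) {
      // stopStream() aborts the request; ignore that rejection
      if (!(error instanceof DOMException && error.name === 'AbortError')) throw error;
    } finally {
      isStreaming = false;
    }
  }

  function stopStream() {
    abortController?.abort();
  }
</script>

<button onclick={isStreaming ? stopStream : startStream}>
  {isStreaming ? 'stop' : 'stream tracks'}
</button>

{#if tracks.length > 0}
  <div>Loaded {tracks.length} tracks</div>
{/if}

{#if topArtists.length > 0}
  <div>Top: {topArtists[0]?.name}</div>
{/if}
```

Key Operators Used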
| Operator | Purpose |
|---|---|
| paginateStream | Lazily fetch paginated Spotify data |
| Stream.take | Cap total items (e.g., limit to 200 tracks) |
| Stream.grouped | Re-chunk items into page-sized batches |
| Stream.mapAccum | Carry running state while emitting transformed output |
| Stream.runForEach | Side-effect each emission (e.g., enqueue SSE) |
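To see how capping and re-chunking interact, the sketch below reimplements the two ideas over plain iterables. These generator functions are illustrative stand-ins, not Effect's API. The one behavior assumed to carry over is that a trailing partial batch is still emitted.

```typescript
// Stand-in for take: stop after `n` items.
function* take<T>(source: Iterable<T>, n: number): Generator<T> {
  if (n <= 0) return;
  let seen = 0;
  for (const item of source) {
    yield item;
    if (++seen >= n) return;
  }
}

// Stand-in for grouped: emit arrays of up to `size` items.
function* grouped<T>(source: Iterable<T>, size: number): Generator<T[]> {
  let batch: T[] = [];
  for (const item of source) {
    batch.push(item);
    if (batch.length === size) {
      yield batch;
      batch = [];
    }
  }
  if (batch.length > 0) yield batch; // trailing partial batch
}

// An endless source of track numbers, to show that nothing is precomputed.
function* trackNumbers(): Generator<number> {
  for (let i = 1; ; i++) yield i;
}

// Cap at 120 items, then re-chunk into batches of 50.
const batchSizes = Array.from(
  grouped(take(trackNumbers(), 120), 50),
  (batch) => batch.length,
);
console.log(batchSizes); // [ 50, 50, 20 ]
```

A cap that is a multiple of the page size, such as 200 with pages of 50, yields only full batches. Any other cap produces a shorter final batch, so consumers should not assume a fixed batch length.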
Practical Example
A real-time dashboard showing library stats:
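```typescript
// fetchPage stands for a page-fetching function such as
// (offset, limit) => library.getSavedTracks({ offset, limit })
const withStats = Stream.mapAccum(
  Stream.grouped(paginateStream(fetchPage, 50), 50),
  () => ({ artists: new Map<string, number>(), trackCount: 0 }),
  (acc, batch) => {
    for (const saved of batch) {
      const artist = saved.track.artists[0]?.name ?? "Unknown";
      acc.artists.set(artist, (acc.artists.get(artist) ?? 0) + 1);
      acc.trackCount++;
    }
    // Same [nextState, outputs] shape as the server example
    return [acc, [acc]];
  },
);
```

This processes each page once, accumulating artist counts, then emits the updated stats. That suits SSE well, because clients see live updates after each fetch.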
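The accumulator holds a mutable Map, so each stats event needs a plain snapshot of the current ranking rather than the Map itself. A minimal sketch of a snapshotTopArtists-style helper follows. The real helper in the example app is not shown, so the signature, the limit parameter, and the tie-breaking rule here are assumptions.

```typescript
interface ArtistCount {
  name: string;
  count: number;
}

// Hypothetical helper: copy the counts into a sorted array of plain objects.
function snapshotTopArtists(
  artists: ReadonlyMap<string, number>,
  limit = 5,
): ArtistCount[] {
  return Array.from(artists, ([name, count]) => ({ name, count }))
    // Highest count first; ties are broken alphabetically for stable output.
    .sort((a, b) => b.count - a.count || a.name.localeCompare(b.name))
    .slice(0, limit);
}

// Build counts the same way the accumulator does.
const counts = new Map<string, number>();
for (const artist of ["Bonobo", "Four Tet", "Bonobo", "Burial", "Four Tet", "Bonobo"]) {
  counts.set(artist, (counts.get(artist) ?? 0) + 1);
}

const top2 = snapshotTopArtists(counts, 2);
```

Because the result is a fresh array of new objects, later updates to the Map do not change snapshots that have already been sent.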
Next steps
- Pagination Guide: fetch all items with paginateAll
- Error Handling: handle stream errors gracefully
- Effect Stream API: learn more about Effect streams
- SvelteKit Example: live demo in the example app