Streaming

liter-llm supports streaming responses from all providers that offer it. Tokens are delivered to your application as they are generated, reducing time-to-first-token and enabling real-time UIs.

How It Works

Most providers stream via Server-Sent Events (SSE) -- the HTTP response body is a series of data: lines, each containing a JSON chunk. AWS Bedrock uses its own EventStream binary protocol. liter-llm handles both transparently behind the same chat_stream API.

sequenceDiagram
    participant App
    participant Client as liter-llm
    participant API as Provider API

    App->>Client: chat_stream(request)
    Client->>API: POST (stream: true)
    loop Each token
        API-->>Client: SSE data chunk
        Client-->>App: ChatCompletionChunk
    end
    API-->>Client: [DONE]
    Client-->>App: stream ends
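
The SSE wire format above is simple enough to sketch in a few lines of plain Python. This is an illustration of the format only -- liter-llm parses the stream for you internally -- and the chunk shape shown is a simulated OpenAI-style payload:

```python
import json

def parse_sse(body: str):
    """Yield parsed JSON chunks from a raw SSE response body."""
    for line in body.splitlines():
        if not line.startswith("data: "):
            continue  # skip blank keep-alive lines and comments
        payload = line[len("data: "):]
        if payload == "[DONE]":  # OpenAI-style end-of-stream sentinel
            return
        yield json.loads(payload)

# A simulated two-chunk response body:
raw = (
    'data: {"choices": [{"delta": {"content": "Hel"}}]}\n'
    "\n"
    'data: {"choices": [{"delta": {"content": "lo"}}]}\n'
    "\n"
    "data: [DONE]\n"
)
chunks = list(parse_sse(raw))
text = "".join(c["choices"][0]["delta"]["content"] for c in chunks)
```

Bedrock's EventStream protocol is binary rather than line-oriented, but liter-llm normalizes both into the same chunk type.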

Chunk Structure

Each streamed chunk contains a delta -- the incremental text content for that token. The chunk also includes metadata like the model name and finish reason (on the final chunk).

Key fields:

Field                      Description
choices[].delta.content    The incremental text content (may be null on the first/last chunk)
choices[].finish_reason    null during streaming; "stop" on the final chunk
model                      The model that generated this chunk
id                         The completion ID (same across all chunks in one response)
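
Putting these fields together, a consumer reconstructs the full message by concatenating deltas until finish_reason is set. A minimal, self-contained sketch using plain dicts shaped like the chunks above (no real client involved):

```python
def accumulate(chunks):
    """Concatenate delta content until a finish_reason arrives."""
    parts = []
    finish_reason = None
    for chunk in chunks:
        choice = chunk["choices"][0]
        content = choice["delta"].get("content")
        if content is not None:
            parts.append(content)
        finish_reason = choice.get("finish_reason")
    return "".join(parts), finish_reason

# Simulated chunk sequence: two content deltas, then a final empty delta.
chunks = [
    {"choices": [{"delta": {"content": "Once"}, "finish_reason": None}]},
    {"choices": [{"delta": {"content": " upon"}, "finish_reason": None}]},
    {"choices": [{"delta": {}, "finish_reason": "stop"}]},
]
text, reason = accumulate(chunks)
```

Note the final chunk carries the finish reason but typically no content, which is why the content check is needed.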

Streaming Examples

Python

import asyncio
import os
from liter_llm import LlmClient

async def main() -> None:
    client = LlmClient(api_key=os.environ["OPENAI_API_KEY"])
    async for chunk in await client.chat_stream(
        model="openai/gpt-4o",
        messages=[{"role": "user", "content": "Tell me a story"}],
    ):
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)
    print()

asyncio.run(main())

TypeScript

import { LlmClient } from "@kreuzberg/liter-llm";

const client = new LlmClient({ apiKey: process.env.OPENAI_API_KEY! });
const chunks = await client.chatStream({
  model: "openai/gpt-4o",
  messages: [{ role: "user", content: "Tell me a story" }],
});

for (const chunk of chunks) {
  process.stdout.write(chunk.choices[0]?.delta?.content ?? "");
}
console.log();

Go

package main

import (
 "context"
 "fmt"
 "os"

 llm "github.com/kreuzberg-dev/liter-llm/packages/go"
)

func main() {
 client := llm.NewClient(llm.WithAPIKey(os.Getenv("OPENAI_API_KEY")))
 err := client.ChatStream(
  context.Background(),
  &llm.ChatCompletionRequest{
   Model: "openai/gpt-4o",
   Messages: []llm.Message{
    llm.NewTextMessage(llm.RoleUser, "Tell me a story"),
   },
  },
  func(chunk *llm.ChatCompletionChunk) error {
   if len(chunk.Choices) > 0 && chunk.Choices[0].Delta.Content != nil {
    fmt.Print(*chunk.Choices[0].Delta.Content)
   }
   return nil
  },
 )
 if err != nil {
  panic(err)
 }
 fmt.Println()
}

Ruby

# frozen_string_literal: true

require "liter_llm"
require "json"

# Note: The Ruby client does not yet support streaming.
# Use the non-streaming chat method instead.
client = LiterLlm::LlmClient.new(ENV.fetch("OPENAI_API_KEY"), {})

response = JSON.parse(client.chat(JSON.generate(
  model: "openai/gpt-4o",
  messages: [{ role: "user", content: "Tell me a story" }]
)))

puts response.dig("choices", 0, "message", "content")

Java

import dev.kreuzberg.literllm.LlmClient;
import dev.kreuzberg.literllm.Types.*;
import java.util.List;

public class Main {
    public static void main(String[] args) throws Exception {
        // Note: The Java client does not yet support streaming.
        // Use the non-streaming chat method instead.
        try (var client = LlmClient.builder()
                .apiKey(System.getenv("OPENAI_API_KEY"))
                .build()) {
            var response = client.chat(new ChatCompletionRequest(
                "openai/gpt-4o",
                List.of(new UserMessage("Tell me a story"))
            ));
            System.out.println(response.choices().getFirst().message().content());
        }
    }
}

C#

using LiterLlm;

// Note: The C# client does not yet support streaming.
// Use the non-streaming ChatAsync method instead.
await using var client = new LlmClient(
    apiKey: Environment.GetEnvironmentVariable("OPENAI_API_KEY")!);

var response = await client.ChatAsync(new ChatCompletionRequest(
    Model: "openai/gpt-4o",
    Messages: [new UserMessage("Tell me a story")]
));
Console.WriteLine(response.Choices[0].Message.Content);

Elixir

# Note: The Elixir client does not yet support streaming.
# Use the non-streaming chat function instead.
{:ok, response} =
  LiterLlm.chat(
    %{
      model: "openai/gpt-4o",
      messages: [%{role: "user", content: "Tell me a story"}]
    },
    api_key: System.fetch_env!("OPENAI_API_KEY")
  )

IO.puts(hd(response["choices"])["message"]["content"])

WASM

import init, { LlmClient } from "@kreuzberg/liter-llm-wasm";

await init();

// Note: chatStream is not yet supported in the WASM binding.
// Use the non-streaming chat method instead.
const client = new LlmClient({ apiKey: "sk-..." });
const response = await client.chat({
  model: "openai/gpt-4o",
  messages: [{ role: "user", content: "Tell me a story" }],
});

console.log(response.choices[0].message.content);

Error Handling in Streams

Errors can occur at two points:

  1. Connection errors -- raised when calling chat_stream() (e.g. auth failure, network timeout). These are thrown or raised immediately, before any chunks are yielded.
  2. Mid-stream errors -- raised during iteration if the provider closes the connection unexpectedly or sends malformed data. These surface as exceptions/errors from the stream iterator.

Always handle both error points

Wrap both the chat_stream() call and the iteration loop in error handling. A successful connection does not guarantee a complete response.
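
In Python the pattern looks like this. The example is self-contained: fail_midstream is a stand-in async generator that dies partway through, so no real client is needed. With liter-llm you would wrap the chat_stream() call and the async for loop the same way:

```python
import asyncio

async def fail_midstream():
    """Stand-in for a provider stream that fails after two chunks."""
    yield "Hel"
    yield "lo"
    raise ConnectionError("provider closed the connection")

async def main() -> list[str]:
    received = []
    try:
        stream = fail_midstream()   # connection errors would surface here
        async for token in stream:  # mid-stream errors surface here
            received.append(token)
    except ConnectionError:
        # A successful connection does not guarantee a complete response:
        # decide whether to keep or discard the partial output.
        pass
    return received

tokens = asyncio.run(main())
```

After the failure, tokens still holds the partial output received before the connection dropped.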

Stream Cancellation

Closing or dropping the stream iterator cancels the underlying HTTP connection. In Python, exiting the async for loop early is sufficient. In Go, cancelling the context.Context passed to ChatStream stops the stream. In TypeScript, the stream is fully consumed before the Promise resolves (buffer-based).
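
The early-exit pattern in Python can be sketched with a stand-in async generator whose cleanup mimics closing the HTTP connection (the closed flag here is purely illustrative):

```python
import asyncio

closed = False

async def token_stream():
    """Stand-in stream; its finally block mimics closing the connection."""
    global closed
    try:
        for token in ["Once", " upon", " a", " time"]:
            yield token
    finally:
        closed = True  # cleanup runs when the consumer stops iterating

async def main() -> list[str]:
    received = []
    stream = token_stream()
    async for token in stream:
        received.append(token)
        if len(received) == 2:
            break  # stop early -- no need to consume the rest
    await stream.aclose()  # explicit close; GC would also trigger cleanup
    return received

tokens = asyncio.run(main())
```

Only the first two tokens are consumed, and the stream's cleanup has run by the time main returns.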

Async Bridging

The Rust core produces a BoxStream<ChatCompletionChunk> -- a futures::Stream of chunks. Each binding translates this to the host language's native async iteration:

Language    Async iteration pattern
Python      async for chunk in stream
TypeScript  for (const chunk of await client.chatStream(req))
Go          client.ChatStream(ctx, req, func(chunk) error { ... })
Ruby        stream { |chunk| ... } (block)
Java        Callback: (chunk) -> ...
C#          await foreach (var chunk in stream)
Elixir      Stream.each(stream, fn chunk -> ... end)