Skip to content

Streaming Guide

Streaming delivers tokens incrementally as they are generated, rather than waiting for the full response. Use it for real-time UIs, long responses, or when time-to-first-token matters.

When to Use Streaming

Scenario Recommendation
Chat UI showing tokens as they arrive Stream
Background batch processing Non-streaming
Long-form content generation Stream
Short answers (classification, yes/no) Non-streaming
Need usage data immediately Non-streaming (some providers omit usage in streams)

Basic Streaming

import asyncio
import os
from liter_llm import LlmClient


async def main() -> None:
    """Stream a chat completion and print each token as it arrives."""
    client = LlmClient(api_key=os.environ["OPENAI_API_KEY"])
    stream = await client.chat_stream(
        model="openai/gpt-4o",
        messages=[{"role": "user", "content": "Tell me a story"}],
    )
    async for chunk in stream:
        # Chunks may arrive with no choices or a null delta; skip those.
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta.content
        if delta:
            print(delta, end="", flush=True)
    print()


asyncio.run(main())
import { LlmClient } from "@kreuzberg/liter-llm";

const client = new LlmClient({ apiKey: process.env.OPENAI_API_KEY! });
const chunks = await client.chatStream({
  model: "openai/gpt-4o",
  messages: [{ role: "user", content: "Tell me a story" }],
});

for (const chunk of chunks) {
  process.stdout.write(chunk.choices[0]?.delta?.content ?? "");
}
console.log();
package main

import (
	"context"
	"fmt"
	"os"

	llm "github.com/kreuzberg-dev/liter-llm/packages/go"
)

// Streams a chat completion and prints each token as it arrives.
func main() {
	client := llm.NewClient(llm.WithAPIKey(os.Getenv("OPENAI_API_KEY")))

	req := &llm.ChatCompletionRequest{
		Model: "openai/gpt-4o",
		Messages: []llm.Message{
			llm.NewTextMessage(llm.RoleUser, "Tell me a story"),
		},
	}

	// The callback runs once per streamed chunk.
	onChunk := func(chunk *llm.ChatCompletionChunk) error {
		if len(chunk.Choices) == 0 {
			return nil
		}
		if content := chunk.Choices[0].Delta.Content; content != nil {
			fmt.Print(*content)
		}
		return nil
	}

	if err := client.ChatStream(context.Background(), req, onChunk); err != nil {
		panic(err)
	}
	fmt.Println()
}
# frozen_string_literal: true

require "liter_llm"
require "json"

# Note: The Ruby client does not yet support streaming.
# Use the non-streaming chat method instead.
client = LiterLlm::LlmClient.new(ENV.fetch("OPENAI_API_KEY"), {})

request = JSON.generate(
  model: "openai/gpt-4o",
  messages: [{ role: "user", content: "Tell me a story" }]
)
response = JSON.parse(client.chat(request))

puts response.dig("choices", 0, "message", "content")
import dev.kreuzberg.literllm.LlmClient;
import dev.kreuzberg.literllm.Types.*;
import java.util.List;

public class Main {
    public static void main(String[] args) throws Exception {
        // Note: The Java client does not yet support streaming.
        // Use the non-streaming chat method instead.
        var request = new ChatCompletionRequest(
            "openai/gpt-4o",
            List.of(new UserMessage("Tell me a story"))
        );
        // try-with-resources closes the client when done.
        try (var client = LlmClient.builder()
                .apiKey(System.getenv("OPENAI_API_KEY"))
                .build()) {
            var response = client.chat(request);
            var message = response.choices().getFirst().message();
            System.out.println(message.content());
        }
    }
}
using LiterLlm;

// Note: The C# client does not yet support streaming.
// Use the non-streaming ChatAsync method instead.
var request = new ChatCompletionRequest(
    Model: "openai/gpt-4o",
    Messages: [new UserMessage("Tell me a story")]
);

// `await using` disposes the client asynchronously at scope exit.
await using var client = new LlmClient(
    apiKey: Environment.GetEnvironmentVariable("OPENAI_API_KEY")!);

var response = await client.ChatAsync(request);
Console.WriteLine(response.Choices[0].Message.Content);
# Note: The Elixir client does not yet support streaming.
# Use the non-streaming chat function instead.
request = %{
  model: "openai/gpt-4o",
  messages: [%{role: "user", content: "Tell me a story"}]
}

{:ok, response} = LiterLlm.chat(request, api_key: System.fetch_env!("OPENAI_API_KEY"))

first_choice = hd(response["choices"])
IO.puts(first_choice["message"]["content"])
import init, { LlmClient } from "@kreuzberg/liter-llm-wasm";

// The WASM module must be initialized before creating a client.
await init();

// Note: chatStream is not yet supported in the WASM binding.
// Use the non-streaming chat method instead.
const client = new LlmClient({ apiKey: "sk-..." });

const request = {
  model: "openai/gpt-4o",
  messages: [{ role: "user", content: "Tell me a story" }],
};
const response = await client.chat(request);

console.log(response.choices[0].message.content);

Processing Chunks

Each chunk contains a choices[].delta.content field with the incremental text. The first and last chunks may have a null content value. The final chunk carries a finish_reason — typically "stop", but it may be another value such as "length" if a max_tokens limit was reached.

Collecting the Full Response

If you need both real-time output and the complete text, accumulate deltas as you iterate:

import asyncio
import os
from liter_llm import LlmClient


async def main() -> None:
    """Echo tokens in real time while also collecting the complete text."""
    client = LlmClient(api_key=os.environ["OPENAI_API_KEY"])
    stream = await client.chat_stream(
        model="openai/gpt-4o",
        messages=[{"role": "user", "content": "Explain quantum computing briefly"}],
    )
    # Accumulate deltas in a list and join once at the end.
    parts = []
    async for chunk in stream:
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta.content
        if delta:
            parts.append(delta)
            print(delta, end="", flush=True)
    full_text = "".join(parts)
    print()
    print(f"\nFull response length: {len(full_text)} characters")


asyncio.run(main())
import { LlmClient } from "@kreuzberg/liter-llm";

const client = new LlmClient({ apiKey: process.env.OPENAI_API_KEY! });
const chunks = await client.chatStream({
  model: "openai/gpt-4o",
  messages: [{ role: "user", content: "Explain quantum computing briefly" }],
});

let fullText = "";
for (const chunk of chunks) {
  const delta = chunk.choices?.[0]?.delta?.content;
  if (delta) {
    fullText += delta;
    process.stdout.write(delta);
  }
}
console.log();
console.log(`\nFull response length: ${fullText.length} characters`);
package main

import (
	"context"
	"fmt"
	"os"
	"strings"

	llm "github.com/kreuzberg-dev/liter-llm/packages/go"
)

// Streams a chat completion, echoing tokens live while accumulating the
// complete response text in a strings.Builder.
func main() {
	client := llm.NewClient(llm.WithAPIKey(os.Getenv("OPENAI_API_KEY")))

	req := &llm.ChatCompletionRequest{
		Model: "openai/gpt-4o",
		Messages: []llm.Message{
			llm.NewTextMessage(llm.RoleUser, "Explain quantum computing briefly"),
		},
	}

	var full strings.Builder
	err := client.ChatStream(context.Background(), req, func(chunk *llm.ChatCompletionChunk) error {
		if len(chunk.Choices) == 0 {
			return nil
		}
		if content := chunk.Choices[0].Delta.Content; content != nil {
			full.WriteString(*content)
			fmt.Print(*content)
		}
		return nil
	})
	if err != nil {
		panic(err)
	}
	fmt.Println()
	fmt.Printf("\nFull response length: %d characters\n", full.Len())
}

Streaming with Parameters

All chat parameters work with chat_stream -- temperature, max_tokens, tools, and response_format are all supported:

async for chunk in await client.chat_stream(
    model="anthropic/claude-3-5-sonnet-20241022",
    messages=[
        {"role": "system", "content": "You are a creative writer."},
        {"role": "user", "content": "Write a short story"},
    ],
    temperature=0.9,
    max_tokens=500,
):
    if chunk.choices:
        delta = chunk.choices[0].delta.content
        if delta:
            print(delta, end="", flush=True)

Error Handling

Errors can occur at two points during streaming:

  1. Before any chunks -- connection failures, auth errors, invalid requests. Raised from the awaited chat_stream() call itself.
  2. During iteration -- network drops, provider errors mid-response. Raised from the stream iterator.
try:
    stream = await client.chat_stream(
        model="openai/gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
    )
    async for chunk in stream:
        if chunk.choices:
            delta = chunk.choices[0].delta.content
            if delta:
                print(delta, end="")
except Exception as e:
    print(f"Error: {e}")

Warning

A successful chat_stream() call does not guarantee a complete response. Always handle errors from the iteration loop as well.