Streaming Guide¶
Streaming delivers tokens incrementally as they are generated, rather than waiting for the full response. Use it for real-time UIs, long responses, or when time-to-first-token matters.
When to Use Streaming¶
| Scenario | Recommendation |
|---|---|
| Chat UI showing tokens as they arrive | Stream |
| Background batch processing | Non-streaming |
| Long-form content generation | Stream |
| Short answers (classification, yes/no) | Non-streaming |
| Need usage data immediately | Non-streaming (some providers omit usage in streams) |
Basic Streaming¶
import asyncio
import os
from liter_llm import LlmClient


async def main() -> None:
    """Stream a chat completion and echo each token as it arrives."""
    client = LlmClient(api_key=os.environ["OPENAI_API_KEY"])
    stream = await client.chat_stream(
        model="openai/gpt-4o",
        messages=[{"role": "user", "content": "Tell me a story"}],
    )
    async for chunk in stream:
        # Some chunks (first/last) may carry no delta text.
        choices = chunk.choices
        if choices and choices[0].delta.content:
            print(choices[0].delta.content, end="", flush=True)
    print()  # terminate the streamed line


asyncio.run(main())
import { LlmClient } from "@kreuzberg/liter-llm";
const client = new LlmClient({ apiKey: process.env.OPENAI_API_KEY! });
const chunks = await client.chatStream({
model: "openai/gpt-4o",
messages: [{ role: "user", content: "Tell me a story" }],
});
for (const chunk of chunks) {
process.stdout.write(chunk.choices[0]?.delta?.content ?? "");
}
console.log();
package main

import (
	"context"
	"fmt"
	"os"

	llm "github.com/kreuzberg-dev/liter-llm/packages/go"
)

func main() {
	client := llm.NewClient(llm.WithAPIKey(os.Getenv("OPENAI_API_KEY")))

	req := &llm.ChatCompletionRequest{
		Model: "openai/gpt-4o",
		Messages: []llm.Message{
			llm.NewTextMessage(llm.RoleUser, "Tell me a story"),
		},
	}

	// onChunk is invoked once per streamed chunk; print any delta text.
	onChunk := func(chunk *llm.ChatCompletionChunk) error {
		if len(chunk.Choices) == 0 {
			return nil
		}
		if content := chunk.Choices[0].Delta.Content; content != nil {
			fmt.Print(*content)
		}
		return nil
	}

	if err := client.ChatStream(context.Background(), req, onChunk); err != nil {
		panic(err)
	}
	fmt.Println()
}
# frozen_string_literal: true

require "liter_llm"
require "json"

# Note: The Ruby client does not yet support streaming.
# Use the non-streaming chat method instead.
client = LiterLlm::LlmClient.new(ENV.fetch("OPENAI_API_KEY"), {})

request = JSON.generate(
  model: "openai/gpt-4o",
  messages: [{ role: "user", content: "Tell me a story" }]
)
raw = client.chat(request)
response = JSON.parse(raw)

puts response.dig("choices", 0, "message", "content")
import dev.kreuzberg.literllm.LlmClient;
import dev.kreuzberg.literllm.Types.*;

import java.util.List;

public class Main {
    // Note: The Java client does not yet support streaming.
    // Use the non-streaming chat method instead.
    public static void main(String[] args) throws Exception {
        try (var client = LlmClient.builder()
                .apiKey(System.getenv("OPENAI_API_KEY"))
                .build()) {
            var request = new ChatCompletionRequest(
                "openai/gpt-4o",
                List.of(new UserMessage("Tell me a story"))
            );
            var response = client.chat(request);
            var content = response.choices().getFirst().message().content();
            System.out.println(content);
        }
    }
}
using LiterLlm;

// Note: The C# client does not yet support streaming.
// Use the non-streaming ChatAsync method instead.
var apiKey = Environment.GetEnvironmentVariable("OPENAI_API_KEY")!;
await using var client = new LlmClient(apiKey: apiKey);

var request = new ChatCompletionRequest(
    Model: "openai/gpt-4o",
    Messages: [new UserMessage("Tell me a story")]
);
var response = await client.ChatAsync(request);
Console.WriteLine(response.Choices[0].Message.Content);
# Note: The Elixir client does not yet support streaming.
# Use the non-streaming chat function instead.
request = %{
  model: "openai/gpt-4o",
  messages: [%{role: "user", content: "Tell me a story"}]
}

{:ok, response} =
  LiterLlm.chat(request, api_key: System.fetch_env!("OPENAI_API_KEY"))

response["choices"]
|> hd()
|> get_in(["message", "content"])
|> IO.puts()
import init, { LlmClient } from "@kreuzberg/liter-llm-wasm";

// The WASM module must be initialized before creating a client.
await init();

// Note: chatStream is not yet supported in the WASM binding.
// Use the non-streaming chat method instead.
const client = new LlmClient({ apiKey: "sk-..." });
const request = {
  model: "openai/gpt-4o",
  messages: [{ role: "user", content: "Tell me a story" }],
};
const response = await client.chat(request);
console.log(response.choices[0].message.content);
Processing Chunks¶
Each chunk contains a choices[].delta.content field with the incremental text. The first and last chunks may have a null content value. The final chunk carries a finish_reason (for example "stop" when the model completed naturally).
Collecting the Full Response¶
If you need both real-time output and the complete text, accumulate deltas as you iterate:
import asyncio
import os
from liter_llm import LlmClient


async def main() -> None:
    """Echo tokens as they stream while accumulating the full reply."""
    client = LlmClient(api_key=os.environ["OPENAI_API_KEY"])
    pieces: list[str] = []
    stream = await client.chat_stream(
        model="openai/gpt-4o",
        messages=[{"role": "user", "content": "Explain quantum computing briefly"}],
    )
    async for chunk in stream:
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta.content
        if delta:
            pieces.append(delta)
            print(delta, end="", flush=True)
    full_text = "".join(pieces)
    print()
    print(f"\nFull response length: {len(full_text)} characters")


asyncio.run(main())
import { LlmClient } from "@kreuzberg/liter-llm";
const client = new LlmClient({ apiKey: process.env.OPENAI_API_KEY! });
const chunks = await client.chatStream({
model: "openai/gpt-4o",
messages: [{ role: "user", content: "Explain quantum computing briefly" }],
});
let fullText = "";
for (const chunk of chunks) {
const delta = chunk.choices?.[0]?.delta?.content;
if (delta) {
fullText += delta;
process.stdout.write(delta);
}
}
console.log();
console.log(`\nFull response length: ${fullText.length} characters`);
package main

import (
	"context"
	"fmt"
	"os"
	"strings"

	llm "github.com/kreuzberg-dev/liter-llm/packages/go"
)

func main() {
	client := llm.NewClient(llm.WithAPIKey(os.Getenv("OPENAI_API_KEY")))

	req := &llm.ChatCompletionRequest{
		Model: "openai/gpt-4o",
		Messages: []llm.Message{
			llm.NewTextMessage(llm.RoleUser, "Explain quantum computing briefly"),
		},
	}

	// Accumulate the complete reply while echoing each delta to stdout.
	var full strings.Builder
	handle := func(chunk *llm.ChatCompletionChunk) error {
		if len(chunk.Choices) == 0 {
			return nil
		}
		if content := chunk.Choices[0].Delta.Content; content != nil {
			full.WriteString(*content)
			fmt.Print(*content)
		}
		return nil
	}

	if err := client.ChatStream(context.Background(), req, handle); err != nil {
		panic(err)
	}
	fmt.Println()
	fmt.Printf("\nFull response length: %d characters\n", full.Len())
}
Streaming with Parameters¶
All chat parameters work with chat_stream -- temperature, max_tokens, tools, and response_format are all supported:
async for chunk in await client.chat_stream(
model="anthropic/claude-3-5-sonnet-20241022",
messages=[
{"role": "system", "content": "You are a creative writer."},
{"role": "user", "content": "Write a short story"},
],
temperature=0.9,
max_tokens=500,
):
if chunk.choices:
delta = chunk.choices[0].delta.content
if delta:
print(delta, end="", flush=True)
Error Handling¶
Errors can occur at two points during streaming:
- Before any chunks -- connection failures, auth errors, invalid requests. Raised when calling chat_stream().
- During iteration -- network drops, provider errors mid-response. Raised from the stream iterator.
try:
stream = await client.chat_stream(
model="openai/gpt-4o",
messages=[{"role": "user", "content": "Hello"}],
)
async for chunk in stream:
if chunk.choices:
delta = chunk.choices[0].delta.content
if delta:
print(delta, end="")
except Exception as e:
print(f"Error: {e}")
Warning
A successful chat_stream() call does not guarantee a complete response. Always handle errors from the iteration loop as well.