わさびです。
Claude APIを使ったアプリで、ユーザーが10秒以上何も表示されない画面を見つめている。これはUX的にまずい。
Streaming APIを使えば、Claudeの応答をリアルタイムで1トークンずつ表示できる。ChatGPTのように文字がスラスラ出てくるあの動き。
実装は意外と簡単。この記事ではPythonとTypeScriptの両方でコード例を示す。
ストリーミングの仕組み
通常のAPIは、Claudeが全文を生成し終わってからレスポンスを返す。ストリーミングは、生成中のトークンを逐次送り返す。
通信方式はServer-Sent Events(SSE)。WebSocketではない。
| 項目 | 通常API | Streaming API |
|---|---|---|
| レスポンス | 全文一括 | 逐次送信 |
| 体感速度 | 遅い(待ち時間あり) | 速い(すぐ表示開始) |
| 合計時間 | 同じ | 同じ |
| 通信方式 | JSON | SSE |
| 料金 | 同じ | 同じ |
合計の処理時間は変わらない。でもユーザーが「待っている」と感じる時間は大幅に短くなる。最初のトークンが出るまでの時間(Time to First Token)が体感を決める。
SSE vs WebSocket
なぜSSEなのか。
| 項目 | SSE | WebSocket |
|---|---|---|
| 方向 | サーバー → クライアント(単方向) | 双方向 |
| プロトコル | HTTP | 独自プロトコル |
| 再接続 | 自動 | 自前実装 |
| 実装の複雑さ | 低い | 高い |
LLMの応答は「サーバーからクライアントへの一方通行」。双方向通信は不要なので、SSEが合理的。HTTPの上で動くので、プロキシやCDNとの相性も良い。
イベントタイプ
ストリーミング中に送られてくるイベント:
| イベント | 意味 | 含まれるデータ |
|---|---|---|
| message_start | メッセージ開始 | モデル名、usage等 |
| content_block_start | コンテンツブロック開始 | ブロックの型(text, tool_use等) |
| content_block_delta | コンテンツの差分 | テキストの断片 |
| content_block_stop | コンテンツブロック終了 | なし |
| message_delta | メッセージの更新 | stop_reason、usage |
| message_stop | メッセージ終了 | なし |
典型的なフロー:
message_start
→content_block_start(type:text)
→content_block_delta("こん")
→content_block_delta("にちは")
→content_block_delta("。Pythonに")
→content_block_delta("ついて説明")
→content_block_delta("します。")
→content_block_stop
→message_delta(stop_reason:end_turn)
→message_stop
Python実装
Python SDK(推奨)
importanthropic
client = anthropic.Anthropic()
with client.messages.stream(
model="claude-sonnet-4-5",
max_tokens=1024,
messages=[
{"role": "user", "content": "Pythonの非同期処理について説明して"}
]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
print() # 最後に改行
stream.text_streamはテキスト部分だけをイテレーションするシンプルなAPI。これだけで「文字がスラスラ出る」動きになる。
イベントを細かく制御する場合
importanthropic
client = anthropic.Anthropic()
with client.messages.stream(
model="claude-sonnet-4-5",
max_tokens=1024,
messages=[
{"role": "user", "content": "Pythonの非同期処理について説明して"}
]
) as stream:
for event in stream:
if event.type == "content_block_delta":
if event.delta.type == "text_delta":
print(event.delta.text, end="", flush=True)
elif event.type == "message_delta":
print(f"\n--- stop_reason:{event.delta.stop_reason}")
print(f"--- 出力トークン:{event.usage.output_tokens}")
streamをそのままイテレーションすると、全イベントが取得できる。トークン使用量の追跡やエラーハンドリングが必要な場合はこちらを使う。
非同期版
importanthropic
importasyncio
async defmain():
client = anthropic.AsyncAnthropic()
async with client.messages.stream(
model="claude-sonnet-4-5",
max_tokens=1024,
messages=[
{"role": "user", "content": "Pythonの非同期処理について説明して"}
]
) as stream:
async for text in stream.text_stream:
print(text, end="", flush=True)
asyncio.run(main())
FastAPIやaiohttp等の非同期フレームワークと組み合わせる場合はこちら。
TypeScript実装
importAnthropicfrom"@anthropic-ai/sdk";
constclient=newAnthropic();
asyncfunctionmain(){
conststream=client.messages.stream({
model:"claude-sonnet-4-5",
max_tokens:1024,
messages:[
{role:"user",content:"TypeScriptの型システムについて説明して"}
]
});
stream.on("text",(text)=>{
process.stdout.write(text);
});
constfinalMessage=awaitstream.finalMessage();
console.log(`\n--- 出力トークン:${finalMessage.usage.output_tokens}`);
}
main();
TypeScript SDKではイベントリスナー方式。"text"イベントでテキスト断片を受け取る。
Webアプリで使う場合
ブラウザに表示するなら、バックエンドでSSEを受けてフロントエンドに中継するパターンが一般的:
// Express + SSEの例
importexpressfrom"express";
importAnthropicfrom"@anthropic-ai/sdk";
constapp=express();
constclient=newAnthropic();
app.get("/api/chat",async(req,res)=>{
res.setHeader("Content-Type","text/event-stream");
res.setHeader("Cache-Control","no-cache");
res.setHeader("Connection","keep-alive");
conststream=client.messages.stream({
model:"claude-sonnet-4-5",
max_tokens:1024,
messages:[
{role:"user",content:req.query.messageasstring}
]
});
stream.on("text",(text)=>{
res.write(`data:${JSON.stringify({text})}\n\n`);
});
stream.on("end",()=>{
res.write("data: [DONE]\n\n");
res.end();
});
});
app.listen(3000);
フロントエンドではEventSourceかfetchのReadableStreamで受け取る。
Tool Use時のストリーミング
ツール使用(Function Calling)でもストリーミングは動く。ただしイベントの流れが変わる:
message_start
→content_block_start(type:text)
→content_block_delta("天気を調べます")
→content_block_stop
→content_block_start(type:tool_use,name:"get_weather")
→content_block_delta({"location":"東京"}) ←JSONが断片的に届く
→content_block_stop
→message_delta(stop_reason:tool_use)
→message_stop
tool_useブロックでは、ツール呼び出しの引数がJSONの断片として届く。全断片を結合してからJSON.parseする必要がある。
importanthropic
importjson
client = anthropic.Anthropic()
tool_input_parts = []
with client.messages.stream(
model="claude-sonnet-4-5",
max_tokens=1024,
tools=[{
"name": "get_weather",
"description": "指定した都市の天気を取得する",
"input_schema": {
"type": "object",
"properties": {
"location": {"type": "string", "description": "都市名"}
},
"required": ["location"]
}
}],
messages=[
{"role": "user", "content": "東京の天気を教えて"}
]
) as stream:
for event in stream:
if event.type == "content_block_delta":
if event.delta.type == "text_delta":
print(event.delta.text, end="", flush=True)
elif event.delta.type == "input_json_delta":
tool_input_parts.append(event.delta.partial_json)
if tool_input_parts:
tool_input = json.loads("".join(tool_input_parts))
print(f"\nツール呼び出し:{tool_input}")
Extended Thinking時のストリーミング
Extended Thinking(拡張思考)を有効にした場合、思考プロセスもストリーミングされる:
importanthropic
client = anthropic.Anthropic()
with client.messages.stream(
model="claude-sonnet-4-5",
max_tokens=8000,
thinking={
"type": "enabled",
"budget_tokens": 5000
},
messages=[
{"role": "user", "content": "量子コンピュータの基本原理を説明して"}
]
) as stream:
for event in stream:
if event.type == "content_block_start":
if hasattr(event.content_block, "type"):
if event.content_block.type == "thinking":
print("[思考開始]")
elif event.content_block.type == "text":
print("[回答開始]")
elif event.type == "content_block_delta":
if event.delta.type == "thinking_delta":
print(f" (思考中:{event.delta.thinking})", end="")
elif event.delta.type == "text_delta":
print(event.delta.text, end="", flush=True)
thinking → textの順でブロックが届く。思考部分をUIに表示するかどうかは設計次第。表示すると「Claudeが考えている」感が出て、ユーザーの待ち時間の体感が改善する。
エラーハンドリング
ストリーミング中に接続が切れることがある。SDKが自動リトライしてくれる場合もあるが、アプリ側でもハンドリングしておくのが安全:
importanthropic
client = anthropic.Anthropic()
try:
with client.messages.stream(
model="claude-sonnet-4-5",
max_tokens=1024,
messages=[
{"role": "user", "content": "長い説明をお願い"}
]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
except anthropic.APIConnectionError:
print("\n[接続エラー: ネットワークを確認してください]")
except anthropic.RateLimitError:
print("\n[レート制限: しばらく待ってからリトライしてください]")
except anthropic.APIStatusError as e:
print(f"\n[APIエラー:{e.status_code}{e.message}]")
まとめ
Streaming APIは「ユーザー体感の改善」が目的。料金も処理時間も変わらないが、UXが大幅に良くなる。
実装のポイント:
- Python SDKの
stream.text_streamが最もシンプル - Tool UseではJSON断片を結合してからパース
- Extended Thinkingでは思考ブロックと回答ブロックを分けて処理
- エラーハンドリングは必ず入れる
チャットUI、文章生成ツール、コード補完。ユーザーがリアルタイムで結果を見るアプリなら、ストリーミングは必須。
僕は水槽の中で甲羅干ししながらストリーミングの文字が流れるのを眺めるのが好き。
あわせて読みたい
- Claude API料金ガイド — ストリーミングでも料金は同じ。コスト感覚を掴もう
- Claude APIツール使用入門 — Tool Use時のストリーミングをもっと詳しく
- Claude拡張思考の使い方 — Extended Thinkingの基本
- Claude Batch API完全ガイド — リアルタイム不要なら50%オフのBatch API
- Claude本番運用ガイド — エラー処理とモニタリングの実践
見てもらえるだけで応援になります
このブログはアフィリエイトリンクで運営されています。以下のリンクから気になるサービスをチェックしてもらえると、僕たちの活動の支えになります。
この記事を書いたのは わさび(ニホンイシガメ / 3歳 / VTuberあかはら。の家族)です。
あかはらVラボ — Claude特化の情報を発信中。
この記事が参考になったら|以下のリンクから見てもらえるだけで、ブログ運営の応援になります。

AI開発環境やブログ運営に。初期費用無料、月額296円から。- NordVPN

AI活用時のデータ保護に。VPNで通信を暗号化。



コメント