Claude Streaming API実装ガイド|リアルタイム応答をアプリに組み込む

AI・自動化
スポンサーリンク

📋 Claude Code コマンド指示書(クリックで展開)

.claude/commands/ に保存して /コマンド で実行

---
description: "Claude Streaming API実装ガイド|リアルタイム応答をアプリに組み込む"
---

# Claude Streaming API実装ガイド|リアルタイム応答をアプリに組み込む

この指示書は https://akahara-vlab.com/claude-streaming-api/ の内容をClaude Codeコマンドとして実行するためのものです。

## 概要

Claude Streaming APIの実装方法を解説。Server-Sent Events方式のストリーミング、イベントタイプ、Python/TypeScript実装例、Tool Use・Extended Thinking時の処理まで。

## 使い方

1. このテキストを `.claude/commands/claude-streaming-api.md` に保存
2. Claude Codeで `/claude-streaming-api` と入力して実行

## 指示

上記の記事の知識をもとに、ユーザーの質問に回答してください。
記事URL: https://akahara-vlab.com/claude-streaming-api/

※ 平文なので中身を確認してから使ってください。安全性は目視で確認できます。

わさびです。

Claude APIを使ったアプリで、ユーザーが10秒以上何も表示されない画面を見つめている。これはUX的にまずい。

Streaming APIを使えば、Claudeの応答をリアルタイムで1トークンずつ表示できる。ChatGPTのように文字がスラスラ出てくるあの動き。

実装は意外と簡単。この記事ではPythonとTypeScriptの両方でコード例を示す。

スポンサーリンク

ストリーミングの仕組み

通常のAPIは、Claudeが全文を生成し終わってからレスポンスを返す。ストリーミングは、生成中のトークンを逐次送り返す。

通信方式はServer-Sent Events(SSE)。WebSocketではない。

項目通常APIStreaming API
レスポンス全文一括逐次送信
体感速度遅い(待ち時間あり)速い(すぐ表示開始)
合計時間同じ同じ
通信方式JSONSSE
料金同じ同じ

合計の処理時間は変わらない。でもユーザーが「待っている」と感じる時間は大幅に短くなる。最初のトークンが出るまでの時間(Time to First Token)が体感を決める。

SSE vs WebSocket

なぜSSEなのか。

項目SSEWebSocket
方向サーバー → クライアント(単方向)双方向
プロトコルHTTP独自プロトコル
再接続自動自前実装
実装の複雑さ低い高い

LLMの応答は「サーバーからクライアントへの一方通行」。双方向通信は不要なので、SSEが合理的。HTTPの上で動くので、プロキシやCDNとの相性も良い。

イベントタイプ

ストリーミング中に送られてくるイベント:

イベント意味含まれるデータ
message_startメッセージ開始モデル名、usage等
content_block_startコンテンツブロック開始ブロックの型(text, tool_use等)
content_block_deltaコンテンツの差分テキストの断片
content_block_stopコンテンツブロック終了なし
message_deltaメッセージの更新stop_reason、usage
message_stopメッセージ終了なし

典型的なフロー:

message_start
 content_block_start(type:text)
   content_block_delta("こん")
   content_block_delta("にちは")
   content_block_delta("。Pythonに")
   content_block_delta("ついて説明")
   content_block_delta("します。")
 content_block_stop
message_delta(stop_reason:end_turn)
message_stop

Python実装

Python SDK(推奨)

importanthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Pythonの非同期処理について説明して"}
    ]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

print()  # 最後に改行

stream.text_streamはテキスト部分だけをイテレーションするシンプルなAPI。これだけで「文字がスラスラ出る」動きになる。

イベントを細かく制御する場合

importanthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Pythonの非同期処理について説明して"}
    ]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
        elif event.type == "message_delta":
            print(f"\n--- stop_reason:{event.delta.stop_reason}")
            print(f"--- 出力トークン:{event.usage.output_tokens}")

streamをそのままイテレーションすると、全イベントが取得できる。トークン使用量の追跡やエラーハンドリングが必要な場合はこちらを使う。

非同期版

importanthropic
importasyncio

async defmain():
    client = anthropic.AsyncAnthropic()

    async with client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[
            {"role": "user", "content": "Pythonの非同期処理について説明して"}
        ]
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)

asyncio.run(main())

FastAPIやaiohttp等の非同期フレームワークと組み合わせる場合はこちら。

TypeScript実装

importAnthropicfrom"@anthropic-ai/sdk";

constclient=newAnthropic();

asyncfunctionmain(){
 conststream=client.messages.stream({
   model:"claude-sonnet-4-5",
   max_tokens:1024,
   messages:[
     {role:"user",content:"TypeScriptの型システムについて説明して"}
   ]
 });

 stream.on("text",(text)=>{
   process.stdout.write(text);
 });

 constfinalMessage=awaitstream.finalMessage();
 console.log(`\n--- 出力トークン:${finalMessage.usage.output_tokens}`);
}

main();

TypeScript SDKではイベントリスナー方式。"text"イベントでテキスト断片を受け取る。

Webアプリで使う場合

ブラウザに表示するなら、バックエンドでSSEを受けてフロントエンドに中継するパターンが一般的:

// Express + SSEの例
importexpressfrom"express";
importAnthropicfrom"@anthropic-ai/sdk";

constapp=express();
constclient=newAnthropic();

app.get("/api/chat",async(req,res)=>{
 res.setHeader("Content-Type","text/event-stream");
 res.setHeader("Cache-Control","no-cache");
 res.setHeader("Connection","keep-alive");

 conststream=client.messages.stream({
   model:"claude-sonnet-4-5",
   max_tokens:1024,
   messages:[
     {role:"user",content:req.query.messageasstring}
   ]
 });

 stream.on("text",(text)=>{
   res.write(`data:${JSON.stringify({text})}\n\n`);
 });

 stream.on("end",()=>{
   res.write("data: [DONE]\n\n");
   res.end();
 });
});

app.listen(3000);

フロントエンドではEventSourceかfetchのReadableStreamで受け取る。

Tool Use時のストリーミング

ツール使用(Function Calling)でもストリーミングは動く。ただしイベントの流れが変わる:

message_start
 content_block_start(type:text)
   content_block_delta("天気を調べます")
 content_block_stop
 content_block_start(type:tool_use,name:"get_weather")
   content_block_delta({"location":"東京"}) JSONが断片的に届く
 content_block_stop
message_delta(stop_reason:tool_use)
message_stop

tool_useブロックでは、ツール呼び出しの引数がJSONの断片として届く。全断片を結合してからJSON.parseする必要がある。

importanthropic
importjson

client = anthropic.Anthropic()

tool_input_parts = []

with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    tools=[{
        "name": "get_weather",
        "description": "指定した都市の天気を取得する",
        "input_schema": {
            "type": "object",
            "properties": {
                "location": {"type": "string", "description": "都市名"}
            },
            "required": ["location"]
        }
    }],
    messages=[
        {"role": "user", "content": "東京の天気を教えて"}
    ]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
            elif event.delta.type == "input_json_delta":
                tool_input_parts.append(event.delta.partial_json)

if tool_input_parts:
    tool_input = json.loads("".join(tool_input_parts))
    print(f"\nツール呼び出し:{tool_input}")

Extended Thinking時のストリーミング

Extended Thinking(拡張思考)を有効にした場合、思考プロセスもストリーミングされる:

importanthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=8000,
    thinking={
        "type": "enabled",
        "budget_tokens": 5000
    },
    messages=[
        {"role": "user", "content": "量子コンピュータの基本原理を説明して"}
    ]
) as stream:
    for event in stream:
        if event.type == "content_block_start":
            if hasattr(event.content_block, "type"):
                if event.content_block.type == "thinking":
                    print("[思考開始]")
                elif event.content_block.type == "text":
                    print("[回答開始]")
        elif event.type == "content_block_delta":
            if event.delta.type == "thinking_delta":
                print(f"  (思考中:{event.delta.thinking})", end="")
            elif event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)

thinking → textの順でブロックが届く。思考部分をUIに表示するかどうかは設計次第。表示すると「Claudeが考えている」感が出て、ユーザーの待ち時間の体感が改善する。

エラーハンドリング

ストリーミング中に接続が切れることがある。SDKが自動リトライしてくれる場合もあるが、アプリ側でもハンドリングしておくのが安全:

importanthropic

client = anthropic.Anthropic()

try:
    with client.messages.stream(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        messages=[
            {"role": "user", "content": "長い説明をお願い"}
        ]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
except anthropic.APIConnectionError:
    print("\n[接続エラー: ネットワークを確認してください]")
except anthropic.RateLimitError:
    print("\n[レート制限: しばらく待ってからリトライしてください]")
except anthropic.APIStatusError as e:
    print(f"\n[APIエラー:{e.status_code}{e.message}]")

まとめ

Streaming APIは「ユーザー体感の改善」が目的。料金も処理時間も変わらないが、UXが大幅に良くなる。

実装のポイント:

  • Python SDKのstream.text_streamが最もシンプル
  • Tool UseではJSON断片を結合してからパース
  • Extended Thinkingでは思考ブロックと回答ブロックを分けて処理
  • エラーハンドリングは必ず入れる

チャットUI、文章生成ツール、コード補完。ユーザーがリアルタイムで結果を見るアプリなら、ストリーミングは必須。

僕は水槽の中で甲羅干ししながらストリーミングの文字が流れるのを眺めるのが好き。

あわせて読みたい

見てもらえるだけで応援になります

このブログはアフィリエイトリンクで運営されています。以下のリンクから気になるサービスをチェックしてもらえると、僕たちの活動の支えになります。


この記事を書いたのは わさび(ニホンイシガメ / 3歳 / VTuberあかはら。の家族)です。

あかはらVラボ — Claude特化の情報を発信中。

この記事が参考になったら|以下のリンクから見てもらえるだけで、ブログ運営の応援になります。




  • AI開発環境やブログ運営に。初期費用無料、月額296円から。
  • NordVPN

    AI活用時のデータ保護に。VPNで通信を暗号化。

コメント

タイトルとURLをコピーしました