Build the Streaming Function
Exit
Build the Streaming Function
Replace the blocking generate_answer with stream_answer that prints tokens as they arrive
💻
Writing code and entering commands is only available on desktop. Open this page on a larger screen to complete this chapter.
Printing tokens without buffering
The stream_answer function receives chunks from the API as the model generates them. Two arguments to print ensure each chunk appears immediately:
- end="" — replaces the default newline so each chunk runs onto the same line as the previous one.
- flush=True — forces Python to write the chunk to the terminal immediately. Without it, Python's output buffer may hold several chunks before displaying them, which defeats the purpose of streaming.
After the loop finishes, a bare print() moves the cursor to the next line so the next prompt appears cleanly below the response.
The function also accumulates each chunk into full_text and returns it. History and prompt-building still need the complete response string, not individual chunks.
Instructions
- Define a function called stream_answer that takes client and prompt as arguments.
- Inside stream_answer, create a variable called full_text and assign it an empty string "".
- Write a for loop: for chunk in client.models.generate_content_stream(model="gemini-2.5-flash", contents=prompt):. Use chunk as the loop variable.
- Inside the loop, add if chunk.text: to skip empty chunks. Inside that block:
  - Add print(chunk.text, end="", flush=True) — end="" keeps the cursor on the same line so tokens run together, and flush=True forces the output to appear immediately rather than waiting in a buffer.
  - Add full_text += chunk.text to accumulate the complete response text.
- After the loop (back at the function's indentation level), call print() with no arguments — this moves the cursor to the next line after the stream finishes.
- Return full_text.
import json
import os
import sys
import time
import numpy as np
from dotenv import load_dotenv
from google import genai
from google.genai import types
from files import index_folder
def create_client():
    """Load environment config and return an authenticated Gemini client."""
    load_dotenv()
    return genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
def embed_text(client, text):
    """Embed a single document string and return its embedding vector."""
    config = types.EmbedContentConfig(task_type="RETRIEVAL_DOCUMENT")
    response = client.models.embed_content(
        model="gemini-embedding-001",
        contents=text,
        config=config,
    )
    return response.embeddings[0].values
def embed_all_chunks(client, texts):
    """Embed every text, pausing between batches to respect API rate limits."""
    BATCH_SIZE = 90
    total = len(texts)
    vectors = []
    for start in range(0, total, BATCH_SIZE):
        for item in texts[start : start + BATCH_SIZE]:
            vectors.append(embed_text(client, item))
        # Only sleep when another batch is still coming.
        if start + BATCH_SIZE < total:
            print("Rate limit pause — waiting 60 seconds...")
            time.sleep(60)
    return vectors
def cosine_similarity(vec_a, vec_b):
    """Return the cosine of the angle between two vectors."""
    denominator = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
    return np.dot(vec_a, vec_b) / denominator
def search(client, query, chunks, embeddings, top_k=3):
    """Return the top_k chunks whose embeddings are most similar to the query."""
    config = types.EmbedContentConfig(task_type="RETRIEVAL_QUERY")
    response = client.models.embed_content(
        model="gemini-embedding-001", contents=query, config=config
    )
    query_vector = response.embeddings[0].values
    # Rank chunk/embedding pairs by similarity, best first.
    ranked = sorted(
        zip(embeddings, chunks),
        key=lambda pair: cosine_similarity(query_vector, pair[0]),
        reverse=True,
    )
    return [chunk for _, chunk in ranked[:top_k]]
def build_prompt(question, context_chunks, history=None, file_list=None):
    """Assemble the grounded prompt: file list, instructions, context, history, question."""
    context = "\n\n".join(c["text"] for c in context_chunks)
    history_text = format_history(history or [])
    if file_list:
        files_line = f"You have access to these files: {', '.join(file_list)}\n"
    else:
        files_line = ""
    parts = [
        files_line,
        "You are a helpful assistant. Answer the question using only the context below.\n",
        "If the answer is not in the context, say \"I don't know.\"\n\n",
        f"Context:\n{context}",
        f"{history_text}\n\n",
        f"Question:\n{question}",
    ]
    return "".join(parts)
def generate_answer(client, prompt):
    """Single blocking generation call; returns the full response text."""
    response = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt,
    )
    return response.text
def stream_answer(client, prompt):
    """Stream a model response, printing tokens as they arrive.

    Replaces the blocking generate_answer pattern: chunks are printed the
    moment the API yields them, and the complete response is accumulated
    and returned because history/prompt-building need the full string.
    """
    full_text = ""
    for chunk in client.models.generate_content_stream(
        model="gemini-2.5-flash", contents=prompt
    ):
        if chunk.text:  # skip empty chunks
            # end="" keeps tokens on one line; flush=True defeats buffering.
            print(chunk.text, end="", flush=True)
            full_text += chunk.text
    print()  # move the cursor to the next line after the stream finishes
    return full_text
def save_embeddings(chunks, embeddings, cache_path):
    """Persist chunks and their embeddings as JSON at cache_path."""
    payload = {"chunks": chunks, "embeddings": embeddings}
    with open(cache_path, "w") as handle:
        json.dump(payload, handle)
def load_embeddings(cache_path):
    """Return (chunks, embeddings) from the cache file, or None if absent."""
    if not os.path.exists(cache_path):
        return None
    with open(cache_path) as handle:
        cached = json.load(handle)
    return cached["chunks"], cached["embeddings"]
def format_history(messages):
    """Render prior turns as a labeled transcript; empty string when no history."""
    if not messages:
        return ""
    rendered = [
        f"{'You' if m['role'] == 'user' else 'Assistant'}: {m['content']}"
        for m in messages
    ]
    return "\n".join(["\nConversation so far:", *rendered])
def chat_loop(client, chunks, embeddings):
    """Interactive REPL: retrieve context, answer, and record each turn."""
    sources = sorted({c["source"] for c in chunks})
    history = []
    print("Assistant ready. Type your question, or /help for commands.\n")
    while True:
        user_input = input("You: ").strip()
        if not user_input:
            continue
        relevant = search(client, user_input, chunks, embeddings)
        prompt = build_prompt(user_input, relevant, history, sources)
        reply = generate_answer(client, prompt)
        print(f"Assistant: {reply}")
        history.append({"role": "user", "content": user_input})
        history.append({"role": "assistant", "content": reply})
def main():
    """CLI entry: index (or load the cache for) a folder, then start chatting."""
    if len(sys.argv) < 2:
        print("Usage: python app.py <folder>")
        sys.exit(1)
    folder = sys.argv[1]
    cache_path = folder.rstrip("/\\") + ".cache.json"
    client = create_client()
    cached = load_embeddings(cache_path)
    if cached is None:
        # No cache yet: chunk the folder, embed everything, and save.
        print(f"Indexing {folder}...")
        chunks = index_folder(folder)
        texts = [c["text"] for c in chunks]
        file_count = len({c["source"] for c in chunks})
        print(f"Indexed {len(chunks)} chunks from {file_count} files.")
        embeddings = embed_all_chunks(client, texts)
        save_embeddings(chunks, embeddings, cache_path)
        print(f"Cache saved to {cache_path}")
    else:
        chunks, embeddings = cached
        print(f"Loaded cache from {cache_path}")
    chat_loop(client, chunks, embeddings)
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
Interactive Code Editor
Sign in to write and run code, track your progress, and unlock all chapters.
Sign In to Start Coding