Handle /quit and /files
Add slash command detection to chat_loop with /quit to exit and /files to list indexed files
Writing code and entering commands is only available on desktop. Open this page on a larger screen to complete this chapter.
Giving users control over the loop
Right now the only way to exit the assistant is to press Ctrl+C. There is no way to see which files were indexed without looking at the startup output. Slash commands fix both problems.
A slash command is any input that starts with /. Before routing user input to the search pipeline, the loop checks whether the input starts with /. If it does, the command is handled and the loop continues — the AI pipeline is skipped entirely.
You'll add two commands in this chapter:
- /quit: prints "Goodbye!" and breaks out of the loop, ending the program cleanly
- /files: lists every file that was indexed, one per line
An else clause catches any unrecognized command and tells the user to type /help.
Instructions
- After `if not question: continue`, add `if question.startswith("/"):`. All slash command handling goes inside this block.
- Inside the slash command block:
  - Add `parts = question.split(maxsplit=1)` — this splits the input into at most two parts, separating the command from any arguments that follow it.
  - Add `command = parts[0]` to store just the command name.
- Add `if command == "/quit":`, then `print("Goodbye!")`, then `break`.
- Add `elif command == "/files":`. Inside it:
  - Add `print("Indexed files:")`.
  - Add `for name in file_list:`.
  - Inside the loop, add `print(f" {name}")`.
- Add `else:` with `print(f"Unknown command: {command}")`, then `print("Type /help to see available commands.")`.
- After the entire `if`/`elif`/`else` block (still inside the `if question.startswith("/"):` block), add `continue`. This skips the search and prompt-building code below — slash commands don't send anything to the AI.
import json
import os
import sys
import time
import numpy as np
from dotenv import load_dotenv
from google import genai
from google.genai import types
from files import index_folder
def create_client():
    """Create the Gemini API client used by the rest of the program.

    Calls ``load_dotenv()`` first, then reads ``GEMINI_API_KEY`` from the
    environment and hands it to ``genai.Client``. If the variable is not
    set, ``None`` is passed through unchanged.

    Returns:
        The ``genai.Client`` instance.
    """
    load_dotenv()
    return genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
def embed_text(client, text):
    """Embed a single piece of document text.

    Args:
        client: Object exposing ``models.embed_content`` (the Gemini client).
        text: The text to embed.

    Returns:
        The ``values`` of the first embedding in the API response.
    """
    response = client.models.embed_content(
        model="gemini-embedding-001",
        contents=text,
        # Document-side task type; queries are embedded separately in search().
        config=types.EmbedContentConfig(task_type="RETRIEVAL_DOCUMENT"),
    )
    first_embedding = response.embeddings[0]
    return first_embedding.values
def embed_all_chunks(client, texts):
    """Embed every text, pausing between batches of 90 requests.

    Args:
        client: Gemini client forwarded to ``embed_text``.
        texts: Sequence of strings to embed.

    Returns:
        A list of embeddings in the same order as ``texts``.
    """
    batch_size = 90
    total = len(texts)
    vectors = []
    start = 0
    while start < total:
        stop = start + batch_size
        vectors.extend(embed_text(client, text) for text in texts[start:stop])
        # Only pause when another batch follows; no wait after the last one.
        if stop < total:
            print("Rate limit pause — waiting 60 seconds...")
            time.sleep(60)
        start = stop
    return vectors
def cosine_similarity(vec_a, vec_b):
    """Return the cosine similarity of two vectors.

    Args:
        vec_a: First vector (any sequence accepted by ``np.dot``).
        vec_b: Second vector, same length as ``vec_a``.

    Returns:
        ``dot(a, b) / (|a| * |b|)``. If either vector has zero length the
        similarity is undefined, so ``0.0`` is returned instead of dividing
        by zero (which would yield NaN and make ranking by score unreliable).
    """
    dot = np.dot(vec_a, vec_b)
    norm = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
    if norm == 0:
        # A zero vector has no direction; treat it as "not similar".
        return 0.0
    return dot / norm
def search(client, query, chunks, embeddings, top_k=3):
    """Return the ``top_k`` chunks most similar to ``query``.

    Args:
        client: Object exposing ``models.embed_content`` (the Gemini client).
        query: The user's question.
        chunks: Indexed chunks, paired positionally with ``embeddings``.
        embeddings: One embedding per chunk.
        top_k: Maximum number of chunks to return.

    Returns:
        Chunks ordered from highest to lowest cosine similarity; ties keep
        their original relative order.
    """
    response = client.models.embed_content(
        model="gemini-embedding-001",
        contents=query,
        # Query-side task type, as opposed to RETRIEVAL_DOCUMENT for chunks.
        config=types.EmbedContentConfig(task_type="RETRIEVAL_QUERY"),
    )
    query_vector = response.embeddings[0].values
    ranked = sorted(
        (
            (cosine_similarity(query_vector, emb), chunk)
            for emb, chunk in zip(embeddings, chunks)
        ),
        # Sort on the score only, so chunks themselves are never compared.
        key=lambda pair: pair[0],
        reverse=True,
    )
    return [chunk for _, chunk in ranked[:top_k]]
def build_prompt(question, context_chunks, history=None, file_list=None):
    """Assemble the full prompt sent to the model.

    Args:
        question: The user's current question.
        context_chunks: Chunks (dicts with a ``"text"`` key) used as context.
        history: Optional list of earlier messages, rendered by
            ``format_history``.
        file_list: Optional list of file names; when non-empty, a line
            naming them is placed at the top of the prompt.

    Returns:
        The prompt string: optional files line, instructions, context,
        optional conversation history, then the question.
    """
    context = "\n\n".join(chunk["text"] for chunk in context_chunks)
    history_text = format_history(history or [])
    files_line = (
        f"You have access to these files: {', '.join(file_list)}\n"
        if file_list
        else ""
    )
    sections = [
        files_line,
        "You are a helpful assistant. Answer the question using only the context below.\n",
        "If the answer is not in the context, say \"I don't know.\"\n\n",
        f"Context:\n{context}",
        # history_text is either empty or begins with its own newline.
        f"{history_text}\n\n",
        f"Question:\n{question}",
    ]
    return "".join(sections)
def stream_answer(client, prompt):
    """Stream the model's reply to stdout and return it as one string.

    Args:
        client: Object exposing ``models.generate_content_stream``.
        prompt: The prompt text to send.

    Returns:
        The concatenation of every non-empty ``text`` piece in the stream.
    """
    pieces = []
    stream = client.models.generate_content_stream(
        model="gemini-2.5-flash", contents=prompt
    )
    for part in stream:
        text = part.text
        # Stream items whose text is empty or None are skipped.
        if text:
            print(text, end="", flush=True)
            pieces.append(text)
    # Terminate the streamed line.
    print()
    return "".join(pieces)
def save_embeddings(chunks, embeddings, cache_path):
    """Write chunks and their embeddings to a JSON cache file.

    Args:
        chunks: The indexed chunks.
        embeddings: One embedding per chunk.
        cache_path: Destination file; overwritten if it exists.
    """
    with open(cache_path, "w") as cache_file:
        # Keys here are the ones load_embeddings reads back.
        json.dump(dict(chunks=chunks, embeddings=embeddings), cache_file)
def load_embeddings(cache_path):
    """Load cached chunks and embeddings from a JSON file.

    Args:
        cache_path: Path of the cache file written by ``save_embeddings``.

    Returns:
        A ``(chunks, embeddings)`` tuple, or ``None`` when the file does not
        exist or cannot be used (invalid JSON, or missing the ``"chunks"`` /
        ``"embeddings"`` keys). Returning ``None`` for a damaged cache lets
        the caller rebuild it instead of crashing on every start-up.
    """
    if not os.path.exists(cache_path):
        return None
    try:
        with open(cache_path, encoding="utf-8") as f:
            data = json.load(f)
        return data["chunks"], data["embeddings"]
    except (ValueError, KeyError, TypeError):
        # ValueError covers json.JSONDecodeError and decoding failures;
        # KeyError/TypeError cover valid JSON with an unexpected shape.
        print(f"Ignoring unreadable cache at {cache_path}")
        return None
def format_history(messages):
    """Render earlier conversation turns as a text block for the prompt.

    Args:
        messages: List of dicts with ``"role"`` and ``"content"`` keys.

    Returns:
        An empty string when there are no messages; otherwise a block that
        starts with a newline and the "Conversation so far:" header,
        followed by one line per message. Messages whose role is ``"user"``
        are labelled "You"; every other role is labelled "Assistant".
    """
    if not messages:
        return ""
    rendered = [
        ("You" if msg["role"] == "user" else "Assistant") + f": {msg['content']}"
        for msg in messages
    ]
    return "\n".join(["\nConversation so far:", *rendered])
def chat_loop(client, chunks, embeddings):
    """Run the interactive question/answer loop until the user quits.

    Input starting with ``/`` is treated as a slash command and is never
    sent to the search or model pipeline:

    * ``/quit``  - print "Goodbye!" and leave the loop.
    * ``/files`` - list every indexed file, one per line.
    * anything else - report the unknown command.

    All other non-empty input is answered using the most relevant chunks,
    and the exchange is appended to the conversation history.

    Args:
        client: Gemini client used for search and answer generation.
        chunks: Indexed chunks (dicts with at least a ``"source"`` key).
        embeddings: One embedding per chunk.
    """
    file_list = sorted({chunk["source"] for chunk in chunks})
    history = []
    print("Assistant ready. Type your question, or /help for commands.\n")
    while True:
        question = input("You: ").strip()
        if not question:
            continue

        if question.startswith("/"):
            # Split off the command name; anything after it is an argument.
            parts = question.split(maxsplit=1)
            command = parts[0]
            if command == "/quit":
                print("Goodbye!")
                break
            elif command == "/files":
                print("Indexed files:")
                for name in file_list:
                    print(f" {name}")
            else:
                # NOTE(review): /help is advertised here and in the banner
                # but has no handler in this function yet.
                print(f"Unknown command: {command}")
                print("Type /help to see available commands.")
            # Slash commands never reach the search/model code below.
            continue

        top_chunks = search(client, question, chunks, embeddings)
        prompt = build_prompt(question, top_chunks, history, file_list)
        print("Assistant: ", end="", flush=True)
        answer = stream_answer(client, prompt)
        history.append({"role": "user", "content": question})
        history.append({"role": "assistant", "content": answer})
def main():
    """Command-line entry point: index (or load) a folder, then chat.

    Expects the folder path as the first command-line argument. The cache
    file lives next to the folder, named after it with a ``.cache.json``
    suffix. When the cache loads, it is used as-is; otherwise the folder is
    indexed, embedded, and the result is saved to the cache.
    """
    args = sys.argv[1:]
    if not args:
        print("Usage: python app.py <folder>")
        sys.exit(1)

    folder = args[0]
    # Strip trailing separators so "docs/" and "docs" share one cache file.
    cache_path = folder.rstrip("/\\") + ".cache.json"
    client = create_client()

    cached = load_embeddings(cache_path)
    if not cached:
        print(f"Indexing {folder}...")
        chunks = index_folder(folder)
        texts = [chunk["text"] for chunk in chunks]
        file_count = len({chunk["source"] for chunk in chunks})
        print(f"Indexed {len(chunks)} chunks from {file_count} files.")
        embeddings = embed_all_chunks(client, texts)
        save_embeddings(chunks, embeddings, cache_path)
        print(f"Cache saved to {cache_path}")
    else:
        chunks, embeddings = cached
        print(f"Loaded cache from {cache_path}")

    chat_loop(client, chunks, embeddings)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Interactive Code Editor
Sign in to write and run code, track your progress, and unlock all chapters.
Sign In to Start Coding