Use File Routing in the Loop
Update chat_loop to call resolve_file_reference before deciding whether to run vector search
Writing code and entering commands is only available on desktop. Open this page on a larger screen to complete this chapter.
Routing questions to files
resolve_file_reference returns two values: a cleaned question and the matching chunks (or None). But chat_loop still calls search for every question — it never uses the new function.
This chapter wires up the routing. Before running vector search, chat_loop calls resolve_file_reference. If the function returns chunks, those chunks are used directly. If it returns None, the normal search pipeline runs instead.
The cleaned question replaces the original in build_prompt and in the history append, so the @filename token is stripped from both the model prompt and the stored exchange.
Instructions
- In `chat_loop`, replace `top_chunks = search(client, question, chunks, embeddings)` with three lines that check for an `@filename` reference first: `clean_question, file_chunks = resolve_file_reference(question, chunks)` — this returns the cleaned question (with `@filename` removed) and the matching chunks, or `None` if no `@filename` was found. Then `if file_chunks:` followed by `top_chunks = file_chunks` — use the targeted chunks directly, skipping vector search.
- Add the `else:` fallback for questions without a file reference: `clean_question = question` — when there is no `@filename`, the question is unchanged, but assign it to `clean_question` so the variable name is consistent in the lines below. Then `top_chunks = search(client, question, chunks, embeddings)`.
- Update the `build_prompt` call: change the first argument from `question` to `clean_question`.
- Update `history.append({"role": "user", "content": question})` to use `clean_question` instead of `question` — this stores the cleaned version (without the `@filename` token) in history.
import json
import os
import re
import sys
import time
import numpy as np
from dotenv import load_dotenv
from google import genai
from google.genai import types
from files import index_folder
def create_client():
    """Build a Gemini API client using the key from the environment.

    Reads GEMINI_API_KEY after loading variables from a .env file.
    """
    load_dotenv()
    key = os.getenv("GEMINI_API_KEY")
    return genai.Client(api_key=key)
def embed_text(client, text):
    """Embed a single document string; return its embedding vector."""
    config = types.EmbedContentConfig(task_type="RETRIEVAL_DOCUMENT")
    response = client.models.embed_content(
        model="gemini-embedding-001",
        contents=text,
        config=config,
    )
    return response.embeddings[0].values
def embed_all_chunks(client, texts):
    """Embed every text, one batch of 90 at a time.

    Pauses 60 seconds between batches to stay under the API rate limit.
    Returns the embedding vectors in the same order as `texts`.
    """
    BATCH_SIZE = 90
    embeddings = []
    for start in range(0, len(texts), BATCH_SIZE):
        batch = texts[start : start + BATCH_SIZE]
        embeddings.extend(embed_text(client, text) for text in batch)
        # Skip the pause after the final batch — there is nothing left to send.
        if start + BATCH_SIZE < len(texts):
            print("Rate limit pause — waiting 60 seconds...")
            time.sleep(60)
    return embeddings
def cosine_similarity(vec_a, vec_b):
    """Return the cosine similarity of two vectors (dot product over norms)."""
    denominator = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
    return np.dot(vec_a, vec_b) / denominator
def search(client, query, chunks, embeddings, top_k=3):
    """Embed the query and return the top_k most similar chunks.

    The query is embedded with the RETRIEVAL_QUERY task type, then every
    stored embedding is scored by cosine similarity against it.
    """
    response = client.models.embed_content(
        model="gemini-embedding-001",
        contents=query,
        config=types.EmbedContentConfig(task_type="RETRIEVAL_QUERY"),
    )
    query_vector = response.embeddings[0].values
    ranked = sorted(
        zip(embeddings, chunks),
        key=lambda pair: cosine_similarity(query_vector, pair[0]),
        reverse=True,
    )
    return [chunk for _, chunk in ranked[:top_k]]
def build_prompt(question, context_chunks, history=None, file_list=None):
    """Assemble the full model prompt.

    Order: optional file inventory line, system-style instructions, the
    retrieved context, the conversation history, then the question.
    """
    pieces = []
    if file_list:
        pieces.append(f"You have access to these files: {', '.join(file_list)}\n")
    pieces.append(
        "You are a helpful assistant. Answer the question using only the context below.\n"
        "If the answer is not in the context, say \"I don't know.\"\n\n"
    )
    joined_context = "\n\n".join(chunk["text"] for chunk in context_chunks)
    pieces.append(f"Context:\n{joined_context}")
    pieces.append(format_history(history or []))
    pieces.append(f"\n\nQuestion:\n{question}")
    return "".join(pieces)
def stream_answer(client, prompt):
    """Stream the model's response to stdout and return the complete text."""
    pieces = []
    stream = client.models.generate_content_stream(
        model="gemini-2.5-flash", contents=prompt
    )
    for part in stream:
        if part.text:
            # Print incrementally so the user sees tokens as they arrive.
            print(part.text, end="", flush=True)
            pieces.append(part.text)
    print()
    return "".join(pieces)
def resolve_file_reference(question, chunks):
    """Detect an @filename token and return (clean_question, file_chunks).

    Returns (question, None) unchanged when there is no @token, or when the
    token matches no indexed file. Otherwise the token is stripped from the
    question and the chunks whose source matches — exactly first, then by
    substring — are returned.
    """
    token = re.search(r'@(\S+)', question)
    if token is None:
        return question, None
    target = token.group(1)
    # Prefer an exact source-name match; fall back to a looser substring match.
    selected = [chunk for chunk in chunks if chunk["source"] == target]
    if not selected:
        selected = [chunk for chunk in chunks if target in chunk["source"]]
    if not selected:
        return question, None
    cleaned = question.replace(token.group(0), "").strip()
    return cleaned, selected
def save_embeddings(chunks, embeddings, cache_path):
    """Serialize chunks and their embeddings to a JSON cache file."""
    with open(cache_path, "w") as f:
        json.dump({"chunks": chunks, "embeddings": embeddings}, f)
def load_embeddings(cache_path):
    """Return (chunks, embeddings) from the JSON cache, or None if absent."""
    if not os.path.exists(cache_path):
        return None
    with open(cache_path) as f:
        cached = json.load(f)
    return cached["chunks"], cached["embeddings"]
def format_history(messages):
    """Render prior messages as a transcript block; empty string if none."""
    if not messages:
        return ""
    transcript = [
        f"{'You' if msg['role'] == 'user' else 'Assistant'}: {msg['content']}"
        for msg in messages
    ]
    return "\n".join(["\nConversation so far:"] + transcript)
def chat_loop(client, chunks, embeddings):
    """Interactive question loop.

    Routes each question: an @filename reference selects that file's chunks
    directly via resolve_file_reference; otherwise vector search runs. The
    cleaned question (token stripped) is what reaches build_prompt and the
    stored history. Slash commands (/files, /new, /help, /quit) are handled
    before any retrieval.
    """
    file_list = sorted(set(chunk["source"] for chunk in chunks))
    history = []
    print("Assistant ready. Type your question, or /help for commands.\n")
    while True:
        question = input("You: ").strip()
        if not question:
            continue
        if question.startswith("/"):
            parts = question.split(maxsplit=1)
            command = parts[0]
            if command == "/quit":
                print("Goodbye!")
                break
            elif command == "/files":
                print("Indexed files:")
                for name in file_list:
                    print(f" {name}")
            elif command == "/new":
                history = []
                print("New conversation started. I won't remember what we discussed before.")
            elif command == "/help":
                print("/files — list indexed files")
                print("/new — start a new conversation (clears memory)")
                print("/quit — exit")
                print("@filename ... — ask about a specific file, e.g. @readme.md what does it cover?")
            else:
                print(f"Unknown command: {command}")
                print("Type /help to see available commands.")
            continue
        # Step 1: check for an @filename reference before running vector search.
        clean_question, file_chunks = resolve_file_reference(question, chunks)
        if file_chunks:
            # Targeted file found — use its chunks directly, skip vector search.
            top_chunks = file_chunks
        else:
            # Step 2: no file reference — keep the question as-is and search.
            clean_question = question
            top_chunks = search(client, question, chunks, embeddings)
        # Step 3: the cleaned question (no @filename token) goes to the model.
        prompt = build_prompt(clean_question, top_chunks, history, file_list)
        print("Assistant: ", end="", flush=True)
        answer = stream_answer(client, prompt)
        # Step 4: store the cleaned question so the token never enters history.
        history.append({"role": "user", "content": clean_question})
        history.append({"role": "assistant", "content": answer})
def main():
    """CLI entry point: index a folder (or load its cache), then start chatting.

    Usage: python app.py <folder>. The embedding cache lives next to the
    folder as <folder>.cache.json.
    """
    if len(sys.argv) < 2:
        print("Usage: python app.py <folder>")
        sys.exit(1)
    folder = sys.argv[1]
    cache_path = folder.rstrip("/\\") + ".cache.json"
    client = create_client()
    cached = load_embeddings(cache_path)
    if cached:
        # Cache hit: reuse previously computed chunks and embeddings.
        chunks, embeddings = cached
        print(f"Loaded cache from {cache_path}")
    else:
        print(f"Indexing {folder}...")
        chunks = index_folder(folder)
        texts = [chunk["text"] for chunk in chunks]
        file_count = len(set(chunk["source"] for chunk in chunks))
        print(f"Indexed {len(chunks)} chunks from {file_count} files.")
        embeddings = embed_all_chunks(client, texts)
        save_embeddings(chunks, embeddings, cache_path)
        print(f"Cache saved to {cache_path}")
    chat_loop(client, chunks, embeddings)
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
Interactive Code Editor
Sign in to write and run code, track your progress, and unlock all chapters.
Sign In to Start Coding