Detect the @filename Syntax
Write resolve_file_reference to extract @filename from a question and return the matching chunks
Writing code and entering commands is only available on desktop. Open this page on a larger screen to complete this chapter.
Targeting a specific file
Vector search finds the most relevant chunks across all indexed files. That works well for general questions, but sometimes you already know which file has the answer. Searching across everything wastes time and may pull in chunks from the wrong file.
The @filename syntax lets you skip the search entirely. If your question starts with @readme.md, the assistant collects all chunks from readme.md and sends them directly to the model — no embeddings, no ranking. For example: @api-reference.md what authentication method does it use?
You'll write resolve_file_reference to detect the @filename pattern and return either the matched file's chunks or None if no match is found.
Python's re module provides re.search, which scans a string for a pattern and returns a match object — or None if nothing matches. The pattern @(\S+) matches @ followed by one or more non-whitespace characters.
Instructions
- Add `import re` to the imports, on a new line after `import os`.
- Define a function called `resolve_file_reference` that takes `question` and `chunks` as arguments.
- Inside `resolve_file_reference`, assign `match = re.search(r'@(\S+)', question)` — this looks for an `@filename` token anywhere in the question.
- Add `if not match:` then `return question, None`.
- Assign `filename = match.group(1)` — this extracts the filename without the `@`.
- Assign `file_chunks = [c for c in chunks if c["source"] == filename]` — this collects all chunks whose source matches exactly.
- Add `if not file_chunks:` then `file_chunks = [c for c in chunks if filename in c["source"]]` — this falls back to a partial match if no exact match exists.
- Add `if not file_chunks:` then `return question, None` — if still no chunks, treat it as a normal question.
- Assign `clean_question = question.replace(match.group(0), "").strip()` — this removes the `@filename` token from the question text before sending it to the model.
- Return `clean_question, file_chunks`.
import json
import os
import re  # Step 1: needed by resolve_file_reference for @filename detection
import sys
import time

import numpy as np
from dotenv import load_dotenv
from google import genai
from google.genai import types

from files import index_folder
def create_client():
    """Load the Gemini API key from the environment and return a client."""
    load_dotenv()
    return genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
def embed_text(client, text):
    """Embed one document string and return its embedding vector."""
    config = types.EmbedContentConfig(task_type="RETRIEVAL_DOCUMENT")
    response = client.models.embed_content(
        model="gemini-embedding-001",
        contents=text,
        config=config,
    )
    return response.embeddings[0].values
def embed_all_chunks(client, texts):
    """Embed every text, pausing between batches to stay under the rate limit.

    Returns a list of embedding vectors, one per input text, in order.
    """
    BATCH_SIZE = 90
    vectors = []
    for start in range(0, len(texts), BATCH_SIZE):
        for text in texts[start : start + BATCH_SIZE]:
            vectors.append(embed_text(client, text))
        # Only pause when more batches remain after this one.
        if start + BATCH_SIZE < len(texts):
            print("Rate limit pause — waiting 60 seconds...")
            time.sleep(60)
    return vectors
def cosine_similarity(vec_a, vec_b):
    """Return the cosine similarity (dot product over norms) of two vectors."""
    denominator = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
    return np.dot(vec_a, vec_b) / denominator
def search(client, query, chunks, embeddings, top_k=3):
    """Return the top_k chunks most similar to the query by cosine similarity."""
    response = client.models.embed_content(
        model="gemini-embedding-001",
        contents=query,
        config=types.EmbedContentConfig(task_type="RETRIEVAL_QUERY"),
    )
    query_vector = response.embeddings[0].values
    # Stable sort: ties keep original chunk order, matching a (score, chunk) sort.
    ranked = sorted(
        zip(embeddings, chunks),
        key=lambda pair: cosine_similarity(query_vector, pair[0]),
        reverse=True,
    )
    return [chunk for _, chunk in ranked[:top_k]]
def build_prompt(question, context_chunks, history=None, file_list=None):
    """Assemble the model prompt: file roster, instructions, context, history, question."""
    context = "\n\n".join(c["text"] for c in context_chunks)
    history_text = format_history(history or [])
    if file_list:
        files_line = f"You have access to these files: {', '.join(file_list)}\n"
    else:
        files_line = ""
    parts = [
        files_line,
        "You are a helpful assistant. Answer the question using only the context below.\n",
        "If the answer is not in the context, say \"I don't know.\"\n\n",
        f"Context:\n{context}",
        f"{history_text}\n\n",
        f"Question:\n{question}",
    ]
    return "".join(parts)
def stream_answer(client, prompt):
    """Stream the model's reply to stdout as it arrives; return the full text."""
    pieces = []
    stream = client.models.generate_content_stream(
        model="gemini-2.5-flash", contents=prompt
    )
    for event in stream:
        if event.text:
            print(event.text, end="", flush=True)
            pieces.append(event.text)
    print()  # final newline after the streamed answer
    return "".join(pieces)
def resolve_file_reference(question, chunks):
    """Resolve an @filename reference in the question to that file's chunks.

    Scans the question for an "@filename" token. If one is found and an
    indexed file matches (exactly, or as a partial/substring match), returns
    (question with the token removed, list of that file's chunks) so the
    caller can skip vector search. Otherwise returns (question, None) and the
    question should be handled as a normal search.
    """
    match = re.search(r'@(\S+)', question)
    if not match:
        return question, None
    filename = match.group(1)
    # Prefer an exact match on the chunk's source filename.
    file_chunks = [c for c in chunks if c["source"] == filename]
    if not file_chunks:
        # Fall back to a partial match, e.g. "@readme" matching "readme.md".
        file_chunks = [c for c in chunks if filename in c["source"]]
    if not file_chunks:
        # No indexed file matches: treat it as a normal question.
        return question, None
    # Strip the "@filename" token before sending the question to the model.
    clean_question = question.replace(match.group(0), "").strip()
    return clean_question, file_chunks
def save_embeddings(chunks, embeddings, cache_path):
    """Persist chunks and their embeddings to a JSON cache file."""
    with open(cache_path, "w") as f:
        json.dump({"chunks": chunks, "embeddings": embeddings}, f)
def load_embeddings(cache_path):
    """Return (chunks, embeddings) from the JSON cache, or None if absent."""
    if not os.path.exists(cache_path):
        return None
    with open(cache_path) as f:
        cached = json.load(f)
    return cached["chunks"], cached["embeddings"]
def format_history(messages):
    """Render prior conversation turns as a transcript block; "" when empty."""
    if not messages:
        return ""
    transcript = "\n".join(
        f"{'You' if m['role'] == 'user' else 'Assistant'}: {m['content']}"
        for m in messages
    )
    return "\nConversation so far:\n" + transcript
def chat_loop(client, chunks, embeddings):
    """Interactive REPL: handle slash commands and answer questions over chunks."""
    file_list = sorted({c["source"] for c in chunks})
    history = []
    print("Assistant ready. Type your question, or /help for commands.\n")
    while True:
        user_input = input("You: ").strip()
        if not user_input:
            continue
        # Slash commands are handled locally and never hit the model.
        if user_input.startswith("/"):
            cmd = user_input.split(maxsplit=1)[0]
            if cmd == "/quit":
                print("Goodbye!")
                return
            if cmd == "/files":
                print("Indexed files:")
                for name in file_list:
                    print(f" {name}")
            elif cmd == "/new":
                history = []
                print("New conversation started. I won't remember what we discussed before.")
            elif cmd == "/help":
                print("/files — list indexed files")
                print("/new — start a new conversation (clears memory)")
                print("/quit — exit")
                print("@filename ... — ask about a specific file, e.g. @readme.md what does it cover?")
            else:
                print(f"Unknown command: {cmd}")
                print("Type /help to see available commands.")
            continue
        # Normal question: retrieve context, stream the answer, record the turn.
        relevant = search(client, user_input, chunks, embeddings)
        prompt = build_prompt(user_input, relevant, history, file_list)
        print("Assistant: ", end="", flush=True)
        answer = stream_answer(client, prompt)
        history.append({"role": "user", "content": user_input})
        history.append({"role": "assistant", "content": answer})
def main():
    """CLI entry point: load or build the embedding cache, then start chatting."""
    if len(sys.argv) < 2:
        print("Usage: python app.py <folder>")
        sys.exit(1)
    folder = sys.argv[1]
    cache_path = folder.rstrip("/\\") + ".cache.json"
    client = create_client()
    cached = load_embeddings(cache_path)
    if cached:
        # Reuse previously computed embeddings; skips re-indexing entirely.
        chunks, embeddings = cached
        print(f"Loaded cache from {cache_path}")
    else:
        print(f"Indexing {folder}...")
        chunks = index_folder(folder)
        texts = [c["text"] for c in chunks]
        unique_files = {c["source"] for c in chunks}
        print(f"Indexed {len(chunks)} chunks from {len(unique_files)} files.")
        embeddings = embed_all_chunks(client, texts)
        save_embeddings(chunks, embeddings, cache_path)
        print(f"Cache saved to {cache_path}")
    chat_loop(client, chunks, embeddings)
# Run the CLI only when executed directly, not when imported as a module.
if __name__ == "__main__":
    main()
Interactive Code Editor
Sign in to write and run code, track your progress, and unlock all chapters.
Sign In to Start Coding