Clean Up and List Files
Remove development helpers and add a directory walker that filters by extension
The RAG app you built in the previous course included two helper functions — test_search and preview_chunks — that were useful during development. They let you inspect chunks and verify that search worked. The finished assistant doesn't need them, so this chapter removes them before adding new capabilities.
Walking a directory with os.walk
To index a folder of files, you need a way to list every file inside it — including files in subfolders. Python's os.walk does this:
for dirpath, _, filenames in os.walk(folder):
    for filename in filenames:
        full_path = os.path.join(dirpath, filename)

os.walk yields a tuple for each directory it visits: the directory path, its subdirectories, and its files. The _ discards the subdirectory list; you don't need it because os.walk visits subdirectories automatically.
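To see the recursive behavior concretely, here is a small self-contained demonstration. The folder layout (a readme.md at the top level and a guide.txt inside a docs subfolder) is made up for illustration:

```python
import os
import tempfile

# Build a throwaway folder with one nested subfolder.
root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "docs"))
open(os.path.join(root, "readme.md"), "w").close()
open(os.path.join(root, "docs", "guide.txt"), "w").close()

found = []
for dirpath, _, filenames in os.walk(root):
    for filename in filenames:
        found.append(os.path.join(dirpath, filename))

# Both files appear, including the one in the subfolder.
print(sorted(os.path.relpath(p, root) for p in found))
```

Note that os.walk hands you only file names, not paths; joining each name with dirpath is what produces a path you can actually open.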
Filtering by extension
Not every file in a folder contains text the assistant can use. Binary files, images, and compiled artifacts would produce garbage if you tried to read them as text. A SUPPORTED_EXTENSIONS constant defines the exact set of extensions the assistant accepts:
SUPPORTED_EXTENSIONS = {".txt", ".md", ".py", ".js", ".ts", ".yaml", ".yml", ".json"}

Using a set makes the membership check (ext in SUPPORTED_EXTENSIONS) fast regardless of how many extensions you add.
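A quick check of how os.path.splitext pairs with this set (the file names here are invented for illustration):

```python
import os

SUPPORTED_EXTENSIONS = {".txt", ".md", ".py", ".js", ".ts", ".yaml", ".yml", ".json"}

# splitext returns (root, extension); the extension keeps its leading dot.
_, ext = os.path.splitext("notes/summary.md")
print(ext)                          # .md
print(ext in SUPPORTED_EXTENSIONS)  # True

# Binary artifacts fall outside the set and get skipped.
_, ext = os.path.splitext("build/app.exe")
print(ext in SUPPORTED_EXTENSIONS)  # False
```

Because splitext keeps the dot, the entries in SUPPORTED_EXTENSIONS must keep it too, or every membership check would fail.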
Instructions
- Delete the preview_chunks function.
- Delete the test_search function.
- Define a module-level constant called SUPPORTED_EXTENSIONS. Set it to a set containing these 8 strings: ".txt", ".md", ".py", ".js", ".ts", ".yaml", ".yml", ".json". The indexer checks this constant before reading any file.
- Define a function called list_files that takes folder. Inside it, create an empty list called file_paths to hold the matching file paths.
- Add the outer loop for dirpath, _, filenames in os.walk(folder):. This walks every subdirectory recursively. Inside that loop, add for filename in filenames: to iterate over each file.
- Inside the inner loop:
  - Call os.path.splitext(filename) and assign the result to _, ext to extract the extension.
  - If ext in SUPPORTED_EXTENSIONS, append os.path.join(dirpath, filename) to file_paths. This records the full path of every supported file found.
- Return file_paths.
import json
import os
import sys
import time

import numpy as np
import pypdf
from dotenv import load_dotenv
from google import genai
from google.genai import types


def extract_text(pdf_path):
    reader = pypdf.PdfReader(pdf_path)
    pages = [page.extract_text() for page in reader.pages]
    return "\n".join(pages)


def chunk_text(text, chunk_size=500, overlap=100):
    chunks = []
    for i in range(0, len(text), chunk_size - overlap):
        chunks.append(text[i : i + chunk_size])
    return chunks


# Step 1: delete this function
def preview_chunks(chunks):
    print(f"Total chunks: {len(chunks)}")
    print(f"First chunk:\n{chunks[0]}")


def create_client():
    load_dotenv()
    api_key = os.getenv("GEMINI_API_KEY")
    client = genai.Client(api_key=api_key)
    return client


def embed_text(client, text):
    result = client.models.embed_content(
        model="gemini-embedding-001",
        contents=text,
        config=types.EmbedContentConfig(task_type="RETRIEVAL_DOCUMENT"),
    )
    return result.embeddings[0].values


def embed_all_chunks(client, chunks):
    BATCH_SIZE = 90
    embeddings = []
    for i in range(0, len(chunks), BATCH_SIZE):
        batch = chunks[i : i + BATCH_SIZE]
        for chunk in batch:
            embeddings.append(embed_text(client, chunk))
        if i + BATCH_SIZE < len(chunks):
            print("Rate limit pause — waiting 60 seconds...")
            time.sleep(60)
    return embeddings


def cosine_similarity(vec_a, vec_b):
    dot = np.dot(vec_a, vec_b)
    norm = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
    return dot / norm


def search(client, query, chunks, embeddings, top_k=3):
    result = client.models.embed_content(
        model="gemini-embedding-001",
        contents=query,
        config=types.EmbedContentConfig(task_type="RETRIEVAL_QUERY"),
    )
    query_vector = result.embeddings[0].values
    scores = [(cosine_similarity(query_vector, emb), chunk) for emb, chunk in zip(embeddings, chunks)]
    scores.sort(key=lambda x: x[0], reverse=True)
    return [chunk for _, chunk in scores[:top_k]]


# Step 2: delete this function
def test_search(client, pdf_path, question):
    text = extract_text(pdf_path)
    chunks = chunk_text(text)
    embeddings = embed_all_chunks(client, chunks)
    results = search(client, question, chunks, embeddings)
    for i, chunk in enumerate(results, 1):
        print(f"Result {i}:\n{chunk}\n")


def build_prompt(question, context_chunks):
    context = "\n\n".join(context_chunks)
    prompt = f"You are a helpful assistant. Answer the question using only the context below.\nIf the answer is not in the context, say \"I don't know.\"\n\nContext:\n{context}\n\nQuestion:\n{question}"
    return prompt


def generate_answer(client, prompt):
    response = client.models.generate_content(model="gemini-2.5-flash", contents=prompt)
    return response.text


def print_result(answer, source_chunks, show_sources=True):
    print("Answer:")
    print(answer)
    if show_sources:
        print("\nSources:")
        for i, chunk in enumerate(source_chunks, 1):
            print(f"Source {i}:\n{chunk}\n")


def save_embeddings(chunks, embeddings, cache_path):
    data = {"chunks": chunks, "embeddings": embeddings}
    with open(cache_path, "w") as f:
        json.dump(data, f)


def load_embeddings(cache_path):
    if not os.path.exists(cache_path):
        return None
    with open(cache_path) as f:
        data = json.load(f)
    return data["chunks"], data["embeddings"]


# Step 3: SUPPORTED_EXTENSIONS = ...


# Step 4-7: def list_files(folder):


def main():
    pdf_path = sys.argv[1]
    question = sys.argv[2]
    cache_path = pdf_path + ".cache.json"
    client = create_client()
    print(f"Loading {pdf_path}...")
    cached = load_embeddings(cache_path)
    if cached:
        chunks, embeddings = cached
        print(f"Loaded cache from {cache_path}")
    else:
        text = extract_text(pdf_path)
        chunks = chunk_text(text)
        print(f"No cache found. Embedding {len(chunks)} chunks...")
        embeddings = embed_all_chunks(client, chunks)
        save_embeddings(chunks, embeddings, cache_path)
        print(f"Cache saved to {cache_path}")
    top_chunks = search(client, question, chunks, embeddings)
    prompt = build_prompt(question, top_chunks)
    answer = generate_answer(client, prompt)
    print_result(answer, top_chunks, show_sources=False)


if __name__ == "__main__":
    main()