Chunk into Structured Format
Rewrite the indexer to produce chunks with source filenames attached
Writing code and entering commands is only available on desktop. Open this page on a larger screen to complete this chapter.
The current index_folder returns a list of plain strings. That's enough for embedding, but it means the assistant loses all information about where each piece of text came from the moment it enters the pipeline.
This creates three concrete problems:
- No attribution: When the assistant answers a question, it can't tell you which file the answer came from.
- No routing: Later in the course, you'll add @filename targeting — "search only in README.md". Without the source tracked per chunk, that feature is impossible.
- No filtering: If a user asks about a specific file, there's no way to limit results to chunks from that file.
The fix is to change what index_folder produces. Instead of a list of strings, it returns a list of dicts:
{"text": "chunk content...", "source": "README.md"}

Each dict carries the chunk text and the filename it came from. The source value uses just the filename (not the full path) because that's what gets displayed to the user.
Chunking with overlap
Large files produce more text than a single embedding can capture well. index_folder slices each file's content into fixed-size segments with a small overlap at the boundaries. Using chunk_size - overlap as the loop step means adjacent chunks share overlap characters — so a sentence that falls exactly at a boundary appears in full in at least one of the two adjacent chunks.
Instructions
- Update index_folder's signature to index_folder(folder, chunk_size=500, overlap=100).
- Replace the texts list with an empty list called chunks.
- After reading text and checking it isn't empty, add filename = os.path.basename(path) to get just the filename.
- Replace texts.append(text) with a loop that creates one chunk dict per segment:
  - Add for i in range(0, len(text), chunk_size - overlap): — this advances through the text in steps of chunk_size - overlap, so adjacent chunks share overlap characters at their boundaries.
  - Inside the loop, append {"text": text[i:i + chunk_size], "source": filename} to chunks. The "source" key records which file the chunk came from.
- Return chunks.
import json
import os
import sys
import time
import numpy as np
import pypdf
from dotenv import load_dotenv
from google import genai
from google.genai import types
def extract_text(pdf_path):
    """Extract the text of every page in a PDF and join it into one string.

    Pages are separated by newlines so page boundaries remain visible.
    """
    reader = pypdf.PdfReader(pdf_path)
    page_texts = []
    for page in reader.pages:
        page_texts.append(page.extract_text())
    return "\n".join(page_texts)
def chunk_text(text, chunk_size=500, overlap=100):
    """Slice text into fixed-size chunks with overlapping boundaries.

    Stepping by chunk_size - overlap makes adjacent chunks share `overlap`
    characters, so a sentence that straddles a boundary appears whole in
    at least one of the two neighboring chunks.
    """
    step = chunk_size - overlap
    return [text[start : start + chunk_size] for start in range(0, len(text), step)]
def create_client():
    """Build a Gemini API client using GEMINI_API_KEY from the environment (.env)."""
    load_dotenv()
    return genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
def embed_text(client, text):
    """Embed a single document chunk and return its embedding vector."""
    config = types.EmbedContentConfig(task_type="RETRIEVAL_DOCUMENT")
    response = client.models.embed_content(
        model="gemini-embedding-001",
        contents=text,
        config=config,
    )
    return response.embeddings[0].values
def embed_all_chunks(client, chunks):
    """Embed every chunk, pausing between batches to respect rate limits.

    Chunks are processed in groups of BATCH_SIZE; after each group that is
    not the last one, sleep 60 seconds so the request quota can reset.
    """
    BATCH_SIZE = 90
    embeddings = []
    for start in range(0, len(chunks), BATCH_SIZE):
        embeddings.extend(
            embed_text(client, chunk) for chunk in chunks[start : start + BATCH_SIZE]
        )
        if start + BATCH_SIZE < len(chunks):
            print("Rate limit pause — waiting 60 seconds...")
            time.sleep(60)
    return embeddings
def cosine_similarity(vec_a, vec_b):
    """Return the cosine of the angle between two vectors (1.0 = same direction)."""
    dot_product = np.dot(vec_a, vec_b)
    magnitude_product = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
    return dot_product / magnitude_product
def search(client, query, chunks, embeddings, top_k=3):
    """Return the top_k chunks most similar to the query.

    The query is embedded with the RETRIEVAL_QUERY task type, every chunk
    embedding is scored by cosine similarity against it, and the highest-
    scoring chunks are returned in descending order of similarity.
    """
    response = client.models.embed_content(
        model="gemini-embedding-001",
        contents=query,
        config=types.EmbedContentConfig(task_type="RETRIEVAL_QUERY"),
    )
    query_vector = response.embeddings[0].values
    ranked = sorted(
        zip(embeddings, chunks),
        key=lambda pair: cosine_similarity(query_vector, pair[0]),
        reverse=True,
    )
    return [chunk for _, chunk in ranked[:top_k]]
def build_prompt(question, context_chunks):
    """Assemble the grounded-QA prompt from retrieved context and the question."""
    context = "\n\n".join(context_chunks)
    return (
        "You are a helpful assistant. Answer the question using only the context below.\n"
        "If the answer is not in the context, say \"I don't know.\"\n"
        "\n"
        f"Context:\n{context}\n"
        "\n"
        f"Question:\n{question}"
    )
def generate_answer(client, prompt):
    """Ask the Gemini model to answer the prompt and return its text reply."""
    return client.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt,
    ).text
def print_result(answer, source_chunks, show_sources=True):
    """Print the answer, optionally followed by the chunks that grounded it."""
    print("Answer:")
    print(answer)
    if not show_sources:
        return
    print("\nSources:")
    for index, chunk in enumerate(source_chunks, 1):
        print(f"Source {index}:\n{chunk}\n")
def save_embeddings(chunks, embeddings, cache_path):
    """Write chunks and their embeddings to cache_path as a JSON document."""
    payload = {"chunks": chunks, "embeddings": embeddings}
    with open(cache_path, "w") as handle:
        json.dump(payload, handle)
def load_embeddings(cache_path):
    """Load (chunks, embeddings) from a JSON cache, or None if it doesn't exist."""
    if not os.path.exists(cache_path):
        return None
    with open(cache_path) as handle:
        payload = json.load(handle)
    return payload["chunks"], payload["embeddings"]
SUPPORTED_EXTENSIONS = {".txt", ".md", ".py", ".js", ".ts", ".yaml", ".yml", ".json"}
def list_files(folder):
    """Recursively collect the paths of all supported files under folder."""
    paths = []
    for dirpath, _dirnames, filenames in os.walk(folder):
        paths.extend(
            os.path.join(dirpath, name)
            for name in filenames
            if os.path.splitext(name)[1] in SUPPORTED_EXTENSIONS
        )
    return paths
def read_file(path):
    """Return the file's contents decoded as UTF-8, or None if unreadable.

    Binary files (UnicodeDecodeError) and missing/unreadable paths (OSError)
    are treated as "nothing to index" rather than raised as errors.
    """
    try:
        with open(path, "r", encoding="utf-8") as handle:
            content = handle.read()
    except (UnicodeDecodeError, OSError):
        return None
    return content
def index_folder(folder, chunk_size=500, overlap=100):
    """Index every supported file under folder into source-tagged chunks.

    Each file's text is sliced into chunk_size-character segments whose
    boundaries overlap by `overlap` characters (loop step of
    chunk_size - overlap), so a sentence falling exactly on a boundary
    appears in full in at least one of the two adjacent chunks.

    Returns a list of dicts of the form
    {"text": "chunk content...", "source": "README.md"} — the "source"
    key holds just the basename (not the full path), since that is what
    gets displayed to the user and matched for per-file filtering.
    """
    file_paths = list_files(folder)
    chunks = []
    for path in file_paths:
        text = read_file(path)
        if not text:
            continue  # unreadable or empty file — nothing to index
        filename = os.path.basename(path)
        for i in range(0, len(text), chunk_size - overlap):
            chunks.append({"text": text[i : i + chunk_size], "source": filename})
    return chunks
def main():
    """CLI entry point: answer a question about a PDF, with a local embedding cache."""
    pdf_path, question = sys.argv[1], sys.argv[2]
    cache_path = pdf_path + ".cache.json"
    client = create_client()
    print(f"Loading {pdf_path}...")
    cached = load_embeddings(cache_path)
    if cached:
        chunks, embeddings = cached
        print(f"Loaded cache from {cache_path}")
    else:
        # First run for this PDF: extract, chunk, embed, then persist.
        chunks = chunk_text(extract_text(pdf_path))
        print(f"No cache found. Embedding {len(chunks)} chunks...")
        embeddings = embed_all_chunks(client, chunks)
        save_embeddings(chunks, embeddings, cache_path)
        print(f"Cache saved to {cache_path}")
    top_chunks = search(client, question, chunks, embeddings)
    answer = generate_answer(client, build_prompt(question, top_chunks))
    print_result(answer, top_chunks, show_sources=False)


if __name__ == "__main__":
    main()
Interactive Code Editor
Sign in to write and run code, track your progress, and unlock all chapters.
Sign In to Start Coding