Collect Text from All Files
Exit
Collect Text from All Files
Add the first part of the folder indexer: list files, read each one, and accumulate results
💻
Writing code and entering commands is only available on desktop. Open this page on a larger screen to complete this chapter.
With list_files and read_file in place, you can now write the function that ties them together: index_folder. This function is the entry point for turning a directory on disk into a set of text strings ready for embedding.
The indexer pattern
The pattern has three steps:
- List — call `list_files(folder)` to get every supported file path.
- Read — call `read_file(path)` for each path. If it returns `None`, skip the file.
- Collect — accumulate the successfully-read text strings into a list.
This chapter covers steps 1–3. The next chapter replaces the raw text accumulation with chunking, which splits each file's content into overlapping segments and records which file each segment came from.
Instructions
- Define a function called `index_folder` that takes `folder`.
- Call `list_files(folder)` and store the result in `file_paths`.
- Create an empty list called `texts`.
- Loop over `file_paths`. For each `path`:
  - Call `read_file(path)` and store the result in `text`.
  - If `not text`, add `continue` — `read_file` returns `None` for files it cannot read (binary files, encoding errors). Skipping them prevents errors downstream.
  - Append `text` to `texts`.
- Return `texts`.
import json
import os
import sys
import time
import numpy as np
import pypdf
from dotenv import load_dotenv
from google import genai
from google.genai import types
def extract_text(pdf_path):
    """Return the text of every page of the PDF at ``pdf_path``.

    Pages are extracted in document order and joined with a single
    newline between consecutive pages.
    """
    document = pypdf.PdfReader(pdf_path)
    page_texts = []
    for pdf_page in document.pages:
        page_texts.append(pdf_page.extract_text())
    return "\n".join(page_texts)
def chunk_text(text, chunk_size=500, overlap=100):
    """Split ``text`` into fixed-size chunks that overlap their neighbours.

    Consecutive chunks start ``chunk_size - overlap`` characters apart, so
    the last ``overlap`` characters of one chunk are repeated at the start
    of the next. The final chunk may be shorter than ``chunk_size``.

    Args:
        text: The string to split.
        chunk_size: Maximum number of characters per chunk.
        overlap: Number of characters shared between consecutive chunks.

    Returns:
        A list of chunk strings (empty when ``text`` is empty).

    Raises:
        ValueError: If ``overlap`` is not smaller than ``chunk_size``. The
            stride would be zero or negative, which previously either
            raised an opaque ``range()`` error or silently returned no
            chunks at all.
    """
    step = chunk_size - overlap
    if step <= 0:
        raise ValueError(
            f"overlap ({overlap}) must be smaller than chunk_size ({chunk_size})"
        )
    return [text[start : start + chunk_size] for start in range(0, len(text), step)]
def create_client():
    """Create a ``genai.Client`` authenticated with ``GEMINI_API_KEY``.

    ``load_dotenv()`` is called first, then the key is read from the
    process environment and passed to the client constructor.
    """
    load_dotenv()
    return genai.Client(api_key=os.environ.get("GEMINI_API_KEY"))
def embed_text(client, text):
    """Embed ``text`` as a retrieval document and return the vector values.

    Uses the ``gemini-embedding-001`` model with the
    ``RETRIEVAL_DOCUMENT`` task type and returns the values of the first
    embedding in the response.
    """
    document_config = types.EmbedContentConfig(task_type="RETRIEVAL_DOCUMENT")
    response = client.models.embed_content(
        model="gemini-embedding-001",
        contents=text,
        config=document_config,
    )
    first_embedding = response.embeddings[0]
    return first_embedding.values
def embed_all_chunks(client, chunks):
    """Embed every chunk, pausing between batches for the rate limit.

    Chunks are processed in batches of 90. After each batch except the
    last, a notice is printed and the function sleeps for 60 seconds.

    Returns:
        A list with one embedding per chunk, in the same order as ``chunks``.
    """
    batch_size = 90
    total = len(chunks)
    vectors = []
    for start in range(0, total, batch_size):
        stop = start + batch_size
        vectors.extend(embed_text(client, piece) for piece in chunks[start:stop])
        # Only pause when another batch is still to come.
        if stop < total:
            print("Rate limit pause — waiting 60 seconds...")
            time.sleep(60)
    return vectors
def cosine_similarity(vec_a, vec_b):
    """Return the cosine similarity between two vectors.

    Args:
        vec_a: First vector (any sequence accepted by ``np.dot``).
        vec_b: Second vector, same length as ``vec_a``.

    Returns:
        The dot product divided by the product of the two Euclidean norms.
        When either vector has zero norm the similarity is undefined;
        ``0.0`` is returned instead of dividing by zero.
    """
    dot = np.dot(vec_a, vec_b)
    norm = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
    if norm == 0:
        # A zero vector has no direction; dividing would yield nan and a
        # RuntimeWarning, and nan scores break ranking by similarity.
        return 0.0
    return dot / norm
def search(client, query, chunks, embeddings, top_k=3):
    """Return the ``top_k`` chunks most similar to ``query``.

    The query is embedded with ``gemini-embedding-001`` using the
    ``RETRIEVAL_QUERY`` task type. Each stored embedding is scored against
    it with ``cosine_similarity``, and the chunks are returned in
    descending score order.
    """
    query_config = types.EmbedContentConfig(task_type="RETRIEVAL_QUERY")
    response = client.models.embed_content(
        model="gemini-embedding-001",
        contents=query,
        config=query_config,
    )
    query_vector = response.embeddings[0].values

    scored = []
    for embedding, chunk in zip(embeddings, chunks):
        scored.append((cosine_similarity(query_vector, embedding), chunk))

    # Rank on the score only, so tied chunks keep their original order.
    ranked = sorted(scored, key=lambda pair: pair[0], reverse=True)
    return [chunk for _, chunk in ranked[:top_k]]
def build_prompt(question, context_chunks):
    """Assemble the model prompt from the question and retrieved chunks.

    The chunks are joined with blank lines and placed under a ``Context:``
    heading, followed by the question under a ``Question:`` heading.
    """
    context = "\n\n".join(context_chunks)
    return (
        "You are a helpful assistant. Answer the question using only the context below.\n"
        "If the answer is not in the context, say \"I don't know.\"\n\n"
        f"Context:\n{context}\n\n"
        f"Question:\n{question}"
    )
def generate_answer(client, prompt):
    """Send ``prompt`` to the ``gemini-2.5-flash`` model and return its text."""
    reply = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt,
    )
    return reply.text
def print_result(answer, source_chunks, show_sources=True):
    """Print the answer and, optionally, the numbered source chunks.

    Args:
        answer: Text printed under the ``Answer:`` heading.
        source_chunks: Chunks listed under ``Sources:``, numbered from 1.
        show_sources: When false, only the answer is printed.
    """
    print("Answer:")
    print(answer)
    if not show_sources:
        return
    print("\nSources:")
    for number, source in enumerate(source_chunks, start=1):
        print(f"Source {number}:\n{source}\n")
def save_embeddings(chunks, embeddings, cache_path):
    """Write the chunks and their embeddings to ``cache_path`` as JSON.

    The file holds a single object with the keys ``"chunks"`` and
    ``"embeddings"``. Any existing file at the path is overwritten.
    """
    payload = dict(chunks=chunks, embeddings=embeddings)
    with open(cache_path, "w") as cache_file:
        json.dump(payload, cache_file)
def load_embeddings(cache_path):
    """Load cached chunks and embeddings from ``cache_path``.

    Returns:
        A ``(chunks, embeddings)`` tuple read from the JSON cache, or
        ``None`` when there is no usable cache: the file does not exist,
        is not valid JSON, or lacks the ``"chunks"`` / ``"embeddings"``
        keys. Returning ``None`` lets the caller rebuild the cache.
    """
    try:
        # Open directly rather than checking existence first: the file
        # could disappear between the check and the open.
        with open(cache_path) as f:
            data = json.load(f)
        return data["chunks"], data["embeddings"]
    except FileNotFoundError:
        return None
    except (ValueError, KeyError, TypeError):
        # ValueError covers json.JSONDecodeError and decoding failures
        # (e.g. a cache truncated by an interrupted write). KeyError and
        # TypeError cover JSON that parses but has the wrong shape. A bad
        # cache is treated as a miss instead of crashing every run.
        return None
# File extensions (lowercase, with leading dot) that the indexer will read.
SUPPORTED_EXTENSIONS = {".txt", ".md", ".py", ".js", ".ts", ".yaml", ".yml", ".json"}


def list_files(folder):
    """Recursively collect paths of supported files under ``folder``.

    A file is supported when its extension, compared case-insensitively,
    is in ``SUPPORTED_EXTENSIONS`` (so ``README.MD`` matches ``.md``).

    Args:
        folder: Root directory to walk.

    Returns:
        A sorted list of file paths, each built by joining the walked
        directory path with the file name. Sorting makes the order
        independent of the filesystem's directory listing order. The list
        is empty when ``folder`` does not exist or has no supported files.
    """
    file_paths = []
    for dirpath, _, filenames in os.walk(folder):
        for filename in filenames:
            _, ext = os.path.splitext(filename)
            # Lowercase so upper- and mixed-case extensions are not skipped.
            if ext.lower() in SUPPORTED_EXTENSIONS:
                file_paths.append(os.path.join(dirpath, filename))
    file_paths.sort()
    return file_paths
def read_file(path):
    """Return the UTF-8 text content of ``path``, or ``None`` if unreadable.

    ``None`` is returned when opening or reading raises ``OSError`` or
    when the bytes cannot be decoded as UTF-8.
    """
    unreadable = (UnicodeDecodeError, OSError)
    try:
        with open(path, encoding="utf-8") as handle:
            contents = handle.read()
    except unreadable:
        return None
    return contents
# TODO (steps 1-5): define index_folder(folder) here — list the files, read each one, and collect the text.
def main():
    """Answer a question about a PDF from the command line.

    Expects two arguments: the PDF path and the question. Embeddings are
    cached next to the PDF in ``<pdf_path>.cache.json``. When a cache is
    found it is reused; otherwise the PDF is extracted, chunked, embedded,
    and the cache is written. The most relevant chunks are then used to
    build a prompt, and the generated answer is printed without sources.

    Raises:
        SystemExit: If fewer than two arguments are supplied. A usage
            message is shown instead of an ``IndexError`` traceback.
    """
    if len(sys.argv) < 3:
        sys.exit("Usage: python <script> <pdf_path> <question>")
    pdf_path = sys.argv[1]
    question = sys.argv[2]
    cache_path = pdf_path + ".cache.json"

    client = create_client()
    print(f"Loading {pdf_path}...")

    cached = load_embeddings(cache_path)
    # load_embeddings signals a cache miss with None.
    if cached is not None:
        chunks, embeddings = cached
        print(f"Loaded cache from {cache_path}")
    else:
        text = extract_text(pdf_path)
        chunks = chunk_text(text)
        print(f"No cache found. Embedding {len(chunks)} chunks...")
        embeddings = embed_all_chunks(client, chunks)
        save_embeddings(chunks, embeddings, cache_path)
        print(f"Cache saved to {cache_path}")

    top_chunks = search(client, question, chunks, embeddings)
    prompt = build_prompt(question, top_chunks)
    answer = generate_answer(client, prompt)
    print_result(answer, top_chunks, show_sources=False)


if __name__ == "__main__":
    main()
Interactive Code Editor
Sign in to write and run code, track your progress, and unlock all chapters.
Sign In to Start Coding