Read a File's Content
Add a function that reads any text file and silently skips files it can't read
Writing code and entering commands is only available on desktop. Open this page on a larger screen to complete this chapter.
Before the assistant can index a file, it needs to read the file's text. Most files in a typical project folder open without issue — but two categories of files will cause the program to crash if you don't handle them explicitly:
- Binary files: Images, compiled binaries, and similar files contain bytes that aren't valid UTF-8. Reading them with a text decoder raises `UnicodeDecodeError`.
- Locked or inaccessible files: Files that are open by another process, or that you don't have permission to read, raise `OSError`.
Rather than stopping the entire indexing run when one file fails, read_file catches both errors and returns None. The caller — index_folder, which you'll write in the next chapter — checks for None and skips that file with continue.
This pattern keeps the assistant resilient: a single unreadable file in a folder of 200 won't prevent the other 199 from being indexed.
Instructions
- Define a function called `read_file` that takes `path`.
- Inside a `try` block, open `path` with `open(path, "r", encoding="utf-8")`. Return the file's full text using `.read()`.
- Add an `except (UnicodeDecodeError, OSError):` clause that returns `None`.
import json
import os
import sys
import time
import numpy as np
import pypdf
from dotenv import load_dotenv
from google import genai
from google.genai import types
def extract_text(pdf_path):
    """Return the text of the PDF at ``pdf_path`` as a single string.

    The file is opened with ``pypdf.PdfReader``; ``extract_text()`` is called
    on each page in order and the per-page results are joined with a newline.
    """
    document = pypdf.PdfReader(pdf_path)
    return "\n".join(page.extract_text() for page in document.pages)
def chunk_text(text, chunk_size=500, overlap=100):
    """Split ``text`` into overlapping windows of at most ``chunk_size`` characters.

    Consecutive windows start ``chunk_size - overlap`` characters apart, so each
    window repeats the last ``overlap`` characters of the previous one. The final
    window may be shorter than ``chunk_size``.

    Args:
        text: The string to split.
        chunk_size: Maximum number of characters per chunk.
        overlap: Number of characters shared between neighbouring chunks.

    Returns:
        A list of substrings of ``text``; empty when ``text`` is empty.

    Raises:
        ValueError: If ``overlap`` is not smaller than ``chunk_size``. The window
            would then never advance (or would move backwards).
    """
    step = chunk_size - overlap
    # A non-positive step makes range() either raise an opaque error (step == 0)
    # or silently produce no chunks (step < 0); fail loudly with a clear message.
    if step <= 0:
        raise ValueError(
            f"overlap ({overlap}) must be smaller than chunk_size ({chunk_size})"
        )
    return [text[start : start + chunk_size] for start in range(0, len(text), step)]
def create_client():
    """Build and return a ``genai.Client``.

    Calls ``load_dotenv()`` first, then reads the ``GEMINI_API_KEY`` environment
    variable and passes its value (``None`` when unset) as ``api_key``.
    """
    load_dotenv()
    return genai.Client(api_key=os.environ.get("GEMINI_API_KEY"))
def embed_text(client, text):
    """Embed ``text`` with the ``gemini-embedding-001`` model.

    The request uses the ``RETRIEVAL_DOCUMENT`` task type. Returns the
    ``values`` attribute of the first embedding in the response.
    """
    response = client.models.embed_content(
        model="gemini-embedding-001",
        contents=text,
        config=types.EmbedContentConfig(task_type="RETRIEVAL_DOCUMENT"),
    )
    first_embedding = response.embeddings[0]
    return first_embedding.values
def embed_all_chunks(client, chunks):
    """Embed every chunk with ``embed_text`` and return the results in order.

    Chunks are processed in groups of 90. After each group except the last,
    a notice is printed and the function sleeps for 60 seconds before
    continuing.
    """
    per_batch = 90
    total = len(chunks)
    vectors = []
    for start in range(0, total, per_batch):
        stop = start + per_batch
        vectors.extend(embed_text(client, piece) for piece in chunks[start:stop])
        # Only pause when at least one more batch remains.
        if stop < total:
            print("Rate limit pause — waiting 60 seconds...")
            time.sleep(60)
    return vectors
def cosine_similarity(vec_a, vec_b):
    """Return the cosine of the angle between ``vec_a`` and ``vec_b``.

    Computed as the dot product divided by the product of the two Euclidean
    norms.

    Args:
        vec_a: First vector (any sequence accepted by ``np.dot``).
        vec_b: Second vector, same length as ``vec_a``.

    Returns:
        The similarity score, or ``0.0`` when either vector has zero norm.
    """
    dot = np.dot(vec_a, vec_b)
    norm = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
    # A zero vector has no direction; dividing would yield NaN (and a NumPy
    # warning), which would then corrupt any ranking built on these scores.
    if norm == 0:
        return 0.0
    return dot / norm
def search(client, query, chunks, embeddings, top_k=3):
    """Return the ``top_k`` chunks whose embeddings best match ``query``.

    The query is embedded with ``gemini-embedding-001`` using the
    ``RETRIEVAL_QUERY`` task type. Each stored embedding is scored against the
    query vector with ``cosine_similarity``; chunks are returned in descending
    score order.
    """
    response = client.models.embed_content(
        model="gemini-embedding-001",
        contents=query,
        config=types.EmbedContentConfig(task_type="RETRIEVAL_QUERY"),
    )
    query_vector = response.embeddings[0].values
    scored = [
        (cosine_similarity(query_vector, embedding), chunk)
        for embedding, chunk in zip(embeddings, chunks)
    ]
    # Sort on the score only so ties keep their original relative order.
    ranked = sorted(scored, key=lambda pair: pair[0], reverse=True)
    return [chunk for _, chunk in ranked[:top_k]]
def build_prompt(question, context_chunks):
    """Assemble the prompt sent to the model.

    The prompt contains a fixed instruction header, the context chunks
    separated by blank lines, and finally the question.
    """
    joined_context = "\n\n".join(context_chunks)
    return (
        "You are a helpful assistant. Answer the question using only the context below.\n"
        "If the answer is not in the context, say \"I don't know.\"\n\n"
        f"Context:\n{joined_context}\n\n"
        f"Question:\n{question}"
    )
def generate_answer(client, prompt):
    """Send ``prompt`` to the ``gemini-2.5-flash`` model and return its text.

    Returns the ``text`` attribute of the response from
    ``client.models.generate_content``.
    """
    reply = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt,
    )
    return reply.text
def print_result(answer, source_chunks, show_sources=True):
    """Print the answer and, optionally, the numbered source chunks.

    When ``show_sources`` is falsy only the answer section is printed.
    Sources are numbered starting from 1.
    """
    print("Answer:")
    print(answer)
    if not show_sources:
        return
    print("\nSources:")
    for number, source in enumerate(source_chunks, start=1):
        print(f"Source {number}:\n{source}\n")
def save_embeddings(chunks, embeddings, cache_path):
    """Write ``chunks`` and ``embeddings`` to ``cache_path`` as JSON.

    The file holds one object with the keys ``"chunks"`` and ``"embeddings"``;
    any existing file at ``cache_path`` is overwritten.
    """
    with open(cache_path, "w") as cache_file:
        json.dump({"chunks": chunks, "embeddings": embeddings}, cache_file)
def load_embeddings(cache_path):
    """Load cached chunks and embeddings from the JSON file at ``cache_path``.

    Args:
        cache_path: Path of a JSON file holding an object with ``"chunks"``
            and ``"embeddings"`` keys.

    Returns:
        A ``(chunks, embeddings)`` tuple, or ``None`` when the file does not
        exist or does not contain a usable cache.
    """
    if not os.path.exists(cache_path):
        return None
    try:
        with open(cache_path) as f:
            data = json.load(f)
        return data["chunks"], data["embeddings"]
    except (ValueError, KeyError, TypeError):
        # A truncated or hand-edited cache (invalid JSON, missing keys, wrong
        # top-level type) is treated as a cache miss so the caller rebuilds it
        # instead of crashing on every subsequent run.
        return None
# File extensions (lower-case, including the leading dot) that list_files keeps.
SUPPORTED_EXTENSIONS = {".txt", ".md", ".py", ".js", ".ts", ".yaml", ".yml", ".json"}


def list_files(folder):
    """Recursively collect the paths of supported files under ``folder``.

    Args:
        folder: Directory to walk with ``os.walk``.

    Returns:
        A list of paths (each ``dirpath`` joined with the file name) whose
        extension, compared case-insensitively, is in ``SUPPORTED_EXTENSIONS``.
        Empty when nothing matches.
    """
    file_paths = []
    for dirpath, _, filenames in os.walk(folder):
        for filename in filenames:
            extension = os.path.splitext(filename)[1]
            # Lower-case before matching so README.MD or NOTES.TXT are not
            # skipped just because of how the extension is capitalised.
            if extension.lower() in SUPPORTED_EXTENSIONS:
                file_paths.append(os.path.join(dirpath, filename))
    return file_paths
# Step 1-3: def read_file(path):
def main():
    """Answer a question about a PDF from the command line.

    Expects two command-line arguments: the PDF path and the question.
    Chunk embeddings are cached next to the PDF in ``<pdf_path>.cache.json``;
    when no cache is loaded, the PDF text is extracted, chunked, embedded and
    the cache is written. The best-matching chunks are then used to build a
    prompt, and the generated answer is printed without sources.

    Raises:
        SystemExit: With a usage message when fewer than two arguments are
            supplied.
    """
    # Fail with a readable usage line rather than a bare IndexError.
    if len(sys.argv) < 3:
        program = sys.argv[0] if sys.argv else "script"
        sys.exit(f"Usage: {program} <pdf_path> <question>")

    pdf_path = sys.argv[1]
    question = sys.argv[2]
    cache_path = pdf_path + ".cache.json"

    client = create_client()
    print(f"Loading {pdf_path}...")

    cached = load_embeddings(cache_path)
    if cached:
        chunks, embeddings = cached
        print(f"Loaded cache from {cache_path}")
    else:
        text = extract_text(pdf_path)
        chunks = chunk_text(text)
        print(f"No cache found. Embedding {len(chunks)} chunks...")
        embeddings = embed_all_chunks(client, chunks)
        save_embeddings(chunks, embeddings, cache_path)
        print(f"Cache saved to {cache_path}")

    top_chunks = search(client, question, chunks, embeddings)
    prompt = build_prompt(question, top_chunks)
    answer = generate_answer(client, prompt)
    print_result(answer, top_chunks, show_sources=False)


if __name__ == "__main__":
    main()
Interactive Code Editor
Sign in to write and run code, track your progress, and unlock all chapters.
Sign In to Start Coding