Added self-checking validation to the RAG pipeline.
# ============================================================
# STEP 1 — INSTALL REQUIRED LIBRARIES
# ============================================================
# Run these in Google Colab
!pip install -q google-genai
!pip install -q faiss-cpu
%env RETRIEVAL_MODE=faiss
# ============================================================
# STEP 2 — IMPORT LIBRARIES
# ============================================================
import os
import numpy as np
import faiss
import google.genai as genai
from google.colab import userdata
# ============================================================
# STEP 3 — LOAD ENVIRONMENT SETTINGS
# ============================================================
# RETRIEVAL MODES:
#
# "cosine" -> brute-force cosine similarity
# "faiss" -> FAISS vector search
#
# Change this anytime later.
#
# For Render deployment:
# use environment variables.
RETRIEVAL_MODE = os.getenv(
"RETRIEVAL_MODE",
"cosine"
)
print(f"Retrieval mode: {RETRIEVAL_MODE}")
# ============================================================
# STEP 4 — CONFIGURE GEMINI API
# ============================================================
#GEMINI_API_KEY = userdata.get("GEMINI_API_KEY")
#GEMINI_API_KEY = userdata.get("geminiapikey")
#GEMINI_API_KEY = userdata.get("GEMINI_API_KEY-003")
#GEMINI_API_KEY = userdata.get("GEMINI_API_KEY_004")
#GEMINI_API_KEY = userdata.get("GEMINI_API_KEY_005")
GEMINI_API_KEY = userdata.get("GEMINI_API_KEY_006")
client = genai.Client(api_key=GEMINI_API_KEY)
LLM_MODEL = "models/gemini-3.1-flash-lite"
#LLM_MODEL="models/gemini-2.5-flash
#LLM_MODEL="models/gemini-3-flash-preview"
#LLM_MODEL="models/gemini-2.5-flash"
#LLM_MODEL="models/gemini-2.5-flash-lite"
#LLM_MODEL="models/gemini-3.1-pro-preview"
#LLM_MODEL="models/gemini-2.0-flash-lite"
####MEMORY HANDLER
# ============================================================
# CONVERSATIONAL MEMORY (PHASE 1)
# ============================================================
# GOAL
# ----
# Add short-term conversational memory so the system:
# - understands follow-up questions
# - remembers recent discussion
# - supports multi-turn conversations
#
# EXAMPLE
# -------
# User: Why is my pod crashing?
# User: How do I debug it?
#
# "it" should refer to the crashing pod.
#
#
# IMPORTANT
# ---------
# This is NOT semantic/vector memory yet.
#
# This is:
# SHORT-TERM PROMPT MEMORY
#
#
# WHAT WE WILL ADD
# ----------------
# ✅ chat_history
# ✅ memory window
# ✅ history injection into prompt
# ✅ multi-turn continuity
#
#
# ============================================================
# MEMORY CONFIGURATION
# ============================================================
# Number of previous conversation turns to remember.
#
# Example:
# MEMORY_WINDOW = 3
#
# Means:
# last 3 user-assistant exchanges are included.
MEMORY_WINDOW = 3
# ============================================================
# CHAT HISTORY STORAGE
# ============================================================
# Conversation history format:
#
# [
# {
# "user": "...",
# "assistant": "..."
# }
# ]
chat_history = []
# ============================================================
# BUILD CONVERSATION HISTORY TEXT
# ============================================================
def build_history_context():
# --------------------------------------------------------
# TAKE ONLY RECENT MEMORY WINDOW
# --------------------------------------------------------
recent_history = chat_history[-MEMORY_WINDOW:]
# --------------------------------------------------------
# BUILD HISTORY TEXT
# --------------------------------------------------------
history_text = ""
for turn in recent_history:
history_text += f"""
User:
{turn['user']}
Assistant:
{turn['assistant']}
"""
return history_text.strip()
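# ------------------------------------------------------------
# OPTIONAL — PREVIEW THE INJECTED HISTORY TEXT
# ------------------------------------------------------------
# A quick, optional illustration of what build_history_context()
# produces. The example turn is made up; it is appended and then
# popped so chat_history stays empty. Commented out so the output
# log further below is unchanged.
# chat_history.append({
#     "user": "Why is my pod crashing?",
#     "assistant": "Check kubectl logs and kubectl describe pod for events."
# })
# print(build_history_context())
# chat_history.pop()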
####MEMORY HANDLER
# ============================================================
# STEP 5 — CREATE DATASET
# ============================================================
documents = [
# --------------------------------------------------------
# POD FAILURES / DEBUGGING
# --------------------------------------------------------
"CrashLoopBackOff occurs when a container repeatedly crashes after starting.",
"OOMKilled happens when a container exceeds its memory limit.",
"A container may crash due to missing environment variables.",
"Incorrect command or entrypoint can cause container startup failure.",
"Application errors inside the container often lead to restarts.",
"kubectl logs retrieves logs from a running container.",
"kubectl describe pod shows events and state transitions.",
"Liveness probes determine if a container should be restarted.",
"Readiness probes determine if a pod can receive traffic.",
# --------------------------------------------------------
# SCHEDULING
# --------------------------------------------------------
"Pods remain pending if no node satisfies resource requests.",
"Node affinity restricts pods to specific nodes.",
"Taints prevent pods from being scheduled on certain nodes.",
"Tolerations allow pods to be scheduled on tainted nodes.",
# --------------------------------------------------------
# SERVICES
# --------------------------------------------------------
"ClusterIP services expose applications within the cluster.",
"NodePort services expose applications on node IPs.",
"LoadBalancer services expose applications externally.",
"Ingress routes HTTP and HTTPS traffic to services.",
# --------------------------------------------------------
# STORAGE
# --------------------------------------------------------
"PersistentVolumes provide storage independent of pods.",
"PersistentVolumeClaims request storage resources.",
"StorageClasses define dynamic provisioning behavior.",
# --------------------------------------------------------
# DEPLOYMENTS
# --------------------------------------------------------
"Deployments manage replica sets and pod updates.",
"Rolling updates gradually replace old pods with new ones.",
"ReplicaSets maintain a stable number of pod replicas.",
# --------------------------------------------------------
# CONFIGURATION
# --------------------------------------------------------
"ConfigMaps store non-sensitive configuration data.",
"Secrets store sensitive data like passwords and tokens.",
"Environment variables can be injected from ConfigMaps and Secrets.",
# --------------------------------------------------------
# IMAGES / REGISTRY
# --------------------------------------------------------
"ImagePullBackOff occurs when Kubernetes cannot pull the container image.",
"Incorrect image name or tag can cause image pull failures.",
"Private registries require imagePullSecrets for authentication.",
# --------------------------------------------------------
# AUTOSCALING
# --------------------------------------------------------
"Horizontal Pod Autoscaler scales based on CPU or metrics.",
# --------------------------------------------------------
# SECURITY
# --------------------------------------------------------
"RBAC controls access permissions inside Kubernetes.",
"RBAC misconfiguration can block access to resources.",
# --------------------------------------------------------
# NETWORKING
# --------------------------------------------------------
"NetworkPolicies control communication between pods.",
# --------------------------------------------------------
# CLEANUP
# --------------------------------------------------------
"Pods stuck in Terminating state may have finalizers blocking deletion."
]
print(f"Total documents: {len(documents)}")
# ============================================================
# STEP 6 — CREATE SLIDING WINDOW CHUNKS
# ============================================================
# WHY?
# ----
# Preserves neighboring semantic context.
#
# Example:
# sentence1 + sentence2 + sentence3
#
# Then:
# sentence2 + sentence3 + sentence4
WINDOW_SIZE = 3
STRIDE = 1
smart_chunks = []
for i in range(0, len(documents) - WINDOW_SIZE + 1, STRIDE):
chunk = documents[i:i + WINDOW_SIZE]
chunk_text = "\n".join(chunk)
smart_chunks.append(chunk_text)
print(f"Total chunks created: {len(smart_chunks)}")
# ============================================================
# STEP 7 — PREPARE STRUCTURED CHUNK DATA
# ============================================================
# prepared_data = []
# for i, chunk in enumerate(smart_chunks):
# prepared_data.append({
# "id": f"chunk_{i}",
# "text": chunk
# })
# print(f"Prepared chunks: {len(prepared_data)}")
prepared_data = []
for i, chunk in enumerate(smart_chunks):
prepared_data.append({
# ----------------------------------------------------
# UNIQUE SOURCE ID
# ----------------------------------------------------
"source_id": f"SOURCE_{i+1}",
# ----------------------------------------------------
# CHUNK ID
# ----------------------------------------------------
"id": f"chunk_{i}",
# ----------------------------------------------------
# ACTUAL CHUNK TEXT
# ----------------------------------------------------
"text": chunk,
# ----------------------------------------------------
# OPTIONAL METADATA
# ----------------------------------------------------
"metadata": {
"topic": "kubernetes",
"chunk_number": i
}
})
print("Prepared data with source attribution.")
# ============================================================
# STEP 8 — CREATE EMBEDDING FUNCTION
# ============================================================
def get_embedding(text):
# response = embed_content(
# model="models/gemini-embedding-001",
# contents=text
# )
# return response["embedding"]
response = client.models.embed_content(
model="models/gemini-embedding-001",
contents=text
)
# The new SDK returns a list of embeddings in 'embeddings'
return response.embeddings[0].values
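# ------------------------------------------------------------
# OPTIONAL — EMBEDDING DIMENSION CHECK
# ------------------------------------------------------------
# Optional sanity check (commented out to avoid an extra API call):
# gemini-embedding-001 returns 3072-dimensional vectors here, which
# is what the (32, 3072) matrix shape printed later reflects.
# print(len(get_embedding("test sentence")))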
# ============================================================
# STEP 9 — GENERATE CHUNK EMBEDDINGS
# ============================================================
print("Generating embeddings...")
for item in prepared_data:
embedding = get_embedding(item["text"])
item["embedding"] = embedding
print("Embeddings generated successfully.")
# ============================================================
# STEP 10 — NORMALIZATION FUNCTION
# ============================================================
def normalize(vec):
vec = np.array(vec)
return vec / np.linalg.norm(vec)
# ============================================================
# STEP 11 — COSINE SIMILARITY FUNCTION
# ============================================================
def cosine_similarity(a, b):
a = normalize(a)
b = normalize(b)
return np.dot(a, b)
# ============================================================
# STEP 12 — COSINE RETRIEVAL FUNCTION
# ============================================================
def retrieve_cosine(query, top_k=3, min_score=0.55):
# --------------------------------------------------------
# EMBED QUERY
# --------------------------------------------------------
query_embedding = get_embedding(query)
scores = []
# --------------------------------------------------------
# CALCULATE COSINE SIMILARITY
# --------------------------------------------------------
for item in prepared_data:
similarity = cosine_similarity(
query_embedding,
item["embedding"]
)
scores.append((similarity, item))
# --------------------------------------------------------
# SORT BY SCORE
# --------------------------------------------------------
scores = sorted(
scores,
key=lambda x: x[0],
reverse=True
)
# --------------------------------------------------------
# SIMPLE RE-RANKING
# --------------------------------------------------------
reranked = []
query_words = query.lower().split()
for sim, item in scores:
text = item["text"].lower()
keyword_bonus = sum(
word in text for word in query_words
)
final_score = sim + (0.03 * keyword_bonus)
reranked.append((final_score, item))
# --------------------------------------------------------
# SORT AGAIN AFTER RE-RANKING
# --------------------------------------------------------
reranked = sorted(
reranked,
key=lambda x: x[0],
reverse=True
)
# --------------------------------------------------------
# FILTER LOW SCORES
# --------------------------------------------------------
filtered = [
x for x in reranked
if x[0] >= min_score
]
return filtered[:top_k]
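# ------------------------------------------------------------
# OPTIONAL — COSINE RETRIEVAL SMOKE TEST
# ------------------------------------------------------------
# Optional usage sketch (commented out; the full tests run later):
# for score, item in retrieve_cosine("What causes OOMKilled?"):
#     print(f"{item['source_id']}  {score:.4f}")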
# ============================================================
# STEP 13 — CREATE FAISS EMBEDDING MATRIX
# ============================================================
embedding_matrix = []
for item in prepared_data:
embedding_matrix.append(item["embedding"])
embedding_matrix = np.array(
embedding_matrix,
dtype=np.float32
)
print("Embedding matrix shape:")
print(embedding_matrix.shape)
# ============================================================
# STEP 14 — NORMALIZE EMBEDDINGS FOR FAISS
# ============================================================
# IMPORTANT:
#
# IndexFlatIP uses INNER PRODUCT.
#
# If vectors are normalized:
#
# inner product == cosine similarity
faiss.normalize_L2(embedding_matrix)
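# ------------------------------------------------------------
# OPTIONAL — VERIFY THAT INNER PRODUCT == COSINE HERE
# ------------------------------------------------------------
# Tiny optional check of the claim above (commented out to keep the
# output log unchanged): after L2-normalization, the dot product of
# two rows matches their cosine similarity up to float precision.
# a, b = embedding_matrix[0], embedding_matrix[1]
# print(np.dot(a, b))               # inner product of normalized rows
# print(cosine_similarity(a, b))    # should be (almost) identical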
# ============================================================
# STEP 15 — CREATE FAISS INDEX
# ============================================================
dimension = embedding_matrix.shape[1]
index = faiss.IndexFlatIP(dimension)
print("FAISS index created.")
# ============================================================
# STEP 16 — ADD EMBEDDINGS TO FAISS INDEX
# ============================================================
index.add(embedding_matrix)
print(f"Total vectors indexed: {index.ntotal}")
# ============================================================
# STEP 17 — FAISS RETRIEVAL FUNCTION
# ============================================================
def retrieve_faiss(query, top_k=3):
# --------------------------------------------------------
# EMBED QUERY
# --------------------------------------------------------
query_embedding = get_embedding(query)
# --------------------------------------------------------
# CONVERT TO NUMPY
# --------------------------------------------------------
query_vector = np.array(
[query_embedding],
dtype=np.float32
)
# --------------------------------------------------------
# NORMALIZE QUERY VECTOR
# --------------------------------------------------------
faiss.normalize_L2(query_vector)
# --------------------------------------------------------
# SEARCH FAISS INDEX
# --------------------------------------------------------
scores, indices = index.search(
query_vector,
top_k
)
# --------------------------------------------------------
# FORMAT RESULTS
# --------------------------------------------------------
results = []
for score, idx in zip(scores[0], indices[0]):
item = prepared_data[idx]
results.append((score, item))
return results
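# ------------------------------------------------------------
# OPTIONAL — FAISS RETRIEVAL SMOKE TEST
# ------------------------------------------------------------
# Optional usage sketch (commented out). Because the stored vectors
# are L2-normalized, this should surface roughly the same top chunks
# as retrieve_cosine, minus the keyword re-ranking and score filter.
# for score, item in retrieve_faiss("What causes OOMKilled?"):
#     print(f"{item['source_id']}  {score:.4f}")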
# ============================================================
# STEP 18 — RETRIEVAL ROUTER
# ============================================================
# This decides:
#
# cosine retrieval
# OR
# FAISS retrieval
def retrieve_router(query, top_k=3):
if RETRIEVAL_MODE == "cosine":
return retrieve_cosine(
query=query,
top_k=top_k
)
elif RETRIEVAL_MODE == "faiss":
return retrieve_faiss(
query=query,
top_k=top_k
)
else:
raise ValueError(
f"Invalid retrieval mode: {RETRIEVAL_MODE}"
)
# ============================================================
# STEP 19 — BUILD PROMPT (IMPROVED)
# ============================================================
# WHAT THIS IMPROVES
# -------------------
# ✅ Better grounding
# ✅ Reduced hallucinations
# ✅ Better formatting
# ✅ Better instruction following
# ✅ Cleaner troubleshooting answers
#
# IMPORTANT:
# -----------
# This does NOT improve retrieval itself.
#
# It improves:
# HOW the LLM uses retrieved chunks.
def build_prompt(query, retrieved_chunks):
# --------------------------------------------------------
# BUILD RETRIEVED CONTEXT
# --------------------------------------------------------
context_parts = []
for i, (score, item) in enumerate(retrieved_chunks, start=1):
context_parts.append(
f"""
SOURCE ID: {item["source_id"]}
RELEVANCE SCORE: {score:.4f}
CONTENT:
{item["text"]}
"""
)
context_text = "\n".join(context_parts)
# --------------------------------------------------------
# BUILD CONVERSATION HISTORY
# --------------------------------------------------------
history_text = build_history_context()
# --------------------------------------------------------
# FINAL PROMPT
# --------------------------------------------------------
prompt = f"""
You are an expert Kubernetes troubleshooting assistant.
Your job is to answer the user's question ONLY using
the retrieved context and conversation history.
IMPORTANT RULES:
----------------
1. Use ONLY the retrieved context and conversation history.
2. Do NOT use outside knowledge.
3. Do NOT invent information.
4. Answer using available context.
If context is incomplete,
explicitly mention limitations.
5. If answer is not present at all,
say:
"I don't know based on the provided context."
6. Keep answers:
- concise
- technically accurate
- well-structured
7. Use bullet points when appropriate.
8. Prefer information from higher relevance scores.
9. At the end of the answer,
cite the source IDs used.
10. Use conversation history to understand
follow-up questions and references.
==================================================
CONVERSATION HISTORY
==================================================
{history_text}
==================================================
RETRIEVED CONTEXT
==================================================
{context_text}
==================================================
USER QUESTION
==================================================
{query}
==================================================
ANSWER FORMAT
==================================================
Answer:
<your answer>
Sources Used:
- SOURCE_X
- SOURCE_Y
"""
return prompt
# ============================================================
# STEP 20 — GENERATE ANSWER USING GEMINI
# ============================================================
def generate_answer(prompt):
# model = genai.GenerativeModel(
# "gemini-3-flash-preview"
# )
# response = model.generate_content(prompt)
response = client.models.generate_content(
model = LLM_MODEL,
contents=prompt)
return response.text
# ============================================================
# STEP 20.1 — VALIDATION LOGIC
# ============================================================
# ============================================================
# STEP 20.1.1 — SELF-CHECKING RAG
# ============================================================
# GOAL
# ----
# After generating an answer:
#
# 1. Ask the LLM to VERIFY the answer
# 2. Check whether answer is grounded
# 3. Detect hallucinations
# 4. Detect unsupported claims
#
#
# WHY THIS MATTERS
# ----------------
# Retrieval can still fail.
#
# LLM may:
# - overgeneralize
# - infer unsupported facts
# - hallucinate details
#
# Self-checking adds:
#
# GENERATION → VALIDATION
#
#
# IMPORTANT
# ---------
# This is still NOT a full agent.
#
# But this is an EARLY form of:
# reflection / self-evaluation
#
#
# ============================================================
# STEP 20.1.2 — VALIDATION PROMPT
# ============================================================
def build_validation_prompt(answer, retrieved_chunks):
# --------------------------------------------------------
# BUILD RETRIEVED CONTEXT
# --------------------------------------------------------
context_parts = []
for score, item in retrieved_chunks:
context_parts.append(
f"""
SOURCE ID: {item["source_id"]}
CONTENT:
{item["text"]}
"""
)
context_text = "\n".join(context_parts)
# --------------------------------------------------------
# VALIDATION PROMPT
# --------------------------------------------------------
validation_prompt = f"""
You are a strict RAG answer validator.
Your task is to determine whether the answer
is FULLY supported by the retrieved context.
IMPORTANT RULES:
----------------
1. Check whether the answer contains:
- hallucinations
- unsupported claims
- invented facts
- outside knowledge
2. ONLY use the retrieved context.
3. Be strict and conservative.
4. If the answer is partially supported,
clearly mention unsupported parts.
5. Return your response in this format:
VALIDATION: PASS or FAIL
EXPLANATION:
<short explanation>
==================================================
RETRIEVED CONTEXT
==================================================
{context_text}
==================================================
ANSWER TO VALIDATE
==================================================
{answer}
"""
return validation_prompt
# ============================================================
# STEP 20.1.3 — VALIDATE ANSWER
# ============================================================
def validate_answer(answer, retrieved_chunks):
# --------------------------------------------------------
# BUILD VALIDATION PROMPT
# --------------------------------------------------------
validation_prompt = build_validation_prompt(
answer,
retrieved_chunks
)
# --------------------------------------------------------
# CALL LLM
# --------------------------------------------------------
response = client.models.generate_content(
model=LLM_MODEL,
contents=validation_prompt
)
validation_result = response.text
return validation_result
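# ------------------------------------------------------------
# OPTIONAL — PARSE THE VALIDATION VERDICT
# ------------------------------------------------------------
# Small optional helper, not used by the pipeline below. It assumes
# the validator followed the "VALIDATION: PASS or FAIL" format asked
# for above; if that line is missing it conservatively returns False.
def validation_passed(validation_result):
    for line in validation_result.splitlines():
        if line.strip().upper().startswith("VALIDATION:"):
            return "PASS" in line.upper()
    return False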
# ============================================================
# STEP 20.1 — END OF VALIDATION LOGIC
# ============================================================
# ============================================================
# STEP 21 — MAIN RAG PIPELINE
# ============================================================
def rag_pipeline(query):
# --------------------------------------------------------
# RETRIEVE CHUNKS
# --------------------------------------------------------
retrieved_chunks = retrieve_router(query)
# --------------------------------------------------------
# BUILD MAIN PROMPT
# --------------------------------------------------------
prompt = build_prompt(query, retrieved_chunks)
# --------------------------------------------------------
# GENERATE ANSWER
# --------------------------------------------------------
    answer = generate_answer(prompt)
# --------------------------------------------------------
# VALIDATE ANSWER
# --------------------------------------------------------
validation_result = validate_answer(
answer,
retrieved_chunks
)
# --------------------------------------------------------
# STORE IN MEMORY
# --------------------------------------------------------
chat_history.append({
"user": query,
"assistant": answer
})
return answer, validation_result, retrieved_chunks
# ============================================================
# STEP 22 — TEST QUERIES
# ============================================================
test_queries = [
# --------------------------------------------------------
# SHOULD PASS
# --------------------------------------------------------
"What causes OOMKilled?",
"How do services work in Kubernetes?",
# --------------------------------------------------------
# PARTIAL CONTEXT
# --------------------------------------------------------
"How does Kubernetes networking work?",
# --------------------------------------------------------
# SHOULD TRIGGER LIMITATIONS
# --------------------------------------------------------
"How does etcd replication work?"
]
# ============================================================
# STEP 23 — RUN TESTS
# ============================================================
for query in test_queries:
print("\n" + "=" * 80)
print(f"QUERY: {query}")
# --------------------------------------------------------
# RUN PIPELINE
# --------------------------------------------------------
answer, validation, sources = rag_pipeline(query)
# --------------------------------------------------------
# PRINT ANSWER
# --------------------------------------------------------
print("\nANSWER:\n")
print(answer)
# --------------------------------------------------------
# PRINT VALIDATION
# --------------------------------------------------------
print("\nSELF-CHECK RESULT:\n")
print(validation)
# --------------------------------------------------------
# PRINT RETRIEVED SOURCES
# --------------------------------------------------------
print("\nRETRIEVED SOURCES:\n")
for score, item in sources:
print(f"Source ID: {item['source_id']}")
print(f"Score: {score:.4f}")
print("-" * 50)
# ============================================================
# OPTIONAL MEMORY RESET
# ============================================================
# Use this whenever you want to clear conversation history.
# chat_history = []
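# ============================================================
# OPTIONAL — MULTI-TURN MEMORY DEMO
# ============================================================
# Optional follow-up test (commented out so the output log below is
# unchanged). The second question relies on chat_history, so "it"
# should resolve to the crashing pod from the first question.
# answer1, _, _ = rag_pipeline("Why is my pod crashing?")
# answer2, _, _ = rag_pipeline("How do I debug it?")
# print(answer2)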
======================================================================================
OUTPUT
======================================================================================
Retrieval mode: faiss
Total documents: 34
Total chunks created: 32
Prepared data with source attribution.
Generating embeddings...
Embeddings generated successfully.
Embedding matrix shape:
(32, 3072)
FAISS index created.
Total vectors indexed: 32
================================================================================
QUERY: What causes OOMKilled?
ANSWER:
Answer:
OOMKilled occurs when a container exceeds its defined memory limit.
Sources Used:
- SOURCE_1
- SOURCE_2
SELF-CHECK RESULT:
VALIDATION: PASS
EXPLANATION: The answer is fully supported by the provided context. Both SOURCE_1 and SOURCE_2 explicitly state that OOMKilled happens when a container exceeds its memory limit. The inclusion of the word "defined" is synonymous with the context provided and does not constitute a hallucination.
RETRIEVED SOURCES:
Source ID: SOURCE_2
Score: 0.7658
--------------------------------------------------
Source ID: SOURCE_1
Score: 0.7314
--------------------------------------------------
Source ID: SOURCE_3
Score: 0.6270
--------------------------------------------------
================================================================================
QUERY: How do services work in Kubernetes?
ANSWER:
Answer:
In Kubernetes, services expose applications in the following ways:
* **ClusterIP:** Exposes applications within the cluster.
* **NodePort:** Exposes applications on node IPs.
* **LoadBalancer:** Exposes applications externally.
* **Ingress:** Routes HTTP and HTTPS traffic to services.
Sources Used:
- SOURCE_14
- SOURCE_13
- SOURCE_15
SELF-CHECK RESULT:
VALIDATION: PASS
EXPLANATION:
The answer is fully supported by the provided context. Each bullet point corresponds to facts explicitly stated in the source documents: ClusterIP, NodePort, and LoadBalancer are described in sources 13, 14, and 15, and the definition for Ingress is provided in source 15.
RETRIEVED SOURCES:
Source ID: SOURCE_14
Score: 0.7389
--------------------------------------------------
Source ID: SOURCE_13
Score: 0.7214
--------------------------------------------------
Source ID: SOURCE_15
Score: 0.7159
--------------------------------------------------
================================================================================
QUERY: How does Kubernetes networking work?
ANSWER:
Answer:
Based on the provided context, Kubernetes networking is managed via the following mechanisms:
* **NetworkPolicies:** Control communication between pods.
* **Services:** Expose applications through different types such as ClusterIP (within the cluster) and NodePort (on node IPs).
Sources Used:
- SOURCE_12
- SOURCE_13
- SOURCE_31
SELF-CHECK RESULT:
VALIDATION: PASS
EXPLANATION: The answer is fully supported by the provided context. The definitions of NetworkPolicies (SOURCE_31) and Service types (ClusterIP in SOURCE_12/SOURCE_13 and NodePort in SOURCE_13) are accurately reflected.
RETRIEVED SOURCES:
Source ID: SOURCE_31
Score: 0.6912
--------------------------------------------------
Source ID: SOURCE_12
Score: 0.6770
--------------------------------------------------
Source ID: SOURCE_13
Score: 0.6732
--------------------------------------------------
================================================================================
QUERY: How does etcd replication work?
ANSWER:
Answer:
I don't know based on the provided context.
SELF-CHECK RESULT:
VALIDATION: PASS
EXPLANATION:
The answer correctly identifies that it cannot provide information because no specific question was asked to be answered based on the provided context.
RETRIEVED SOURCES:
Source ID: SOURCE_21
Score: 0.6333
--------------------------------------------------
Source ID: SOURCE_23
Score: 0.6206
--------------------------------------------------
Source ID: SOURCE_22
Score: 0.6160
--------------------------------------------------