Large language models are powerful, but they don't know about your data — your internal docs, your codebase, your runbooks. Retrieval-Augmented Generation (RAG) bridges this gap by fetching relevant documents at query time and feeding them to the LLM as context.
Every RAG pipeline needs a document store — a place to keep the source material that gets searched, chunked, and fed to the model. S3-compatible object storage is a natural choice: it's cheap, durable, scales effectively without limit, and virtually every tool in the AI ecosystem supports it natively.
This guide shows you how to build a production RAG pipeline using DanubeData's S3-compatible storage as the document layer, with a PostgreSQL vector database for embeddings — all hosted in Europe for GDPR compliance.
Architecture Overview
Here's what we're building:
┌──────────────────────────────────────────────────────────┐
│                    Your Application                      │
│                                                          │
│  User Query ──▶ Embed Query ──▶ Vector Search ──▶ LLM    │
│                                       │            │     │
│                                       ▼            ▼     │
│                                   pgvector      Claude/  │
│                                  (DanubeData     GPT-4   │
│                                  PostgreSQL)             │
│                                       ▲                  │
│                                       │                  │
│  S3 Bucket ──▶ Chunk ──▶ Embed ──▶ Store                 │
│  (DanubeData                                             │
│   Object Storage)                                        │
└──────────────────────────────────────────────────────────┘
The flow has two phases:
- Ingestion: Upload documents to S3 → chunk them → generate embeddings → store in pgvector
- Query: Embed the user's question → find similar chunks in pgvector → feed them to the LLM as context
Why S3 for the Document Layer?
- Separation of concerns: Raw documents live in S3, processed embeddings live in the vector DB. You can re-process documents anytime without re-uploading
- Versioning: S3 versioning tracks document changes — critical for knowing when to re-embed
- Scale: From 100 documents to 10 million, S3 handles it without architecture changes
- Ecosystem: LangChain, LlamaIndex, Haystack, and every other RAG framework has native S3 loaders
- Cost: DanubeData S3 includes 1 TB storage for €3.99/month — enough for millions of documents
Prerequisites
You'll need:
- DanubeData Object Storage bucket — for document storage (create one here)
- DanubeData PostgreSQL instance — for pgvector embeddings (create one here)
- Python 3.11+
- An LLM API key — Anthropic (Claude) or OpenAI
Install Dependencies
pip install boto3 langchain langchain-community langchain-anthropic \
    psycopg2-binary pgvector python-dotenv tiktoken \
    pypdf docx2txt unstructured voyageai
Step 1: Configure S3 and Database Connections
# config.py
import os
from dotenv import load_dotenv
load_dotenv()
# DanubeData S3
S3_ENDPOINT = "https://s3.danubedata.ro"
S3_ACCESS_KEY = os.getenv("DANUBEDATA_S3_ACCESS_KEY")
S3_SECRET_KEY = os.getenv("DANUBEDATA_S3_SECRET_KEY")
S3_BUCKET = "rag-knowledge-base"
S3_REGION = "fsn1"
# DanubeData PostgreSQL
DB_HOST = os.getenv("DANUBEDATA_DB_HOST") # e.g. db-xxxxx.danubedata.ro
DB_PORT = os.getenv("DANUBEDATA_DB_PORT", "5432")
DB_NAME = os.getenv("DANUBEDATA_DB_NAME", "rag")
DB_USER = os.getenv("DANUBEDATA_DB_USER")
DB_PASSWORD = os.getenv("DANUBEDATA_DB_PASSWORD")
# Anthropic
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")
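For reference, the environment variables read by `config.py` can live in a `.env` file next to it. The values below are placeholders — substitute your own credentials. Note that the `voyageai` client used later in this guide reads `VOYAGE_API_KEY` from the environment, so it belongs here too:

```shell
# .env — never commit this file; add it to .gitignore
DANUBEDATA_S3_ACCESS_KEY=your-access-key
DANUBEDATA_S3_SECRET_KEY=your-secret-key
DANUBEDATA_DB_HOST=db-xxxxx.danubedata.ro
DANUBEDATA_DB_PORT=5432
DANUBEDATA_DB_NAME=rag
DANUBEDATA_DB_USER=rag_user
DANUBEDATA_DB_PASSWORD=your-db-password
ANTHROPIC_API_KEY=sk-ant-...
VOYAGE_API_KEY=pa-...
```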
Step 2: Upload Documents to S3
Organize your knowledge base in S3 with a clear folder structure:
# upload_docs.py
import boto3
from pathlib import Path
from config import S3_ENDPOINT, S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET
s3 = boto3.client(
"s3",
endpoint_url=S3_ENDPOINT,
aws_access_key_id=S3_ACCESS_KEY,
aws_secret_access_key=S3_SECRET_KEY,
region_name="fsn1",
)
def upload_directory(local_path: str, s3_prefix: str = ""):
"""Upload a directory of documents to S3."""
local = Path(local_path)
for file in local.rglob("*"):
if file.is_file() and file.suffix in (".pdf", ".md", ".txt", ".docx", ".html"):
key = f"{s3_prefix}/{file.relative_to(local)}" if s3_prefix else str(file.relative_to(local))
s3.upload_file(str(file), S3_BUCKET, key)
print(f"Uploaded: {key}")
# Upload your docs
upload_directory("./docs", s3_prefix="engineering")
upload_directory("./runbooks", s3_prefix="ops")
Your bucket structure will look like:
rag-knowledge-base/
├── engineering/
│ ├── api-design-guide.md
│ ├── architecture-decisions/
│ │ ├── adr-001-database-choice.md
│ │ └── adr-002-auth-strategy.md
│ └── onboarding.pdf
├── ops/
│ ├── incident-response.md
│ ├── deployment-checklist.md
│ └── monitoring-guide.md
└── product/
├── roadmap-2026.pdf
└── user-research-findings.docx
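Prefixes like `engineering/` and `ops/` double as document categories you can later store in the chunk metadata and filter on at query time. A tiny helper to derive the category from an S3 key — the function name and the `uncategorized` fallback are our own conventions, not part of any library:

```python
def category_from_key(key: str) -> str:
    """Return the top-level S3 prefix, e.g. 'ops/incident-response.md' -> 'ops'."""
    return key.split("/", 1)[0] if "/" in key else "uncategorized"

print(category_from_key("ops/incident-response.md"))  # ops
print(category_from_key("engineering/architecture-decisions/adr-001-database-choice.md"))  # engineering
print(category_from_key("README.md"))  # uncategorized
```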
Step 3: Set Up pgvector
DanubeData PostgreSQL instances come with the pgvector extension available. Enable it and create your embeddings table:
# setup_db.py
import psycopg2
from config import DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD
conn = psycopg2.connect(
host=DB_HOST, port=DB_PORT, dbname=DB_NAME,
user=DB_USER, password=DB_PASSWORD,
)
conn.autocommit = True
cur = conn.cursor()
# Enable pgvector
cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
# Create the embeddings table
cur.execute("""
CREATE TABLE IF NOT EXISTS document_chunks (
id BIGSERIAL PRIMARY KEY,
s3_key TEXT NOT NULL,
chunk_index INTEGER NOT NULL,
content TEXT NOT NULL,
metadata JSONB DEFAULT '{}'::jsonb,
    embedding vector(1024),  -- voyage-3 embeddings are 1024-dimensional
created_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(s3_key, chunk_index)
);
""")
# Create HNSW index for fast similarity search
cur.execute("""
CREATE INDEX IF NOT EXISTS idx_chunks_embedding
ON document_chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
""")
print("Database ready.")
cur.close()
conn.close()
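The HNSW index above uses `vector_cosine_ops`, so queries will rank results with pgvector's `<=>` cosine-distance operator, and similarity is `1 - distance`. A pure-Python sketch of the same math, handy for sanity-checking what the database returns:

```python
import math

def cosine_distance(a: list[float], b: list[float]) -> float:
    """Cosine distance as pgvector's <=> operator computes it: 1 - cos(theta)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / (norm_a * norm_b)

# Same direction -> distance 0.0, similarity 1.0
print(cosine_distance([1.0, 0.0], [2.0, 0.0]))  # 0.0
# Orthogonal vectors -> distance 1.0, similarity 0.0
print(cosine_distance([1.0, 0.0], [0.0, 1.0]))  # 1.0
```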
Step 4: Ingest Documents from S3
This is the core of the pipeline — read documents from S3, split them into chunks, generate embeddings, and store them in pgvector.
# ingest.py
import boto3
import anthropic
import psycopg2
import json
from io import BytesIO
from config import *
# Initialize clients
s3 = boto3.client(
"s3",
endpoint_url=S3_ENDPOINT,
aws_access_key_id=S3_ACCESS_KEY,
aws_secret_access_key=S3_SECRET_KEY,
region_name="fsn1",
)
# Embeddings come from Voyage AI (Anthropic's recommended embeddings partner);
# the voyageai client used in get_embeddings() reads VOYAGE_API_KEY from the environment
conn = psycopg2.connect(
host=DB_HOST, port=DB_PORT, dbname=DB_NAME,
user=DB_USER, password=DB_PASSWORD,
)
def list_documents():
"""List all documents in the S3 bucket."""
paginator = s3.get_paginator("list_objects_v2")
docs = []
for page in paginator.paginate(Bucket=S3_BUCKET):
for obj in page.get("Contents", []):
if obj["Key"].endswith((".md", ".txt", ".pdf", ".docx")):
docs.append(obj["Key"])
return docs
def download_and_read(key: str) -> str:
"""Download a document from S3 and extract text."""
response = s3.get_object(Bucket=S3_BUCKET, Key=key)
content = response["Body"].read()
if key.endswith((".md", ".txt")):
return content.decode("utf-8")
elif key.endswith(".pdf"):
from pypdf import PdfReader
reader = PdfReader(BytesIO(content))
return "
".join(page.extract_text() for page in reader.pages)
elif key.endswith(".docx"):
import docx2txt
return docx2txt.process(BytesIO(content))
return ""
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> list[str]:
"""Split text into overlapping chunks."""
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
chunks.append(text[start:end])
start += chunk_size - overlap
return [c.strip() for c in chunks if c.strip()]
def get_embeddings(texts: list[str]) -> list[list[float]]:
"""Generate embeddings using Voyage (via Anthropic) or any provider."""
import voyageai
vo = voyageai.Client()
result = vo.embed(texts, model="voyage-3", input_type="document")
return result.embeddings
def ingest_document(key: str):
"""Full pipeline: S3 → chunk → embed → pgvector."""
print(f"Processing: {key}")
# Read from S3
text = download_and_read(key)
if not text:
return
# Chunk
chunks = chunk_text(text)
print(f" {len(chunks)} chunks")
# Embed in batches of 32
for batch_start in range(0, len(chunks), 32):
batch = chunks[batch_start:batch_start + 32]
embeddings = get_embeddings(batch)
# Store in pgvector
cur = conn.cursor()
for i, (chunk, embedding) in enumerate(zip(batch, embeddings)):
chunk_idx = batch_start + i
cur.execute("""
INSERT INTO document_chunks (s3_key, chunk_index, content, metadata, embedding)
VALUES (%s, %s, %s, %s, %s::vector)
ON CONFLICT (s3_key, chunk_index)
DO UPDATE SET content = EXCLUDED.content, embedding = EXCLUDED.embedding
""", (key, chunk_idx, chunk, json.dumps({"source": key}), str(embedding)))
conn.commit()
cur.close()
print(f" Done.")
# Run ingestion
if __name__ == "__main__":
docs = list_documents()
print(f"Found {len(docs)} documents")
for doc in docs:
ingest_document(doc)
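The sliding-window chunker in `chunk_text` is easy to verify in isolation: each step advances by `chunk_size - overlap` characters, so consecutive chunks share `overlap` characters and no text is lost at a boundary. A standalone check:

```python
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> list[str]:
    """Same sliding-window chunker as in ingest.py."""
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        start += chunk_size - overlap
    return [c.strip() for c in chunks if c.strip()]

text = "abcdefghij" * 100  # 1000 characters
chunks = chunk_text(text, chunk_size=300, overlap=50)
# Each step advances 250 chars, so 1000 chars yield 4 chunks
print(len(chunks))  # 4
# The last 50 chars of each chunk reappear at the start of the next
print(chunks[0][-50:] == chunks[1][:50])  # True
```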
Step 5: Query Your Knowledge Base
Now the fun part — ask questions and get answers grounded in your documents:
# query.py
import anthropic
import psycopg2
from config import *
client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)
conn = psycopg2.connect(
host=DB_HOST, port=DB_PORT, dbname=DB_NAME,
user=DB_USER, password=DB_PASSWORD,
)
def search_similar(query: str, limit: int = 5) -> list[dict]:
"""Find the most relevant chunks for a query."""
import voyageai
vo = voyageai.Client()
result = vo.embed([query], model="voyage-3", input_type="query")
query_embedding = result.embeddings[0]
cur = conn.cursor()
cur.execute("""
SELECT s3_key, chunk_index, content,
1 - (embedding <=> %s::vector) AS similarity
FROM document_chunks
ORDER BY embedding <=> %s::vector
LIMIT %s
""", (str(query_embedding), str(query_embedding), limit))
results = []
for row in cur.fetchall():
results.append({
"source": row[0],
"chunk": row[1],
"content": row[2],
"similarity": float(row[3]),
})
cur.close()
return results
def ask(question: str) -> str:
"""RAG query: search → context → LLM answer."""
# Retrieve relevant chunks
chunks = search_similar(question, limit=5)
# Build context
context = "
---
".join(
f"Source: {c['source']}
{c['content']}"
for c in chunks
)
# Ask Claude with retrieved context
response = client.messages.create(
model="claude-sonnet-4-6-20250514",
max_tokens=2048,
messages=[{
"role": "user",
"content": f"""Answer the following question based on the provided context.
If the context doesn't contain enough information, say so.
Context:
{context}
Question: {question}"""
}],
)
return response.content[0].text
# Example usage
if __name__ == "__main__":
answer = ask("What is our incident response process?")
print(answer)
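Retrieval always returns the top `limit` rows, even when nothing in the knowledge base is actually relevant. It can help to drop weak matches before they reach the prompt; the helper below is a sketch of our own, and the `0.5` threshold is a starting point to tune against your data, not a recommendation:

```python
def filter_by_similarity(results: list[dict], min_similarity: float = 0.5) -> list[dict]:
    """Keep only chunks whose cosine similarity clears the threshold."""
    return [r for r in results if r["similarity"] >= min_similarity]

# Example with the dict shape returned by search_similar()
results = [
    {"source": "ops/incident-response.md", "similarity": 0.82},
    {"source": "product/roadmap-2026.pdf", "similarity": 0.31},
]
print([r["source"] for r in filter_by_similarity(results)])
# ['ops/incident-response.md']
```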
Step 6: Keep Documents in Sync
Documents change over time. Use S3 event notifications or a simple polling script to detect changes and re-ingest:
# sync_changes.py
import boto3
from datetime import datetime, timedelta, timezone
from config import *
from ingest import ingest_document
s3 = boto3.client(
"s3",
endpoint_url=S3_ENDPOINT,
aws_access_key_id=S3_ACCESS_KEY,
aws_secret_access_key=S3_SECRET_KEY,
region_name="fsn1",
)
def get_recently_modified(hours: int = 24) -> list[str]:
"""Find documents modified in the last N hours."""
cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
paginator = s3.get_paginator("list_objects_v2")
modified = []
for page in paginator.paginate(Bucket=S3_BUCKET):
for obj in page.get("Contents", []):
if obj["LastModified"] > cutoff:
modified.append(obj["Key"])
return modified
# Re-ingest recently changed documents
if __name__ == "__main__":
changed = get_recently_modified(hours=24)
print(f"Re-ingesting {len(changed)} modified documents")
for key in changed:
ingest_document(key)
Run this daily via cron or a DanubeData Serverless Container on a schedule.
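Polling by `LastModified` re-ingests a file even if it was re-uploaded with identical content. If you also record each object's ETag at ingestion time, you can skip unchanged files and detect deletions. A sketch of the comparison logic — where you persist `seen_etags` is up to you (a small table in the same PostgreSQL instance would work):

```python
def detect_changes(current: dict[str, str], seen_etags: dict[str, str]) -> tuple[list[str], list[str]]:
    """Compare the bucket's current key->ETag map against the last-ingested one.

    Returns (keys to re-ingest, keys deleted from the bucket).
    """
    changed = [k for k, etag in current.items() if seen_etags.get(k) != etag]
    deleted = [k for k in seen_etags if k not in current]
    return changed, deleted

# One doc re-uploaded, one new, one removed since the last run
current = {"ops/runbook.md": "etag-v2", "ops/new-doc.md": "etag-v1"}
seen = {"ops/runbook.md": "etag-v1", "ops/old-doc.md": "etag-v1"}
changed, deleted = detect_changes(current, seen)
print(changed)  # ['ops/runbook.md', 'ops/new-doc.md']
print(deleted)  # ['ops/old-doc.md']
```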
Using LangChain's Built-in S3 Loader
If you prefer a higher-level abstraction, LangChain has native S3 support:
from langchain_community.document_loaders import S3DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import PGVector
from langchain_anthropic import ChatAnthropic
from langchain.chains import RetrievalQA
# Load from DanubeData S3
loader = S3DirectoryLoader(
bucket=S3_BUCKET,
endpoint_url=S3_ENDPOINT,
aws_access_key_id=S3_ACCESS_KEY,
aws_secret_access_key=S3_SECRET_KEY,
region_name="fsn1",
)
documents = loader.load()
# Split into chunks
splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = splitter.split_documents(documents)
# Store in DanubeData PostgreSQL with pgvector
CONNECTION = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
vectorstore = PGVector.from_documents(
chunks,
    embedding=your_embedding_model,  # e.g. a Voyage or OpenAI embeddings instance
connection_string=CONNECTION,
collection_name="knowledge_base",
)
# Query with Claude
llm = ChatAnthropic(model="claude-sonnet-4-6-20250514")
qa = RetrievalQA.from_chain_type(
llm=llm,
retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
)
answer = qa.invoke("What is our deployment process?")
print(answer["result"])
GDPR Compliance: Why Europe Matters
If your knowledge base contains internal documents, customer data, or employee information, where that data is stored matters legally.
- DanubeData stores all data in Germany (Falkenstein datacenter) — fully within the EU
- No data transfer to the US — your documents and embeddings stay in Europe
- Data processing agreement (DPA) available for enterprise compliance
- You control the data lifecycle — delete documents from S3 and purge corresponding embeddings from PostgreSQL
Note: The LLM API call (to Anthropic or OpenAI) does send the query and context outside the EU. For fully EU-contained RAG, consider self-hosting an open-source model on a DanubeData VPS with a dedicated GPU.
Cost Comparison
| Component | DanubeData | AWS Equivalent |
|---|---|---|
| Document Storage (S3) | €3.99/month (1 TB included) | ~$23/month (1 TB S3 Standard) |
| Vector Database (PostgreSQL + pgvector) | €19.99/month | ~$50-150/month (RDS + pgvector) |
| Total Infrastructure | ~€24/month | ~$73-173/month |
That's a production RAG pipeline with European data residency for under €25/month.
Production Tips
- Chunk size matters: Start with 1000 characters, 200 overlap. Tune based on your document structure — API docs need smaller chunks, narrative docs work better with larger ones
- Metadata filtering: Store document category, author, and date in the JSONB metadata column. Filter searches by metadata to improve relevance
- Re-ranking: For better accuracy, use a re-ranking model (Cohere, Voyage) after the initial vector search to sort results by true relevance
- Hybrid search: Combine vector similarity with PostgreSQL full-text search (tsvector) for best results — vectors catch semantic similarity, full-text catches exact keywords
- Monitor costs: Embedding API calls are the main variable cost. Cache embeddings aggressively and only re-embed changed documents
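Hybrid search needs a way to merge the vector ranking with the full-text ranking. Reciprocal rank fusion (RRF) is a common choice; this is a generic sketch over lists of chunk IDs, not tied to any particular library:

```python
def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Merge several ranked lists of document/chunk IDs.

    Each ID scores sum(1 / (k + rank)) across the lists it appears in;
    k=60 is the constant used in the original RRF paper.
    """
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

# chunk-2 ranks high in both lists, so it wins the fused ranking
vector_hits = ["chunk-7", "chunk-2", "chunk-9"]
fulltext_hits = ["chunk-2", "chunk-4", "chunk-7"]
print(reciprocal_rank_fusion([vector_hits, fulltext_hits]))
```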
Conclusion
S3-compatible storage is the ideal document layer for RAG pipelines:
- Cheap and durable — store millions of documents for pennies
- Universal compatibility — every RAG framework supports S3 natively
- Versioning built-in — track document changes and know when to re-embed
- GDPR compliant — DanubeData keeps your data in Germany
Paired with DanubeData's managed PostgreSQL (with pgvector), you get a complete, production-ready RAG infrastructure for under €25/month — a fraction of what you'd pay on AWS or GCP.
Get started: Create a DanubeData account and have your RAG pipeline running in under an hour.
Related: S3 Storage as Persistent Memory for AI Coding Agents