
Building a RAG Knowledge Base with S3-Compatible Storage in Europe (2026)

Adrian Silaghi
April 6, 2026
14 min read
#s3 #rag #llm #pgvector #postgresql #python #langchain #ai #gdpr #europe #object-storage

Large language models are powerful, but they don't know about your data — your internal docs, your codebase, your runbooks. Retrieval-Augmented Generation (RAG) bridges this gap by fetching relevant documents at query time and feeding them to the LLM as context.

Every RAG pipeline needs a document store — a place to keep the source material that gets searched, chunked, and fed to the model. S3-compatible object storage is the natural choice: it's cheap, durable, scales infinitely, and every tool in the AI ecosystem supports it natively.

This guide shows you how to build a production RAG pipeline using DanubeData's S3-compatible storage as the document layer, with a PostgreSQL vector database for embeddings — all hosted in Europe for GDPR compliance.

Architecture Overview

Here's what we're building:

┌─────────────────────────────────────────────────────────┐
│                    Your Application                      │
│                                                          │
│   User Query ──▶ Embed Query ──▶ Vector Search ──▶ LLM  │
│                                       │            │     │
│                                       ▼            ▼     │
│                                   pgvector      Claude/  │
│                                  (DanubeData    GPT-4    │
│                                   PostgreSQL)            │
│                                       ▲                  │
│                                       │                  │
│   S3 Bucket ──▶ Chunk ──▶ Embed ──▶ Store                │
│  (DanubeData                                             │
│   Object Storage)                                        │
└─────────────────────────────────────────────────────────┘

The flow has two phases:

  1. Ingestion: Upload documents to S3 → chunk them → generate embeddings → store in pgvector
  2. Query: Embed the user's question → find similar chunks in pgvector → feed them to the LLM as context

Why S3 for the Document Layer?

  • Separation of concerns: Raw documents live in S3, processed embeddings live in the vector DB. You can re-process documents anytime without re-uploading
  • Versioning: S3 versioning tracks document changes — critical for knowing when to re-embed
  • Scale: From 100 documents to 10 million, S3 handles it without architecture changes
  • Ecosystem: LangChain, LlamaIndex, Haystack, and every other RAG framework has native S3 loaders
  • Cost: DanubeData S3 includes 1 TB storage for €3.99/month — enough for millions of documents

Prerequisites

You'll need:

  • DanubeData Object Storage bucket — for document storage (create one here)
  • DanubeData PostgreSQL instance — for pgvector embeddings (create one here)
  • Python 3.11+
  • An LLM API key — Anthropic (Claude) or OpenAI

Install Dependencies

pip install boto3 langchain langchain-community langchain-anthropic \
  psycopg2-binary pgvector python-dotenv tiktoken \
  pypdf docx2txt unstructured voyageai

Step 1: Configure S3 and Database Connections

# config.py
import os
from dotenv import load_dotenv

load_dotenv()

# DanubeData S3
S3_ENDPOINT = "https://s3.danubedata.ro"
S3_ACCESS_KEY = os.getenv("DANUBEDATA_S3_ACCESS_KEY")
S3_SECRET_KEY = os.getenv("DANUBEDATA_S3_SECRET_KEY")
S3_BUCKET = "rag-knowledge-base"
S3_REGION = "fsn1"

# DanubeData PostgreSQL
DB_HOST = os.getenv("DANUBEDATA_DB_HOST")  # e.g. db-xxxxx.danubedata.ro
DB_PORT = os.getenv("DANUBEDATA_DB_PORT", "5432")
DB_NAME = os.getenv("DANUBEDATA_DB_NAME", "rag")
DB_USER = os.getenv("DANUBEDATA_DB_USER")
DB_PASSWORD = os.getenv("DANUBEDATA_DB_PASSWORD")

# Anthropic
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")
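All of these values come from a `.env` file next to your code. A template with placeholder values (the `VOYAGE_API_KEY` entry is for the Voyage embedding client used later during ingestion — it reads the variable directly from the environment rather than through `config.py`):

```shell
# .env — keep this file out of version control
DANUBEDATA_S3_ACCESS_KEY=your-access-key
DANUBEDATA_S3_SECRET_KEY=your-secret-key
DANUBEDATA_DB_HOST=db-xxxxx.danubedata.ro
DANUBEDATA_DB_PORT=5432
DANUBEDATA_DB_NAME=rag
DANUBEDATA_DB_USER=rag
DANUBEDATA_DB_PASSWORD=your-db-password
ANTHROPIC_API_KEY=sk-ant-...
# Read directly by the voyageai client during ingestion
VOYAGE_API_KEY=your-voyage-key
```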

Step 2: Upload Documents to S3

Organize your knowledge base in S3 with a clear folder structure:

# upload_docs.py
import boto3
from pathlib import Path
from config import S3_ENDPOINT, S3_ACCESS_KEY, S3_SECRET_KEY, S3_BUCKET

s3 = boto3.client(
    "s3",
    endpoint_url=S3_ENDPOINT,
    aws_access_key_id=S3_ACCESS_KEY,
    aws_secret_access_key=S3_SECRET_KEY,
    region_name="fsn1",
)

def upload_directory(local_path: str, s3_prefix: str = ""):
    """Upload a directory of documents to S3."""
    local = Path(local_path)
    for file in local.rglob("*"):
        if file.is_file() and file.suffix in (".pdf", ".md", ".txt", ".docx", ".html"):
            key = f"{s3_prefix}/{file.relative_to(local)}" if s3_prefix else str(file.relative_to(local))
            s3.upload_file(str(file), S3_BUCKET, key)
            print(f"Uploaded: {key}")

# Upload your docs
upload_directory("./docs", s3_prefix="engineering")
upload_directory("./runbooks", s3_prefix="ops")

Your bucket structure will look like:

rag-knowledge-base/
├── engineering/
│   ├── api-design-guide.md
│   ├── architecture-decisions/
│   │   ├── adr-001-database-choice.md
│   │   └── adr-002-auth-strategy.md
│   └── onboarding.pdf
├── ops/
│   ├── incident-response.md
│   ├── deployment-checklist.md
│   └── monitoring-guide.md
└── product/
    ├── roadmap-2026.pdf
    └── user-research-findings.docx
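The keys in that layout come straight from `upload_directory`: the path relative to the local root is joined onto the prefix. A quick dry run of the same mapping logic, using pure `pathlib` with no S3 calls (the file paths are illustrative):

```python
from pathlib import PurePosixPath

def s3_key_for(local_root: str, file_path: str, s3_prefix: str = "") -> str:
    """Mirror the key derivation in upload_directory, without uploading anything."""
    relative = PurePosixPath(file_path).relative_to(local_root)
    return f"{s3_prefix}/{relative}" if s3_prefix else str(relative)

# docs/architecture-decisions/adr-001-database-choice.md uploaded under "engineering"
print(s3_key_for("docs", "docs/architecture-decisions/adr-001-database-choice.md", "engineering"))
# engineering/architecture-decisions/adr-001-database-choice.md
```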

Step 3: Set Up pgvector

DanubeData PostgreSQL instances come with the pgvector extension available. Enable it and create your embeddings table:

# setup_db.py
import psycopg2
from config import DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD

conn = psycopg2.connect(
    host=DB_HOST, port=DB_PORT, dbname=DB_NAME,
    user=DB_USER, password=DB_PASSWORD,
)
conn.autocommit = True
cur = conn.cursor()

# Enable pgvector
cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")

# Create the embeddings table
cur.execute("""
    CREATE TABLE IF NOT EXISTS document_chunks (
        id BIGSERIAL PRIMARY KEY,
        s3_key TEXT NOT NULL,
        chunk_index INTEGER NOT NULL,
        content TEXT NOT NULL,
        metadata JSONB DEFAULT '{}'::jsonb,
        embedding vector(1024),
        created_at TIMESTAMPTZ DEFAULT NOW(),

        UNIQUE(s3_key, chunk_index)
    );
""")

# Create HNSW index for fast similarity search
cur.execute("""
    CREATE INDEX IF NOT EXISTS idx_chunks_embedding
    ON document_chunks
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);
""")

print("Database ready.")
cur.close()
conn.close()
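One detail worth knowing before ingestion: pgvector accepts vectors as bracketed text literals like `[0.1, 0.2, 0.3]`, which is why the ingestion code in Step 4 can pass `str(embedding)` with a `::vector` cast. A quick illustration of the format:

```python
# pgvector parses vector literals of the form "[x, y, z]".
# Python's str() of a float list happens to produce exactly that shape.
embedding = [0.12, 0.34, 0.56]
literal = str(embedding)
print(literal)  # [0.12, 0.34, 0.56]

# Sanity-check the shape before sending it to Postgres:
assert literal.startswith("[") and literal.endswith("]")
# In the real table the list must have 1024 elements to match vector(1024).
```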

Step 4: Ingest Documents from S3

This is the core of the pipeline — read documents from S3, split them into chunks, generate embeddings, and store them in pgvector.

# ingest.py
import boto3
import psycopg2
import json
from io import BytesIO
from config import *

# Initialize the S3 client
s3 = boto3.client(
    "s3",
    endpoint_url=S3_ENDPOINT,
    aws_access_key_id=S3_ACCESS_KEY,
    aws_secret_access_key=S3_SECRET_KEY,
    region_name="fsn1",
)

# Embeddings come from Voyage AI (see get_embeddings below); the voyageai
# client reads VOYAGE_API_KEY from the environment. Swap in any embedding
# provider whose output dimension matches the vector(1024) column.

conn = psycopg2.connect(
    host=DB_HOST, port=DB_PORT, dbname=DB_NAME,
    user=DB_USER, password=DB_PASSWORD,
)

def list_documents():
    """List all documents in the S3 bucket."""
    paginator = s3.get_paginator("list_objects_v2")
    docs = []
    for page in paginator.paginate(Bucket=S3_BUCKET):
        for obj in page.get("Contents", []):
            if obj["Key"].endswith((".md", ".txt", ".pdf", ".docx")):
                docs.append(obj["Key"])
    return docs

def download_and_read(key: str) -> str:
    """Download a document from S3 and extract text."""
    response = s3.get_object(Bucket=S3_BUCKET, Key=key)
    content = response["Body"].read()

    if key.endswith((".md", ".txt")):
        return content.decode("utf-8")
    elif key.endswith(".pdf"):
        from pypdf import PdfReader
        reader = PdfReader(BytesIO(content))
        return "\n".join(page.extract_text() for page in reader.pages)
    elif key.endswith(".docx"):
        import docx2txt
        return docx2txt.process(BytesIO(content))
    return ""

def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> list[str]:
    """Split text into overlapping chunks."""
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        start += chunk_size - overlap
    return [c.strip() for c in chunks if c.strip()]

def get_embeddings(texts: list[str]) -> list[list[float]]:
    """Generate embeddings with Voyage AI (or swap in another provider)."""
    import voyageai
    vo = voyageai.Client()
    result = vo.embed(texts, model="voyage-3", input_type="document")
    return result.embeddings

def ingest_document(key: str):
    """Full pipeline: S3 → chunk → embed → pgvector."""
    print(f"Processing: {key}")

    # Read from S3
    text = download_and_read(key)
    if not text:
        return

    # Chunk
    chunks = chunk_text(text)
    print(f"  {len(chunks)} chunks")

    # Embed in batches of 32
    for batch_start in range(0, len(chunks), 32):
        batch = chunks[batch_start:batch_start + 32]
        embeddings = get_embeddings(batch)

        # Store in pgvector
        cur = conn.cursor()
        for i, (chunk, embedding) in enumerate(zip(batch, embeddings)):
            chunk_idx = batch_start + i
            cur.execute("""
                INSERT INTO document_chunks (s3_key, chunk_index, content, metadata, embedding)
                VALUES (%s, %s, %s, %s, %s::vector)
                ON CONFLICT (s3_key, chunk_index)
                DO UPDATE SET content = EXCLUDED.content, embedding = EXCLUDED.embedding
            """, (key, chunk_idx, chunk, json.dumps({"source": key}), str(embedding)))
        conn.commit()
        cur.close()

    print(f"  Done.")

# Run ingestion
if __name__ == "__main__":
    docs = list_documents()
    print(f"Found {len(docs)} documents")
    for doc in docs:
        ingest_document(doc)
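To see the overlap behavior of `chunk_text` concretely, here is a dry run with small numbers (chunk size 10, overlap 4): each chunk restarts 6 characters after the previous one, so neighboring chunks share a 4-character seam.

```python
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> list[str]:
    """Same sliding-window logic as in ingest.py above."""
    chunks, start = [], 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        start += chunk_size - overlap
    return [c.strip() for c in chunks if c.strip()]

demo = "abcdefghijklmnopqr"  # 18 characters
for c in chunk_text(demo, chunk_size=10, overlap=4):
    print(c)
# abcdefghij
# ghijklmnop
# mnopqr
```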

Step 5: Query Your Knowledge Base

Now the fun part — ask questions and get answers grounded in your documents:

# query.py
import anthropic
import psycopg2
from config import *

client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)

conn = psycopg2.connect(
    host=DB_HOST, port=DB_PORT, dbname=DB_NAME,
    user=DB_USER, password=DB_PASSWORD,
)

def search_similar(query: str, limit: int = 5) -> list[dict]:
    """Find the most relevant chunks for a query."""
    import voyageai
    vo = voyageai.Client()
    result = vo.embed([query], model="voyage-3", input_type="query")
    query_embedding = result.embeddings[0]

    cur = conn.cursor()
    cur.execute("""
        SELECT s3_key, chunk_index, content,
               1 - (embedding <=> %s::vector) AS similarity
        FROM document_chunks
        ORDER BY embedding <=> %s::vector
        LIMIT %s
    """, (str(query_embedding), str(query_embedding), limit))

    results = []
    for row in cur.fetchall():
        results.append({
            "source": row[0],
            "chunk": row[1],
            "content": row[2],
            "similarity": float(row[3]),
        })
    cur.close()
    return results

def ask(question: str) -> str:
    """RAG query: search → context → LLM answer."""
    # Retrieve relevant chunks
    chunks = search_similar(question, limit=5)

    # Build context
    context = "\n\n---\n\n".join(
        f"Source: {c['source']}\n{c['content']}"
        for c in chunks
    )

    # Ask Claude with retrieved context
    response = client.messages.create(
        model="claude-sonnet-4-6-20250514",
        max_tokens=2048,
        messages=[{
            "role": "user",
            "content": f"""Answer the following question based on the provided context.
If the context doesn't contain enough information, say so.

Context:
{context}

Question: {question}"""
        }],
    )

    return response.content[0].text

# Example usage
if __name__ == "__main__":
    answer = ask("What is our incident response process?")
    print(answer)
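The `<=>` operator in the SQL above is pgvector's cosine distance, so `1 - (embedding <=> query)` is cosine similarity. A pure-Python sketch of the same arithmetic, handy for sanity-checking the scores your queries return:

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Equivalent of 1 - (a <=> b) with pgvector's vector_cosine_ops."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # 1.0 (same direction)
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # 0.0 (orthogonal, unrelated)
```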

Step 6: Keep Documents in Sync

Documents change over time. Use S3 event notifications or a simple polling script to detect changes and re-ingest:

# sync_changes.py
import boto3
from datetime import datetime, timedelta, timezone
from config import *
from ingest import ingest_document

s3 = boto3.client(
    "s3",
    endpoint_url=S3_ENDPOINT,
    aws_access_key_id=S3_ACCESS_KEY,
    aws_secret_access_key=S3_SECRET_KEY,
    region_name="fsn1",
)

def get_recently_modified(hours: int = 24) -> list[str]:
    """Find documents modified in the last N hours."""
    cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
    paginator = s3.get_paginator("list_objects_v2")
    modified = []

    for page in paginator.paginate(Bucket=S3_BUCKET):
        for obj in page.get("Contents", []):
            if obj["LastModified"] > cutoff:
                modified.append(obj["Key"])

    return modified

# Re-ingest recently changed documents
if __name__ == "__main__":
    changed = get_recently_modified(hours=24)
    print(f"Re-ingesting {len(changed)} modified documents")
    for key in changed:
        ingest_document(key)

Run this daily via cron or a DanubeData Serverless Container on a schedule.
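For the cron option, an entry like this runs the sync nightly at 03:00 (the `/opt/rag` path and log location are illustrative — adjust to wherever the scripts live):

```shell
# crontab -e
0 3 * * * cd /opt/rag && /usr/bin/python3 sync_changes.py >> /var/log/rag-sync.log 2>&1
```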

Using LangChain's Built-in S3 Loader

If you prefer a higher-level abstraction, LangChain has native S3 support:

from langchain_community.document_loaders import S3DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import PGVector
from langchain_anthropic import ChatAnthropic
from langchain.chains import RetrievalQA

# Load from DanubeData S3
loader = S3DirectoryLoader(
    bucket=S3_BUCKET,
    endpoint_url=S3_ENDPOINT,
    aws_access_key_id=S3_ACCESS_KEY,
    aws_secret_access_key=S3_SECRET_KEY,
    region_name="fsn1",
)
documents = loader.load()

# Split into chunks
splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
chunks = splitter.split_documents(documents)

# Store in DanubeData PostgreSQL with pgvector
CONNECTION = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
vectorstore = PGVector.from_documents(
    chunks,
    embedding=your_embedding_model,  # plug in your embedding model instance here
    connection_string=CONNECTION,
    collection_name="knowledge_base",
)

# Query with Claude
llm = ChatAnthropic(model="claude-sonnet-4-6-20250514")
qa = RetrievalQA.from_chain_type(
    llm=llm,
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
)

answer = qa.invoke("What is our deployment process?")
print(answer["result"])

GDPR Compliance: Why Europe Matters

If your knowledge base contains internal documents, customer data, or employee information, where that data is stored matters legally.

  • DanubeData stores all data in Germany (Falkenstein datacenter) — fully within the EU
  • No data transfer to the US — your documents and embeddings stay in Europe
  • Data processing agreement (DPA) available for enterprise compliance
  • You control the data lifecycle — delete documents from S3 and purge corresponding embeddings from PostgreSQL

Note: The LLM API call (to Anthropic or OpenAI) does send the query and context outside the EU. For fully EU-contained RAG, consider self-hosting an open-source model on a DanubeData VPS with a dedicated GPU.

Cost Comparison

Component                                  DanubeData                     AWS Equivalent
Document Storage (S3)                      €3.99/month (1 TB included)    ~$23/month (1 TB S3 Standard)
Vector Database (PostgreSQL + pgvector)    €19.99/month                   ~$50-150/month (RDS + pgvector)
Total Infrastructure                       ~€24/month                     ~$73-173/month

That's a production RAG pipeline with European data residency for under €25/month.

Production Tips

  • Chunk size matters: Start with 1000 characters, 200 overlap. Tune based on your document structure — API docs need smaller chunks, narrative docs work better with larger ones
  • Metadata filtering: Store document category, author, and date in the JSONB metadata column. Filter searches by metadata to improve relevance
  • Re-ranking: For better accuracy, use a re-ranking model (Cohere, Voyage) after the initial vector search to sort results by true relevance
  • Hybrid search: Combine vector similarity with PostgreSQL full-text search (tsvector) for best results — vectors catch semantic similarity, full-text catches exact keywords
  • Monitor costs: Embedding API calls are the main variable cost. Cache embeddings aggressively and only re-embed changed documents
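For the hybrid-search tip, a common way to merge the vector and full-text result lists is reciprocal rank fusion (RRF), which needs only the two rankings, not their raw scores. A minimal sketch (the document IDs are illustrative):

```python
from collections import defaultdict

def rrf_merge(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Reciprocal rank fusion: score(d) = sum over rankings of 1 / (k + rank)."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Higher fused score = better combined rank
    return sorted(scores, key=scores.get, reverse=True)

vector_hits = ["doc-a", "doc-b", "doc-c"]    # from pgvector similarity search
fulltext_hits = ["doc-b", "doc-d", "doc-a"]  # from tsvector full-text search
print(rrf_merge([vector_hits, fulltext_hits]))
# ['doc-b', 'doc-a', 'doc-d', 'doc-c'] — docs in both lists rise to the top
```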

Conclusion

S3-compatible storage is the ideal document layer for RAG pipelines:

  • Cheap and durable — store millions of documents for pennies
  • Universal compatibility — every RAG framework supports S3 natively
  • Versioning built-in — track document changes and know when to re-embed
  • GDPR compliant — DanubeData keeps your data in Germany

Paired with DanubeData's managed PostgreSQL (with pgvector), you get a complete, production-ready RAG infrastructure for under €25/month — a fraction of what you'd pay on AWS or GCP.

Get started: Create a DanubeData account and have your RAG pipeline running in under an hour.

Related: S3 Storage as Persistent Memory for AI Coding Agents
