Python SDK

Python Quick Start

WowSQL Python SDK — Get started in minutes

Get your API key: In your WowSQL project dashboard, go to Settings → API to find your anon key (wowsql_anon_...) and service role key (wowsql_service_...). The anon key is safe for client-side use (respects Row Level Security). The service role key bypasses RLS — use server-side only.

Installation

Official package: View on PyPI →

pip install WowSQL

Initialize Client

from WowSQL import WowSQLClient

# Initialize client — connects to PostgREST at /rest/v1/
client = WowSQLClient(
    project_url="myproject",  # → https://myproject.wowsqlconnect.com
    api_key="wowsql_anon_your_key_here"
)

# Or with full URL
client = WowSQLClient(
    project_url="https://myproject.wowsqlconnect.com",
    api_key="wowsql_anon_your_key_here"
)

# Using context manager (recommended)
with WowSQLClient(project_url="myproject", api_key="wowsql_anon_your_key_here") as client:
    users = client.table("users").select("*").get()

How it works under the hood: The SDK translates your query-builder calls into standard PostgREST HTTP requests. For example, client.table("users").select("*").eq("status","active").limit(10).get() becomes GET /rest/v1/users?select=*&status=eq.active&limit=10 with the apikey header.

Query Data

Basic Queries

# Get all records - CORRECT: Always use .select() before .get() or .execute()
users = client.table("users").select("*").get()

# With filters - CORRECT: Use convenience methods (.gte()) or filter() method
adults = client.table("users") \
    .select("*") \
    .gte("age", 18) \
    .get()

# Or using filter() method
adults = client.table("users") \
    .select("*") \
    .filter("age", "gte", 18) \
    .get()

# Select specific columns - CORRECT
names = client.table("users") \
    .select("id", "name") \
    .get()

Advanced Queries

# Multiple filters with sorting
active_users = client.table("users") \
    .filter({"column": "status", "operator": "eq", "value": "active"}) \
    .filter({"column": "age", "operator": "gt", "value": 18}) \
    .order("created_at", "desc") \
    .limit(20) \
    .offset(0) \
    .get()

print(f"Found {active_users['count']} users")
for user in active_users["data"]:
    print(user["name"])

Filter Helper Methods

# Using helper methods (more readable)
users = client.table("users") \
    .eq("status", "active") \
    .gt("age", 18) \
    .like("email", "%@gmail.com") \
    .is_not_null("email_verified_at") \
    .in_("role", ["admin", "moderator"]) \
    .between("created_at", "2024-01-01", "2024-12-31") \
    .get()

# OR conditions
users = client.table("users") \
    .eq("status", "active") \
    .or_("role", "eq", "admin") \
    .get()

GROUP BY and Aggregates

Basic GROUP BY

# Group by category with counts
result = client.table("products") \
    .select("category", "COUNT(*) as count", "AVG(price) as avg_price") \
    .group_by("category") \
    .get()

# Group by date
result = client.table("orders") \
    .select("DATE(created_at) as date", "COUNT(*) as orders", "SUM(total) as revenue") \
    .group_by("DATE(created_at)") \
    .order_by("date", "desc") \
    .get()

# Multiple GROUP BY columns
result = client.table("sales") \
    .select("region", "category", "SUM(amount) as total") \
    .group_by("region", "category") \
    .get()

HAVING Clause

# Filter aggregated results
result = client.table("products") \
    .select("category", "COUNT(*) as count") \
    .group_by("category") \
    .having("COUNT(*)", "gt", 10) \
    .get()

# Multiple HAVING conditions
result = client.table("orders") \
    .select("DATE(created_at) as date", "SUM(total) as revenue") \
    .group_by("DATE(created_at)") \
    .having("SUM(total)", "gt", 1000) \
    .having("COUNT(*)", "gte", 5) \
    .get()

Multiple ORDER BY

# Order by multiple columns
result = client.table("products") \
    .select("*") \
    .order_by([("category", "asc"), ("price", "desc"), ("created_at", "desc")]) \
    .get()

CRUD Operations

Create Record

result = client.table("users").create({
    "name": "John Doe",
    "email": "john@example.com",
    "age": 30
})

print(f"Created user with ID: {result['id']}")

Update Record

client.table("users").update(123, {
    "email": "newemail@example.com"
})

print("User updated successfully")

Delete Record

client.table("users").delete(123)
print("User deleted successfully")

Get Single Record by ID

user = client.table("users").get_by_id(123)
print(f"User: {user['name']}")

Get First Record

# Get first matching record
user = client.table("users") \
    .filter({"column": "status", "operator": "eq", "value": "active"}) \
    .first()

if user:
    print(f"First active user: {user['name']}")

Filter Operators

Available operators for filtering data:

OperatorDescriptionExample
eqEquals{"column": "status", "operator": "eq", "value": "active"}
neqNot equals{"column": "status", "operator": "neq", "value": "deleted"}
gtGreater than{"column": "age", "operator": "gt", "value": 18}
gteGreater than or equal{"column": "age", "operator": "gte", "value": 18}
ltLess than{"column": "price", "operator": "lt", "value": 100}
lteLess than or equal{"column": "price", "operator": "lte", "value": 100}
likePattern matching{"column": "name", "operator": "like", "value": "john"}
isIS NULL / IS NOT NULL{"column": "deleted_at", "operator": "is", "value": "null"}
inIN operator{"column": "category", "operator": "in", "value": ["electronics", "books"]}
not_inNOT IN{"column": "status", "operator": "not_in", "value": ["deleted", "archived"]}
betweenBETWEEN{"column": "price", "operator": "between", "value": [10, 100]}
not_betweenNOT BETWEEN{"column": "age", "operator": "not_between", "value": [18, 65]}

Authentication

Auth endpoints are at /auth/v1/ on your project URL. Use the same API key.

Sign Up & Sign In

from WowSQL import WowSQLAuth

auth = WowSQLAuth(
    project_url="myproject",   # → https://myproject.wowsqlconnect.com
    api_key="wowsql_anon_your_key_here"
)

# Sign up → POST /auth/v1/signup
result = auth.sign_up(
    email="user@example.com",
    password="SecurePassword123",
    full_name="John Doe"
)
print(f"Access token: {result['access_token']}")

# Sign in → POST /auth/v1/login
login = auth.sign_in(
    email="user@example.com",
    password="SecurePassword123"
)
print(f"User ID: {login['user']['id']}")

Direct REST Call (Auth)

import requests

PROJECT_URL = "https://myproject.wowsqlconnect.com"
API_KEY = "wowsql_anon_your_key_here"

# Sign up directly via REST
resp = requests.post(f"{PROJECT_URL}/auth/v1/signup", json={
    "email": "user@example.com",
    "password": "SecurePassword123",
    "full_name": "John Doe"
}, headers={"apikey": API_KEY})

# Sign in directly via REST
resp = requests.post(f"{PROJECT_URL}/auth/v1/login", json={
    "email": "user@example.com",
    "password": "SecurePassword123"
}, headers={"apikey": API_KEY})
session = resp.json()  # { "access_token": "...", "refresh_token": "...", "user": {...} }

Password Reset

# Request password reset
auth.forgot_password(email="user@example.com")

# Reset password with token from email
auth.reset_password(
    token="reset_token_from_email",
    new_password="NewSecurePassword123"
)

OTP Authentication

# Send OTP for login
auth.send_otp(email="user@example.com", purpose="login")

# Verify OTP and login
result = auth.verify_otp(
    email="user@example.com",
    otp="123456",
    purpose="login"
)
print(f"Logged in: {result['user']['id']}")

# Send OTP for signup
auth.send_otp(email="newuser@example.com", purpose="signup")

# Verify OTP and create account
result = auth.verify_otp(
    email="newuser@example.com",
    otp="123456",
    purpose="signup"
)

Magic Link & Email Verification

# Send magic link
auth.send_magic_link(email="user@example.com", purpose="login")

# Verify email with token
auth.verify_email(token="verification_token_from_email")

# Resend verification email
auth.resend_verification(email="user@example.com")

OAuth Authentication

Setup checklist — do this once per provider, per project:
1. Dashboard → Project → Auth → Providers: enable the provider, paste Client ID and Client Secret (no leading/trailing spaces).
2. Copy the Redirect URI shown: https://<slug>.wowsqlconnect.com/auth/v1/oauth/<provider>/callback and register it in your provider's developer console (Google: Cloud Console → Credentials → OAuth 2.0 Client → Authorised redirect URIs).
3. The callback URL is handled entirely by WowSQL — your app does not send an API key on it. WowSQL exchanges the code and redirects to your frontend_redirect_uri with ?access_token=&refresh_token=.

# ── Step 1: Server-side — get the authorization URL ─────────────────────────
# Call from your server (Flask route, FastAPI endpoint, etc.).
# Never expose your API key in the browser.
result = auth.get_oauth_authorization_url(
    provider="google",
    frontend_redirect_uri="https://yourapp.com/auth/callback"
)
# Redirect the browser to:
print(result["authorization_url"])
# Google calls: https://<slug>.wowsqlconnect.com/auth/v1/oauth/google/callback
# WowSQL exchanges the code and redirects to your frontend_redirect_uri.


# ── Step 2: Frontend — read tokens from the redirect ────────────────────────
# WowSQL redirects to: https://yourapp.com/auth/callback?access_token=...&refresh_token=...
# Parse them from the URL query string in your frontend JS / template.


# ── (Advanced) Manual code exchange — only if you handle the callback yourself
result = auth.exchange_oauth_callback(
    provider="google",
    code="authorization_code_from_callback",
    redirect_uri="https://yourapp.com/auth/callback"  # must match step 1
)
print(f"OAuth login successful: {result['user']['id']}")

Error Handling

Wrap SDK calls in try/except to handle API and validation errors.

from WowSQL import WowSQLClient, WowSQLError

try:
    users = client.table("users").select("*").get()
    print(f"Success: {len(users['data'])} users")
except WowSQLError as e:
    if e.status_code == 401:
        print("Authentication failed — check your API key")
    elif e.status_code == 404:
        print("Table or resource not found")
    else:
        print(f"Error ({e.status_code}): {e.message}")

Storage Operations

Initialize Storage

from WowSQL import WowSQLStorage

# Storage endpoints are at /storage/v1/ on your project URL
storage = WowSQLStorage(
    project_url="myproject",   # → https://myproject.wowsqlconnect.com
    api_key="wowsql_anon_your_key_here"
)

File Operations

# Upload file → POST /storage/v1/object/{bucket}/{path}
result = storage.upload_from_path(
    "path/to/document.pdf",
    "uploads/document.pdf",
    folder="documents"
)
print(f"Uploaded: {result['file_key']}")

# Get file URL (with metadata)
url_data = storage.get_file_url("uploads/document.pdf", expires_in=3600)
print(f"Download URL: {url_data['file_url']}")
print(f"Expires at: {url_data['expires_at']}")

# Get presigned URL (simple)
presigned_url = storage.get_presigned_url(
    "uploads/document.pdf",
    expires_in=3600,
    operation="get_object"
)
print(f"URL: {presigned_url}")

# List files → POST /storage/v1/object/list/{bucket}
files = storage.list_files(prefix="uploads/")
for file in files:
    print(f"{file.key}: {file.size_mb:.2f} MB")

# Delete file → DELETE /storage/v1/object/{bucket}/{path}
storage.delete_file("uploads/document.pdf")

# Check quota
quota = storage.get_quota()
print(f"Used: {quota.used_gb:.2f} GB / {quota.quota_gb:.2f} GB")

Direct REST Call (Storage)

import requests

PROJECT_URL = "https://myproject.wowsqlconnect.com"
API_KEY = "wowsql_anon_your_key_here"

# Upload via REST
with open("photo.png", "rb") as f:
    resp = requests.post(
        f"{PROJECT_URL}/storage/v1/object/avatars/photo.png",
        files={"file": f},
        headers={"apikey": API_KEY}
    )

# Download via REST
resp = requests.get(
    f"{PROJECT_URL}/storage/v1/object/avatars/photo.png",
    headers={"apikey": API_KEY}
)

# Public file (no auth needed if bucket is public)
# GET https://myproject.wowsqlconnect.com/storage/v1/object/public/{bucket}/{path}

Bucket Management

# Create bucket → POST /storage/v1/bucket
storage.create_bucket(name="avatars", public=True)

# List buckets → GET /storage/v1/bucket
buckets = storage.list_buckets()

# Delete bucket → DELETE /storage/v1/bucket/{name}
storage.delete_bucket("avatars")

Schema DDL (WowSQLSchema)

Requires service role key: Schema operations use wowsql_service_... (not the anon key). POST /api/v2/schema/tables requires a primary_key column with type UUID. Use auto_increment: True for DEFAULT gen_random_uuid(). Raw DDL via POST /api/v2/schema/execute cannot use SERIAL PRIMARY KEY. Existing tables are unchanged.

from wowsql.schema import WowSQLSchema

schema = WowSQLSchema(
    project_url="myproject",   # → https://myproject.wowsqlconnect.com
    service_key="wowsql_service_your_key_here"
)

# Create table → POST /api/v2/schema/tables
schema.create_table(
    "items",
    [
        {"name": "id", "type": "UUID", "auto_increment": True},
        {"name": "title", "type": "TEXT"},
        {"name": "price", "type": "NUMERIC"},
        {"name": "created_at", "type": "TIMESTAMPTZ", "default": "NOW()"},
    ],
    primary_key="id",
)

# Execute raw DDL → POST /api/v2/schema/execute
schema.execute("""
    ALTER TABLE items ADD COLUMN description TEXT;
    CREATE INDEX idx_items_title ON items (title);
""")

Direct REST Call (PostgREST)

You can also call the PostgREST API directly. WowSQL uses standard PostgREST — all filtering, ordering, and pagination is done via query parameters:

import requests

PROJECT_URL = "https://myproject.wowsqlconnect.com"
API_KEY = "wowsql_anon_your_key_here"

# SELECT with filters, ordering, and pagination
resp = requests.get(f"{PROJECT_URL}/rest/v1/users", params={
    "select": "id,email,name",
    "status": "eq.active",
    "order": "created_at.desc",
    "limit": 10
}, headers={"apikey": API_KEY})
data = resp.json()

# INSERT a new row
resp = requests.post(f"{PROJECT_URL}/rest/v1/users", json={
    "email": "john@example.com",
    "name": "John Doe",
    "status": "active"
}, headers={
    "apikey": API_KEY,
    "Content-Type": "application/json",
    "Prefer": "return=representation"
})

# UPDATE a row
resp = requests.patch(f"{PROJECT_URL}/rest/v1/users", params={
    "id": "eq.123"
}, json={"name": "Updated Name"}, headers={
    "apikey": API_KEY,
    "Content-Type": "application/json",
    "Prefer": "return=representation"
})

# DELETE a row
resp = requests.delete(f"{PROJECT_URL}/rest/v1/users", params={
    "id": "eq.123"
}, headers={"apikey": API_KEY})