Python SDK
Python Quick Start
WowSQL Python SDK — Get started in minutes
Get your API key: In your WowSQL project dashboard, go to Settings → API to find your anon key (wowsql_anon_...) and service role key (wowsql_service_...). The anon key is safe for client-side use because it respects Row Level Security (RLS). The service role key bypasses RLS — use it server-side only.
Installation
Official package: View on PyPI →
pip install WowSQL
Initialize Client
from WowSQL import WowSQLClient
# Initialize client — connects to PostgREST at /rest/v1/
client = WowSQLClient(
project_url="myproject", # → https://myproject.wowsqlconnect.com
api_key="wowsql_anon_your_key_here"
)
# Or with full URL
client = WowSQLClient(
project_url="https://myproject.wowsqlconnect.com",
api_key="wowsql_anon_your_key_here"
)
# Using context manager (recommended)
with WowSQLClient(project_url="myproject", api_key="wowsql_anon_your_key_here") as client:
users = client.table("users").select("*").get()
How it works under the hood: The SDK translates your query-builder calls into standard PostgREST HTTP requests. For example, client.table("users").select("*").eq("status","active").limit(10).get() becomes GET /rest/v1/users?select=*&status=eq.active&limit=10 with the apikey header.
Query Data
Basic Queries
# Get all records - CORRECT: Always use .select() before .get() or .execute()
users = client.table("users").select("*").get()
# With filters - CORRECT: Use convenience methods (.gte()) or filter() method
adults = client.table("users") \
.select("*") \
.gte("age", 18) \
.get()
# Or using filter() method
adults = client.table("users") \
.select("*") \
.filter("age", "gte", 18) \
.get()
# Select specific columns - CORRECT
names = client.table("users") \
.select("id", "name") \
.get()
Advanced Queries
# Multiple filters with sorting
active_users = client.table("users") \
.filter({"column": "status", "operator": "eq", "value": "active"}) \
.filter({"column": "age", "operator": "gt", "value": 18}) \
.order("created_at", "desc") \
.limit(20) \
.offset(0) \
.get()
print(f"Found {active_users['count']} users")
for user in active_users["data"]:
print(user["name"])
Filter Helper Methods
# Using helper methods (more readable)
users = client.table("users") \
.eq("status", "active") \
.gt("age", 18) \
.like("email", "%@gmail.com") \
.is_not_null("email_verified_at") \
.in_("role", ["admin", "moderator"]) \
.between("created_at", "2024-01-01", "2024-12-31") \
.get()
# OR conditions
users = client.table("users") \
.eq("status", "active") \
.or_("role", "eq", "admin") \
.get()
GROUP BY and Aggregates
Basic GROUP BY
# Group by category with counts
result = client.table("products") \
.select("category", "COUNT(*) as count", "AVG(price) as avg_price") \
.group_by("category") \
.get()
# Group by date
result = client.table("orders") \
.select("DATE(created_at) as date", "COUNT(*) as orders", "SUM(total) as revenue") \
.group_by("DATE(created_at)") \
.order_by("date", "desc") \
.get()
# Multiple GROUP BY columns
result = client.table("sales") \
.select("region", "category", "SUM(amount) as total") \
.group_by("region", "category") \
.get()
HAVING Clause
# Filter aggregated results
result = client.table("products") \
.select("category", "COUNT(*) as count") \
.group_by("category") \
.having("COUNT(*)", "gt", 10) \
.get()
# Multiple HAVING conditions
result = client.table("orders") \
.select("DATE(created_at) as date", "SUM(total) as revenue") \
.group_by("DATE(created_at)") \
.having("SUM(total)", "gt", 1000) \
.having("COUNT(*)", "gte", 5) \
.get()
Multiple ORDER BY
# Order by multiple columns
result = client.table("products") \
.select("*") \
.order_by([("category", "asc"), ("price", "desc"), ("created_at", "desc")]) \
.get()
CRUD Operations
Create Record
result = client.table("users").create({
"name": "John Doe",
"email": "john@example.com",
"age": 30
})
print(f"Created user with ID: {result['id']}")
Update Record
client.table("users").update(123, {
"email": "newemail@example.com"
})
print("User updated successfully")
Delete Record
client.table("users").delete(123)
print("User deleted successfully")
Get Single Record by ID
user = client.table("users").get_by_id(123)
print(f"User: {user['name']}")
Get First Record
# Get first matching record
user = client.table("users") \
.filter({"column": "status", "operator": "eq", "value": "active"}) \
.first()
if user:
print(f"First active user: {user['name']}")
Filter Operators
Available operators for filtering data:
| Operator | Description | Example |
|---|---|---|
| eq | Equals | {"column": "status", "operator": "eq", "value": "active"} |
| neq | Not equals | {"column": "status", "operator": "neq", "value": "deleted"} |
| gt | Greater than | {"column": "age", "operator": "gt", "value": 18} |
| gte | Greater than or equal | {"column": "age", "operator": "gte", "value": 18} |
| lt | Less than | {"column": "price", "operator": "lt", "value": 100} |
| lte | Less than or equal | {"column": "price", "operator": "lte", "value": 100} |
| like | Pattern matching | {"column": "name", "operator": "like", "value": "john"} |
| is | IS NULL / IS NOT NULL | {"column": "deleted_at", "operator": "is", "value": "null"} |
| in | IN operator | {"column": "category", "operator": "in", "value": ["electronics", "books"]} |
| not_in | NOT IN | {"column": "status", "operator": "not_in", "value": ["deleted", "archived"]} |
| between | BETWEEN | {"column": "price", "operator": "between", "value": [10, 100]} |
| not_between | NOT BETWEEN | {"column": "age", "operator": "not_between", "value": [18, 65]} |
Authentication
Auth endpoints are at /auth/v1/ on your project URL. Use the same API key.
Sign Up & Sign In
from WowSQL import WowSQLAuth
auth = WowSQLAuth(
project_url="myproject", # → https://myproject.wowsqlconnect.com
api_key="wowsql_anon_your_key_here"
)
# Sign up → POST /auth/v1/signup
result = auth.sign_up(
email="user@example.com",
password="SecurePassword123",
full_name="John Doe"
)
print(f"Access token: {result['access_token']}")
# Sign in → POST /auth/v1/login
login = auth.sign_in(
email="user@example.com",
password="SecurePassword123"
)
print(f"User ID: {login['user']['id']}")
Direct REST Call (Auth)
import requests
PROJECT_URL = "https://myproject.wowsqlconnect.com"
API_KEY = "wowsql_anon_your_key_here"
# Sign up directly via REST
resp = requests.post(f"{PROJECT_URL}/auth/v1/signup", json={
"email": "user@example.com",
"password": "SecurePassword123",
"full_name": "John Doe"
}, headers={"apikey": API_KEY})
# Sign in directly via REST
resp = requests.post(f"{PROJECT_URL}/auth/v1/login", json={
"email": "user@example.com",
"password": "SecurePassword123"
}, headers={"apikey": API_KEY})
session = resp.json() # { "access_token": "...", "refresh_token": "...", "user": {...} }
Password Reset
# Request password reset
auth.forgot_password(email="user@example.com")
# Reset password with token from email
auth.reset_password(
token="reset_token_from_email",
new_password="NewSecurePassword123"
)
OTP Authentication
# Send OTP for login
auth.send_otp(email="user@example.com", purpose="login")
# Verify OTP and login
result = auth.verify_otp(
email="user@example.com",
otp="123456",
purpose="login"
)
print(f"Logged in: {result['user']['id']}")
# Send OTP for signup
auth.send_otp(email="newuser@example.com", purpose="signup")
# Verify OTP and create account
result = auth.verify_otp(
email="newuser@example.com",
otp="123456",
purpose="signup"
)
Magic Link & Email Verification
# Send magic link
auth.send_magic_link(email="user@example.com", purpose="login")
# Verify email with token
auth.verify_email(token="verification_token_from_email")
# Resend verification email
auth.resend_verification(email="user@example.com")
OAuth Authentication
Setup checklist — do this once per provider, per project:
1. Dashboard → Project → Auth → Providers: enable the provider, paste Client ID and Client Secret (no leading/trailing spaces).
2. Copy the Redirect URI shown: https://<slug>.wowsqlconnect.com/auth/v1/oauth/<provider>/callback and register it in your provider's developer console (Google: Cloud Console → Credentials → OAuth 2.0 Client → Authorised redirect URIs).
3. The callback URL is handled entirely by WowSQL — your app does not send an API key on it. WowSQL exchanges the code and redirects to your frontend_redirect_uri with ?access_token=&refresh_token=.
# ── Step 1: Server-side — get the authorization URL ─────────────────────────
# Call from your server (Flask route, FastAPI endpoint, etc.).
# Never expose your API key in the browser.
result = auth.get_oauth_authorization_url(
provider="google",
frontend_redirect_uri="https://yourapp.com/auth/callback"
)
# Redirect the browser to:
print(result["authorization_url"])
# Google calls: https://<slug>.wowsqlconnect.com/auth/v1/oauth/google/callback
# WowSQL exchanges the code and redirects to your frontend_redirect_uri.
# ── Step 2: Frontend — read tokens from the redirect ────────────────────────
# WowSQL redirects to: https://yourapp.com/auth/callback?access_token=...&refresh_token=...
# Parse them from the URL query string in your frontend JS / template.
# ── (Advanced) Manual code exchange — only if you handle the callback yourself
result = auth.exchange_oauth_callback(
provider="google",
code="authorization_code_from_callback",
redirect_uri="https://yourapp.com/auth/callback" # must match step 1
)
print(f"OAuth login successful: {result['user']['id']}")
Error Handling
Wrap SDK calls in try/except to handle API and validation errors.
from WowSQL import WowSQLClient, WowSQLError
try:
users = client.table("users").select("*").get()
print(f"Success: {len(users['data'])} users")
except WowSQLError as e:
if e.status_code == 401:
print("Authentication failed — check your API key")
elif e.status_code == 404:
print("Table or resource not found")
else:
print(f"Error ({e.status_code}): {e.message}")
Storage Operations
Initialize Storage
from WowSQL import WowSQLStorage
# Storage endpoints are at /storage/v1/ on your project URL
storage = WowSQLStorage(
project_url="myproject", # → https://myproject.wowsqlconnect.com
api_key="wowsql_anon_your_key_here"
)
File Operations
# Upload file → POST /storage/v1/object/{bucket}/{path}
result = storage.upload_from_path(
"path/to/document.pdf",
"uploads/document.pdf",
folder="documents"
)
print(f"Uploaded: {result['file_key']}")
# Get file URL (with metadata)
url_data = storage.get_file_url("uploads/document.pdf", expires_in=3600)
print(f"Download URL: {url_data['file_url']}")
print(f"Expires at: {url_data['expires_at']}")
# Get presigned URL (simple)
presigned_url = storage.get_presigned_url(
"uploads/document.pdf",
expires_in=3600,
operation="get_object"
)
print(f"URL: {presigned_url}")
# List files → POST /storage/v1/object/list/{bucket}
files = storage.list_files(prefix="uploads/")
for file in files:
print(f"{file.key}: {file.size_mb:.2f} MB")
# Delete file → DELETE /storage/v1/object/{bucket}/{path}
storage.delete_file("uploads/document.pdf")
# Check quota
quota = storage.get_quota()
print(f"Used: {quota.used_gb:.2f} GB / {quota.quota_gb:.2f} GB")
Direct REST Call (Storage)
import requests
PROJECT_URL = "https://myproject.wowsqlconnect.com"
API_KEY = "wowsql_anon_your_key_here"
# Upload via REST
with open("photo.png", "rb") as f:
resp = requests.post(
f"{PROJECT_URL}/storage/v1/object/avatars/photo.png",
files={"file": f},
headers={"apikey": API_KEY}
)
# Download via REST
resp = requests.get(
f"{PROJECT_URL}/storage/v1/object/avatars/photo.png",
headers={"apikey": API_KEY}
)
# Public file (no auth needed if bucket is public)
# GET https://myproject.wowsqlconnect.com/storage/v1/object/public/{bucket}/{path}
Bucket Management
# Create bucket → POST /storage/v1/bucket
storage.create_bucket(name="avatars", public=True)
# List buckets → GET /storage/v1/bucket
buckets = storage.list_buckets()
# Delete bucket → DELETE /storage/v1/bucket/{name}
storage.delete_bucket("avatars")
Schema DDL (WowSQLSchema)
Requires service role key: Schema operations use wowsql_service_... (not the anon key). POST /api/v2/schema/tables requires a primary_key column of type UUID; set auto_increment: True on that column to get DEFAULT gen_random_uuid(). Raw DDL sent via POST /api/v2/schema/execute cannot use SERIAL PRIMARY KEY. Existing tables are unchanged.
from wowsql.schema import WowSQLSchema
schema = WowSQLSchema(
project_url="myproject", # → https://myproject.wowsqlconnect.com
service_key="wowsql_service_your_key_here"
)
# Create table → POST /api/v2/schema/tables
schema.create_table(
"items",
[
{"name": "id", "type": "UUID", "auto_increment": True},
{"name": "title", "type": "TEXT"},
{"name": "price", "type": "NUMERIC"},
{"name": "created_at", "type": "TIMESTAMPTZ", "default": "NOW()"},
],
primary_key="id",
)
# Execute raw DDL → POST /api/v2/schema/execute
schema.execute("""
ALTER TABLE items ADD COLUMN description TEXT;
CREATE INDEX idx_items_title ON items (title);
""")
Direct REST Call (PostgREST)
You can also call the PostgREST API directly. WowSQL uses standard PostgREST — all filtering, ordering, and pagination are done via query parameters:
import requests
PROJECT_URL = "https://myproject.wowsqlconnect.com"
API_KEY = "wowsql_anon_your_key_here"
# SELECT with filters, ordering, and pagination
resp = requests.get(f"{PROJECT_URL}/rest/v1/users", params={
"select": "id,email,name",
"status": "eq.active",
"order": "created_at.desc",
"limit": 10
}, headers={"apikey": API_KEY})
data = resp.json()
# INSERT a new row
resp = requests.post(f"{PROJECT_URL}/rest/v1/users", json={
"email": "john@example.com",
"name": "John Doe",
"status": "active"
}, headers={
"apikey": API_KEY,
"Content-Type": "application/json",
"Prefer": "return=representation"
})
# UPDATE a row
resp = requests.patch(f"{PROJECT_URL}/rest/v1/users", params={
"id": "eq.123"
}, json={"name": "Updated Name"}, headers={
"apikey": API_KEY,
"Content-Type": "application/json",
"Prefer": "return=representation"
})
# DELETE a row
resp = requests.delete(f"{PROJECT_URL}/rest/v1/users", params={
"id": "eq.123"
}, headers={"apikey": API_KEY})