fastasyncpg

fastasyncpg is a simple wrapper for asyncpg. We’ll explain how it works and build up the module in a “literate” nbdev style.

Installation

On macOS, the recommended way to install PostgreSQL is via Homebrew. Other options include Postgres.app (a menubar app) and the EDB installer, but Homebrew integrates best with command-line workflows and makes updates simple.

To install PostgreSQL 18 (the current stable release):

brew install postgresql@18

To check if you already have PostgreSQL installed via Homebrew, run brew list | grep postgres. You can also check which version is in your PATH with psql --version.

Let’s verify the installation is working:

!brew list | grep postgres
postgresql@18
!psql --version
psql (PostgreSQL) 18.1 (Homebrew)

After installation, run brew info postgresql@18 to see setup instructions. PostgreSQL 18 is “keg-only”, meaning it’s not automatically symlinked into your PATH.

You’ll see something like:

This formula has created a default database cluster with:
  initdb --locale=en_US.UTF-8 -E UTF-8 /opt/homebrew/var/postgresql@18

When uninstalling, some dead symlinks are left behind so you may want to run:
  brew cleanup --prune-prefix

postgresql@18 is keg-only, which means it was not symlinked into /opt/homebrew,
because this is an alternate version of another formula.

If you need to have postgresql@18 first in your PATH, run:
  echo 'export PATH="/opt/homebrew/opt/postgresql@18/bin:$PATH"' >> /Users/jhoward/.bash_profile

To start postgresql@18 now and restart at login:
  brew services start postgresql@18

The brew info output (above) tells you exactly what to do:

  1. Add to PATH (for bash): echo 'export PATH="/opt/homebrew/opt/postgresql@18/bin:$PATH"' >> ~/.bash_profile && source ~/.bash_profile
  2. Start the service: brew services start postgresql@18

This registers PostgreSQL to start automatically at login.

To run non-interactive queries from a shell, use -c to pass a command directly:

!psql -d postgres -c "SELECT version();"
                                                           version                                                           
-----------------------------------------------------------------------------------------------------------------------------
 PostgreSQL 18.1 (Homebrew) on aarch64-apple-darwin25.2.0, compiled by Apple clang version 17.0.0 (clang-1700.6.3.2), 64-bit
(1 row)

Running brew services start registers PostgreSQL to start automatically at login/reboot. You can verify this with brew services list, which shows all Homebrew-managed services and their status.

!brew services list
Name          Status  User    File
cloudflared   none            
postgresql@18 started jhoward ~/Library/LaunchAgents/homebrew.mxcl.postgresql@18.plist
unbound       none            

To control auto-start behavior:

- brew services stop postgresql@18 — stop and disable auto-start
- brew services start postgresql@18 — start and enable auto-start
- brew services run postgresql@18 — run once without enabling auto-start

On Ubuntu, the standard way to get the latest PostgreSQL is through the official PostgreSQL APT repository (PGDG), since Ubuntu’s default repos often have older versions.

Connecting with Python

The most popular Python libraries for PostgreSQL are psycopg2/psycopg3 (traditionally used synchronously, though psycopg3 also supports async) and asyncpg (async-only). For async work, asyncpg is generally the fastest option in benchmarks and is the recommended choice.

We’ll use asyncpg for this wrapper — it’s the fastest Python PostgreSQL library for async code.

import os
user = os.environ['USER']; user
'jhoward'

asyncpg uses await for all database operations. The connect function returns a connection object, and fetchval is a convenience method that returns a single value from the first row:

import asyncpg
conn = await asyncpg.connect(user=user, database='postgres', host='127.0.0.1')
await conn.fetchval('SELECT version()')
'PostgreSQL 18.1 (Homebrew) on aarch64-apple-darwin25.2.0, compiled by Apple clang version 17.0.0 (clang-1700.6.3.2), 64-bit'

Let’s create a simple test table to explore basic operations:

await conn.execute('''DROP TABLE IF EXISTS users''')
await conn.execute('''CREATE TABLE users ( id SERIAL PRIMARY KEY, name TEXT NOT NULL, age INTEGER )''')
'CREATE TABLE'

Great! Now let’s insert some data:

await conn.execute("INSERT INTO users (name, age) VALUES ($1, $2)", 'Alice', 30)
'INSERT 0 1'

PostgreSQL uses $1, $2, etc. for parameterized queries, not ? like SQLite. This syntax allows you to reference the same parameter multiple times and makes the order explicit.

fetch returns a list of Record objects. Each record supports dict-like access by column name or index:

rs = await conn.fetch("SELECT * FROM users")
rs
[<Record id=1 name='Alice' age=30>]
r = rs[0]
type(rs),type(r)
(list, asyncpg.protocol.record.Record)

asyncpg.Record objects use dict-like access (r['name'] or r[0]), not attribute access. You can use fastcore’s dict2obj if you want the latter.

from fastcore.utils import dict2obj
ro = dict2obj(dict(r))
ro.name
'Alice'

Unlike psycopg2, asyncpg doesn’t use traditional cursors. Instead, use async for record in conn.cursor(...) to iterate over results. However, cursors in asyncpg require an explicit transaction:

# raises "NoActiveSQLTransactionError: cursor cannot be created outside of a transaction"
# async for record in conn.cursor("SELECT * FROM users"): print(record)
async with conn.transaction():
    async for record in conn.cursor("SELECT * FROM users"): print(record)
<Record id=1 name='Alice' age=30>

By default, asyncpg operates in auto-commit mode — changes are applied immediately when not in an explicit transaction block. Regular queries (execute, fetch, etc.) don’t need transactions, but cursors do. This is a direct reflection of how PostgreSQL itself handles “portals” (the underlying mechanism for cursors).

In PostgreSQL, a portal is generally only valid for the duration of a transaction. If a transaction isn’t explicitly started, PostgreSQL runs each command in its own “one-shot” transaction. For a cursor to stay open so you can fetch multiple batches of rows, the transaction it belongs to must remain open.

Other libraries (like psycopg2) often hide this by starting a transaction for you automatically when you create a cursor, whereas asyncpg chooses to be more “explicit” about the underlying database state.

Chinook

For testing with a real dataset, we’ll use the Chinook sample database, which has tables for artists, albums, tracks, etc. The PostgreSQL version is available on GitHub:

curl -L -O https://github.com/lerocha/chinook-database/raw/master/ChinookDatabase/DataSources/Chinook_PostgreSql.sql

Now we need to create a database and run that script. First, let’s create a database called chinook:

createdb chinook

Then we can load the SQL file into it:

psql -d chinook -f Chinook_PostgreSql.sql

Always close connections when done — this ends the server-side session and frees its resources:

await conn.close()

Now let’s connect to the Chinook database to work with more realistic data:

conn = await asyncpg.connect(user=user, database='chinook', host='127.0.0.1')
await conn.fetchval("SELECT count(*) FROM artist")
275

Metadata

Results is a simple list subclass that renders as an HTML table in notebooks. It displays all rows with column headers, making query results easy to read:
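
The idea is small enough to sketch; a simplified version might look like this (a sketch only, assuming the elements are asyncpg Records, which provide keys() and values()):

class ResultsSketch(list):
    "Sketch only: a list of records that renders as an HTML table"
    def _repr_html_(self):
        if not self: return '<i>(no rows)</i>'
        hdr  = ''.join(f'<th>{k}</th>' for k in self[0].keys())
        rows = ''.join('<tr>' + ''.join(f'<td>{v}</td>' for v in r.values()) + '</tr>' for r in self)
        return f'<table><tr>{hdr}</tr>{rows}</table>'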


source

Results


def Results(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):

Built-in mutable sequence.

If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.

sql is a quick helper to run SQL and return results as a list of records:

from IPython.core.magic import register_cell_magic
@register_cell_magic
async def sql(l,c): return Results(await conn.fetch(c))
%%sql
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = 'artist'
column_name data_type is_nullable
artist_id integer NO
name character varying YES

In PostgreSQL, every table belongs to a schema — think of it as a folder or namespace. The default schema is public. When you create a table without specifying a schema, it lands there. You can query schema information via information_schema views:

%%sql
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = 'artist'
column_name data_type is_nullable
artist_id integer NO
name character varying YES

To list all tables in the public schema, you can query the information_schema.tables view:

await conn.fetch("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'")
[<Record table_name='artist'>,
 <Record table_name='album'>,
 <Record table_name='employee'>,
 <Record table_name='customer'>,
 <Record table_name='invoice'>,
 <Record table_name='invoice_line'>,
 <Record table_name='track'>,
 <Record table_name='playlist'>,
 <Record table_name='playlist_track'>,
 <Record table_name='genre'>,
 <Record table_name='media_type'>,
 <Record table_name='cats'>,
 <Record table_name='cat'>,
 <Record table_name='dog'>]

To customize how records behave, we need access to the underlying Record class:

FRecord extends asyncpg’s Record with two conveniences: attribute access (r.name instead of r['name']) and HTML rendering for notebooks:
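
A rough sketch of the idea (not the actual implementation): asyncpg lets you subclass Record and pass the subclass via record_class, so attribute access can be layered on top of the existing name-based indexing:

class FRecordSketch(asyncpg.Record):
    "Sketch only: attribute access and HTML rendering on top of asyncpg.Record"
    def __getattr__(self, k):
        try: return self[k]
        except KeyError: raise AttributeError(k) from None
    def _repr_html_(self):
        hdr   = ''.join(f'<th>{k}</th>' for k in self.keys())
        cells = ''.join(f'<td>{v}</td>' for v in self.values())
        return f'<table><tr>{hdr}</tr><tr>{cells}</tr></table>'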


source

FRecord


def FRecord(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):

Initialize self. See help(type(self)) for accurate signature.

We can use record_class to auto-wrap with FRecord:

await conn.close()

conn = await asyncpg.connect(user=user, database='chinook', host='127.0.0.1', record_class=FRecord)

table_names and view_names query PostgreSQL’s system catalogs to list tables and views in a schema. We use pg_class and pg_namespace rather than information_schema for better performance:
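
For orientation, the pg_class query looks roughly like this (a sketch, not the exported code; relkind 'r' means ordinary table, 'v' means view):

async def table_names_sketch(conn, schema='public'):
    q = """SELECT c.relname FROM pg_class c
           JOIN pg_namespace n ON n.oid = c.relnamespace
           WHERE c.relkind = 'r' AND n.nspname = $1 ORDER BY c.relname"""
    return [r[0] for r in await conn.fetch(q, schema)]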


source

view_names


def view_names(
    conn, schema:str='public'
):

List of view names in schema


source

table_names


def table_names(
    conn, schema:str='public'
):

List of table names in schema

print(' '.join(await table_names(conn)))
artist album employee customer invoice invoice_line track playlist playlist_track genre media_type cats cat dog
await view_names(conn)
[]

columns_info returns a dict mapping column names to their PostgreSQL data types. It queries pg_attribute directly for efficiency:
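
A plausible form of that query, shown only as a sketch (format_type renders the type name including any size specifier):

async def columns_info_sketch(conn, table, schema='public'):
    q = """SELECT a.attname, format_type(a.atttypid, a.atttypmod)
           FROM pg_attribute a
           WHERE a.attrelid = ($1 || '.' || $2)::regclass
             AND a.attnum > 0 AND NOT a.attisdropped
           ORDER BY a.attnum"""
    return dict(await conn.fetch(q, schema, table))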


source

columns_info


def columns_info(
    conn, table, schema:str='public'
):

Dict mapping column names to data types for table

list(await columns_info(conn, 'artist'))
['artist_id', 'name']

We’ll need to know each table’s primary key. PostgreSQL stores this in pg_index. The ::regclass cast is idiomatic PostgreSQL — it converts a table name string to its internal object ID, automatically handling schema resolution. The indisprimary flag identifies the primary key index.
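
The classic form of that catalog query, sketched here for reference (indrelid identifies the table, indkey lists the key columns):

async def pk_cols_sketch(conn, table):
    q = """SELECT a.attname
           FROM pg_index i
           JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
           WHERE i.indrelid = $1::regclass AND i.indisprimary"""
    return [r[0] for r in await conn.fetch(q, table)]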


source

pk_cols


def pk_cols(
    conn, table
):

Get primary key column(s) for table

await pk_cols(conn, 'artist')
['artist_id']

Database wraps an asyncpg connection (or pool) and provides table/view metadata caching. It delegates unknown attributes to the underlying connection via __getattr__, so you can call db.fetch(...) directly. The t property returns a _TablesGetter for convenient table access.
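
The delegation part is conceptually tiny; a sketch (omitting the metadata caching and the t property):

class DatabaseSketch:
    "Sketch only: forward unknown attributes to the wrapped connection"
    def __init__(self, conn): self.conn = conn
    def __getattr__(self, k): return getattr(self.conn, k)  # so db.fetch(...) calls conn.fetch(...)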


source

Database


def Database(
    conn, refresh:bool=True
):

Initialize self. See help(type(self)) for accurate signature.

Table represents a database table with metadata like columns and primary keys. The xtra method lets you set persistent row filters (useful for multi-tenancy). Tables stringify as quoted identifiers for safe SQL interpolation.
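
A sketch of those two behaviours (the real class also tracks columns, primary keys, and its dataclass):

class TableSketch:
    "Sketch only: quoted-identifier stringification plus an xtra filter dict"
    def __init__(self, db, name): self.db, self.name, self.xtra_id = db, name, {}
    def xtra(self, **kwargs):
        self.xtra_id = kwargs   # calling with no args clears the filter
        return self
    def __str__(self): return f'"{self.name}"'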


source

Table


def Table(
    db, name
):

Initialize self. See help(type(self)) for accurate signature.

_Getter is a base class for “magic accessor” objects that provide multiple ways to access items — by attribute (dt.artist), by index (dt['artist']), or by iteration (for t in dt). It implements __dir__ so tab-completion works in notebooks, __repr__ for nice display, __contains__ for in checks, and both __getattr__ and __getitem__ for flexible access.

_TablesGetter specializes this for tables, reading from the database’s _tnames list. The db.t property returns one of these, giving you a clean API: db.t.artist instead of db.table('artist').
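
The pattern is easier to see in a simplified sketch (not the actual class):

class _GetterSketch:
    def __init__(self, items): self._items = items            # dict: name -> object
    def __dir__(self): return list(self._items)                # tab completion
    def __repr__(self): return ', '.join(self._items)
    def __contains__(self, k): return getattr(k, 'name', k) in self._items
    def __iter__(self): return iter(self._items.values())
    def __getattr__(self, k):
        if k in self._items: return self._items[k]             # dt.artist
        raise AttributeError(k)
    def __getitem__(self, k):                                  # dt['artist'] or dt['album','artist']
        if isinstance(k, tuple): return [self._items[o] for o in k]
        return self._items[k]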

connect is our main entry point — it creates an asyncpg connection with FRecord as the default record class and returns a Database wrapper with metadata already loaded (we’ll extend it later to also register JSON codecs):

async def connect(*args, **kwargs):
    kwargs.setdefault('record_class', FRecord)
    conn = await asyncpg.connect(*args, **kwargs)
    res = Database(conn, refresh=False)
    await res.refresh()
    return res
await conn.close()

db = await connect(user=user, database='chinook', host='127.0.0.1'); str(db)
'postgresql://jhoward@127.0.0.1:5432/chinook'
dt = db.t
artist = dt.artist
artist
Table "artist"
artist.pks
['artist_id']
dt['album','artist']
[Table "album", Table "artist"]
for tbl in dt:
    if tbl.name[0]=='a': print(tbl)
"album"
"artist"
assert 'artist' in dt
assert artist in dt
assert 'foo' not in dt
artist.cols
{'artist_id': 'integer', 'name': 'character varying(120)'}

_Col represents a single column, with __str__ returning fully-qualified SQL ("table"."column"). _ColsGetter follows the same pattern as _TablesGetter — it’s a magic accessor that lets you write artist.c.name to get a column reference. The __call__ method returns all columns as _Col objects, useful for building queries programmatically.


source

Table.c


def c(
    
):
ac = artist.c
ac
artist_id, name

Columns stringify in a format suitable for including in SQL statements.

print(f"select {ac.name} ...")
select "artist"."name" ...

Tables and views do the same.

print(f"select {ac.Name} from {artist}")
select "artist"."Name" from "artist"
assert 'name' in ac
assert ac.name in ac
assert 'foo' not in ac

Queries and views

We will support ? and :name placeholders in addition to the standard PostgreSQL $1 form, by using sqlparse to parse the query.

sqlparse returns a token list per statement, e.g.:

ts = sqlparse.parse("SELECT * FROM artist WHERE artist_id = ?")[0].flatten()
for t in ts: print(repr(t.ttype), repr(t.value))
Token.Keyword.DML 'SELECT'
Token.Text.Whitespace ' '
Token.Wildcard '*'
Token.Text.Whitespace ' '
Token.Keyword 'FROM'
Token.Text.Whitespace ' '
Token.Name 'artist'
Token.Text.Whitespace ' '
Token.Keyword 'WHERE'
Token.Text.Whitespace ' '
Token.Name 'artist_id'
Token.Text.Whitespace ' '
Token.Operator.Comparison '='
Token.Text.Whitespace ' '
Token.Name.Placeholder '?'

source

conv_placeholders


def conv_placeholders(
    sql, kwargs:VAR_KEYWORD
):

Convert ? and :name placeholders to PostgreSQL $n style

print(conv_placeholders("""
    SELECT * FROM a WHERE id = ?;
    SELECT * FROM b WHERE name = ?
""")[0])

    SELECT * FROM a WHERE id = $1;
    SELECT * FROM b WHERE name = $2
conv_placeholders("SELECT * FROM artist WHERE artist_id = $1 AND name = ?")
('SELECT * FROM artist WHERE artist_id = $1 AND name = $1', [])

We get back values for the kwargs in the order they appear:

conv_placeholders("SELECT * FROM artist WHERE artist_id = :id AND name = :name", id=5, name='AC/DC')
('SELECT * FROM artist WHERE artist_id = $1 AND name = $2', [5, 'AC/DC'])

A repeated kw placeholder name results in a single arg result:

conv_placeholders("SELECT * FROM a WHERE id = :uid OR creator = :uid", uid=42)
('SELECT * FROM a WHERE id = $1 OR creator = $1', [42])

Placeholder types can be mixed:

conv_placeholders("SELECT * FROM a WHERE id = ? AND name = :name and foo=?", name='test')
('SELECT * FROM a WHERE id = $1 AND name = $3 and foo=$2', ['test'])

db.q is a convenience method that runs a SQL query and wraps results in a Results list for nice HTML rendering:


source

Database.q


def q(
    sql, args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
acdc = await db.q(f"select * from {artist} where {ac.name} like 'AC/%'")
acdc
artist_id name
1 AC/DC
await db.q(f"select * from {artist} where {ac.name}=$1", 'AC/DC')
artist_id name
1 AC/DC
await db.q(f"select * from {artist} where {ac.name}=?", 'AC/DC')
artist_id name
1 AC/DC
await db.q(f"select * from {artist} where {ac.name}=:name", name='AC/DC')
artist_id name
1 AC/DC

Dataclasses

PostgreSQL has many data types that map to Python equivalents. We’ll import the Python types we need:
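
The hidden import cell presumably pulls in the standard-library types the mapping needs; something like this (an assumption, shown for completeness):

from datetime import datetime, date, time, timedelta
from decimal import Decimal
from uuid import UUID
from ipaddress import IPv4Address, IPv6Address, IPv4Network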

get_typ extracts the base PostgreSQL type (stripping size specifiers like (120)) and maps it to the corresponding Python type:


source

get_typ


def get_typ(
    pg_type
):

Get Python type for PostgreSQL type string

The pg_to_py dict maps PostgreSQL type names to Python types. This covers the most common types — numeric, string, temporal, JSON, network, and geometric:
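
A hypothetical subset of the mapping, together with a sketch of how get_typ strips size specifiers (the real table is larger; this uses the imports sketched above):

pg_to_py_sketch = {
    'integer': int, 'bigint': int, 'smallint': int,
    'real': float, 'double precision': float, 'numeric': Decimal,
    'text': str, 'character varying': str, 'character': str,
    'boolean': bool, 'bytea': bytes, 'uuid': UUID,
    'date': date, 'timestamp without time zone': datetime,
    'json': dict, 'jsonb': dict,
}

def get_typ_sketch(pg_type):
    base = pg_type.split('(')[0].strip()   # 'character varying(120)' -> 'character varying'
    return pg_to_py_sketch.get(base, str)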

asyncpg doesn’t automatically decode JSON columns — we need to register custom codecs. The setup_json function configures both json and jsonb types to use Python’s json module:
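
A sketch of what that registration looks like, using asyncpg's set_type_codec API (details of the exported function may differ):

import json

async def setup_json_sketch(conn):
    for typ in ('json', 'jsonb'):
        await conn.set_type_codec(typ, encoder=json.dumps, decoder=json.loads, schema='pg_catalog')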


source

setup_json


def setup_json(
    conn
):

We’ll now redefine connect to register the JSON codecs:


source

connect


def connect(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
await db.close()
db = await connect(user=user, database='chinook', host='127.0.0.1')

We’ll use Python’s dataclasses module to auto-generate typed classes from table schemas:

With the type mapping in place, we can auto-generate Python dataclasses from table schemas. The _get_flds helper extracts field definitions, and dataclass() creates a dataclass matching the table structure. We use flexiclass from fastcore to make the dataclass more flexible (allowing partial instantiation).
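
Conceptually the step is just “turn columns_info into fields”; a rough standard-library sketch (the real code uses fastcore’s flexiclass and UNSET defaults rather than None):

from dataclasses import make_dataclass, field

async def dataclass_sketch(conn, table):
    cols = await columns_info(conn, table)
    flds = [(k, get_typ(v), field(default=None)) for k, v in cols.items()]
    return make_dataclass(table.title(), flds)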

dt = db.t
artist = dt.artist

Artist = artist.dataclass()
art1_obj = Artist(**acdc[0])
art1_obj
Artist(artist_id=1, name='AC/DC')
artist.cls
__main__.Artist

You can get the definition of the dataclass using fastcore’s dataclass_src:

src = dataclass_src(Artist)
hl_md(src, 'python')
@dataclass
class Artist:
    artist_id: int | None = UNSET
    name: str | None = UNSET

all_dcs generates dataclasses for every table (and optionally views) in the database. This is useful for type-checking and IDE autocompletion:


source

all_dcs


def all_dcs(
    db, with_views:bool=False, store:bool=True, suf:str=''
):

dataclasses for all objects in db

get

The xtra method (defined earlier) lets you set persistent filters on a table. The _add_xtra helper injects these constraints into WHERE clauses. This is useful for multi-tenant apps or any situation where you want automatic row filtering — e.g., album.xtra(artist_id=1) ensures all subsequent queries only see albums by artist 1.
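
A sketch of how such constraints might be folded into a WHERE clause (an illustration, not the exported helper):

def add_xtra_sketch(where, args, xtra):
    for k, v in xtra.items():
        cond = f'"{k}" = ${len(args) + 1}'
        where = f'({where}) AND {cond}' if where else cond
        args.append(v)
    return where, args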

__getitem__ provides dict-style access by primary key. It raises NotFoundError if the row doesn’t exist (or doesn’t match xtra constraints). If a dataclass has been generated for the table, results are automatically converted to that type.


source

Table.__getitem__


def __getitem__(
    pk
):

Get row by primary key, raising NotFoundError if missing


source

NotFoundError


def NotFoundError(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):

Common base class for all non-exit exceptions.

a = await artist[1]
a
Artist(artist_id=1, name='AC/DC')
a._db
<__main__.Database>
album = dt.album
Album = album.dataclass()

print("Album 1:", await album[1])
print("Artist ID of album 1:", (await album[1]).artist_id)

album.xtra(artist_id=1)
print("\nWith xtra(artist_id=1):")
print("Album 1:", await album[1])  # Should work - album 1 is by artist 1

try: await album[2]  # Album 2 is by a different artist
except NotFoundError as e: print('Error correctly raised:', type(e))
Album 1: Album(album_id=1, title='For Those About To Rock We Salute You', artist_id=1)
Artist ID of album 1: 1

With xtra(artist_id=1):
Album 1: Album(album_id=1, title='For Those About To Rock We Salute You', artist_id=1)
Error correctly raised: <class '__main__.NotFoundError'>

get is the “safe” version of __getitem__ — it returns None instead of raising an exception when a row isn’t found. This mirrors the pattern in fastlite and Python’s dict.get().


source

Table.get


def get(
    pk
):

Get row by primary key, or None if missing

await artist.get(1)
Artist(artist_id=1, name='AC/DC')
await artist.get(99999)  # Should return None

call/select

rows_where is the core query method. It builds SQL from its parameters, applies xtra constraints, and optionally converts results to the table’s dataclass. Unlike psycopg2 and SQLite, which use ? placeholders, PostgreSQL uses $1, $2 positional parameters.
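
The statement it builds looks roughly like this (a simplified sketch; the real method also folds in xtra constraints and handles parameters):

def select_sql_sketch(table, select='*', where=None, order_by=None, limit=None, offset=None):
    sql = f'SELECT {select} FROM "{table}"'
    if where:    sql += f' WHERE {where}'
    if order_by: sql += f' ORDER BY {order_by}'
    if limit:    sql += f' LIMIT {limit}'
    if offset is not None: sql += f' OFFSET {offset}'
    return sql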


source

Table.rows_where


def rows_where(
    where:NoneType=None, where_args:NoneType=None, order_by:NoneType=None, select:str='*', limit:NoneType=None,
    offset:NoneType=None, as_cls:bool=True, debug:bool=False
):

Iterate over rows matching where clause

await artist.rows_where(limit=3)
[Artist(artist_id=1, name='AC/DC'),
 Artist(artist_id=2, name='Accept'),
 Artist(artist_id=3, name='Aerosmith')]
album.xtra(artist_id=1)
await album.rows_where(limit=5)
[Album(album_id=1, title='For Those About To Rock We Salute You', artist_id=1),
 Album(album_id=4, title='Let There Be Rock', artist_id=1)]

count is an async property that returns the number of rows in a table. It respects xtra constraints, so if you’ve set filters, only matching rows are counted:


source

Table.count_where


def count_where(
    where:NoneType=None, where_args:NoneType=None
):

source

Table.count


def count(
    
):
album.xtra(artist_id=1)
await album.count
2
album.xtra()
await album.count
347

get_field extracts a value from either a dict-like object (using [k]) or a dataclass/object (using getattr). This lets us handle both Record and dataclass results uniformly:
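
It likely amounts to a two-way lookup along these lines (a sketch):

def get_field_sketch(r, k):
    return r[k] if hasattr(r, '__getitem__') else getattr(r, k)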


source

get_field


def get_field(
    r, k
):

pks_and_rows_where wraps rows_where but returns (pk, row) tuples — useful when you need to know which primary key each row has without inspecting the row itself.


source

Table.pks_and_rows_where


def pks_and_rows_where(
    kwargs:VAR_KEYWORD
):

Like rows_where but returns (pk, row) tuples

await artist.pks_and_rows_where(limit=3)
[(1, Artist(artist_id=1, name='AC/DC')),
 (2, Artist(artist_id=2, name='Accept')),
 (3, Artist(artist_id=3, name='Aerosmith'))]

__call__ makes tables callable, providing a convenient shorthand for queries. await artist(limit=3) is equivalent to await artist.rows_where(limit=3). The with_pk parameter switches to returning (pk, row) tuples.


source

Table.__call__


def __call__(
    where:NoneType=None, where_args:NoneType=None, order_by:NoneType=None, limit:NoneType=None, offset:NoneType=None,
    select:str='*', with_pk:bool=False, as_cls:bool=True, debug:bool=False
):

Query table rows

await artist(limit=3)
[Artist(artist_id=1, name='AC/DC'),
 Artist(artist_id=2, name='Accept'),
 Artist(artist_id=3, name='Aerosmith')]
await artist(limit=3, with_pk=True)
[(1, Artist(artist_id=1, name='AC/DC')),
 (2, Artist(artist_id=2, name='Accept')),
 (3, Artist(artist_id=3, name='Aerosmith'))]
album.xtra(artist_id=1)
await album(limit=5)
[Album(album_id=1, title='For Those About To Rock We Salute You', artist_id=1),
 Album(album_id=4, title='Let There Be Rock', artist_id=1)]

selectone returns exactly one row matching the query, raising NotFoundError if none found or ValueError if multiple found. It passes limit=2 internally so it can detect the “not unique” case without fetching the entire table.


source

Table.selectone


def selectone(
    where:str | None=None, # SQL where fragment to use, for example `id > ?`
    where_args:Union=None, # Parameters to use with `where`; iterable for `id>?`, or dict for `id>:id`
    select:str='*', # Comma-separated list of columns to select
    as_cls:bool=True, # Convert returned dict to stored dataclass?
    debug:bool=False
)->list:

Shortcut for __call__ that returns exactly one item

await artist.selectone('Name=$1', ('AC/DC',), debug=True)
SELECT * FROM "artist" WHERE Name=$1 LIMIT 2
Artist(artist_id=1, name='AC/DC')
try: await artist.selectone('Name like $1', ('%a%',))
except ValueError: pass
else: raise Exception("Failed to get non unique exception")
try: await artist.selectone('Name=$1', ('i do not exist',))
except NotFoundError: pass
else: raise Exception("Failed to get NotFoundError")

db.item is for scalar queries — it returns a single field from a single row. Useful for things like SELECT count(*) or SELECT max(price).


source

Database.item


def item(
    sql, args:NoneType=None
):

Execute sql and return a single field from a single row

await db.item('select artist_id from artist where name=$1', ('AC/DC',))
1

create_mod

create_mod generates a Python module file containing dataclass definitions for all tables in the database. This lets you import type-checked dataclasses directly rather than regenerating them each time. The generated file includes proper imports and uses UNSET defaults for flexible instantiation.


source

create_mod


def create_mod(
    db, mod_fn, with_views:bool=False, store:bool=True, suf:str=''
):

Create module for dataclasses for db

create_mod(db, 'db_dc')

set_classes reconnects a database’s tables to dataclasses from a previously generated module. This is useful when you’ve imported dataclasses from a file created by create_mod and want the ORM to use them.


source

Database.set_classes


def set_classes(
    glb
):

Set all table dataclasses using types in namespace glb

db.t
artist, album, employee, customer, invoice, invoice_line, track, playlist, playlist_track, genre, media_type, cats, cat, dog

get_tables injects table objects into a namespace with pluralized names — so db.t.album becomes available as albums. Combined with set_classes, this gives you a clean API: await albums(limit=3) returns a list of Album dataclass instances.


source

Database.get_tables


def get_tables(
    glb
):

Add objects for all table objects to namespace glb

db.set_classes(globals())
db.get_tables(globals())

await albums(limit=1)
[Album(album_id=1, title='For Those About To Rock We Salute You', artist_id=1)]
await employees(limit=1)
[Employee(employee_id=1, last_name='Adams', first_name='Andrew', title='General Manager', reports_to=None, birth_date=datetime.datetime(1962, 2, 18, 0, 0), hire_date=datetime.datetime(2002, 8, 14, 0, 0), address='11120 Jasper Ave NW', city='Edmonton', state='AB', country='Canada', postal_code='T5K 2N1', phone='+1 (780) 428-9482', fax='+1 (780) 428-3457', email='andrew@chinookcorp.com')]

insert

To support both dataclasses and dicts as input, and to handle Enum values properly, we need these imports:

_process_row converts a dataclass (or dict) to a plain dict, filtering out UNSET values and extracting .value from Enum fields. This lets you pass partially-filled dataclasses to insert/update.
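
A sketch of that conversion (assuming fastcore’s UNSET sentinel is in scope, as used by the generated dataclasses):

from dataclasses import asdict, is_dataclass
from enum import Enum

def process_row_sketch(record=None, **kwargs):
    d = {} if record is None else (asdict(record) if is_dataclass(record) else dict(record))
    d.update(kwargs)   # kwargs override record fields
    return {k: (v.value if isinstance(v, Enum) else v) for k, v in d.items() if v is not UNSET}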

insert adds a row to the table. It accepts either a dataclass/dict as record, keyword arguments, or both (kwargs override record fields). PostgreSQL’s RETURNING * clause lets us get the inserted row back in one query — including any auto-generated values like SERIAL primary keys. The xtra constraints are automatically merged in.
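
The generated statement has roughly this shape (a sketch of the SQL only):

def insert_sql_sketch(table, d):
    cols = ', '.join(f'"{k}"' for k in d)
    ph   = ', '.join(f'${i + 1}' for i in range(len(d)))
    return f'INSERT INTO "{table}" ({cols}) VALUES ({ph}) RETURNING *'

insert_sql_sketch('cat', {'name': 'meow', 'weight': 6})
# 'INSERT INTO "cat" ("name", "weight") VALUES ($1, $2) RETURNING *'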


source

Table.insert


def insert(
    record:NoneType=None, kwargs:VAR_KEYWORD
):

Insert a row and return it

For DDL statements like CREATE TABLE, use execute rather than fetch/q. DDL statements don’t return rows — they return a status string like 'CREATE TABLE'. PostgreSQL uses SERIAL for auto-incrementing integers (instead of SQLite’s INTEGER PRIMARY KEY); here we use REAL for the floating-point weight column.

await db.execute('''
DROP TABLE IF EXISTS cat;
CREATE TABLE cat (
    id SERIAL PRIMARY KEY,
    name TEXT,
    weight REAL,
    uid INTEGER
)''')
'CREATE TABLE'

_retr_tbl is a helper that refreshes the database metadata and returns the table object for a given name. This ensures you’re working with up-to-date schema information after creating or modifying tables.

table2glb is a convenience method that refreshes metadata, creates the dataclass, and injects both the table object (pluralized name) and the dataclass into a namespace. This is handy after creating a new table.


source

Database.table2glb


def table2glb(
    name, glb:NoneType=None
):

Get table by name, refreshing metadata and creating dataclass, adding to glb

await db.table2glb('cat')
cats
Table "cat"
c = await cats.insert(name='meow', weight=6, uid=2)
c
Cat(id=1, name='meow', weight=6.0, uid=2)
await cats()
[Cat(id=1, name='meow', weight=6.0, uid=2)]

With xtra set, insert automatically includes those constraints. Here we set uid=1, so the inserted cat gets that value even though we didn’t pass it explicitly.

cats.xtra(uid=1)
c2 = await cats.insert(name='purr', weight=4)
c2
Cat(id=2, name='purr', weight=4.0, uid=1)
await cats()
[Cat(id=2, name='purr', weight=4.0, uid=1)]

Calling xtra() with no arguments clears the filter by setting xtra_id = {}. Now queries return all rows again.

cats.xtra()
Table "cat"
await cats()  # Should now return all cats, not just uid=1
[Cat(id=1, name='meow', weight=6.0, uid=2),
 Cat(id=2, name='purr', weight=4.0, uid=1)]

update

c.name = "moo"
c.uid = 1
c
Cat(id=1, name='moo', weight=6.0, uid=1)

_pk_where builds a WHERE clause for primary key matching, using PostgreSQL’s $1, $2 placeholders with an offset to account for preceding parameters in the query.

update modifies an existing row by primary key. It builds an UPDATE ... SET ... WHERE pk = $n RETURNING * statement. Like insert, it respects xtra constraints — if you try to update a row that doesn’t match the xtra filter, you’ll get NotFoundError.
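
The generated statement has roughly this shape (a sketch; SET placeholders come first, then pk placeholders offset past them):

def update_sql_sketch(table, d, pks):
    sets  = ', '.join(f'"{k}" = ${i + 1}' for i, k in enumerate(d))
    where = ' AND '.join(f'"{k}" = ${len(d) + i + 1}' for i, k in enumerate(pks))
    return f'UPDATE "{table}" SET {sets} WHERE {where} RETURNING *'

update_sql_sketch('cat', {'name': 'moo', 'weight': 6, 'uid': 1}, ['id'])
# 'UPDATE "cat" SET "name" = $1, "weight" = $2, "uid" = $3 WHERE "id" = $4 RETURNING *'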


source

Table.update


def update(
    record:NoneType=None, pk_values:NoneType=None, kwargs:VAR_KEYWORD
):

Update a row and return it

await cats.update(c)
Cat(id=1, name='moo', weight=6.0, uid=1)
await cats()
[Cat(id=2, name='purr', weight=4.0, uid=1),
 Cat(id=1, name='moo', weight=6.0, uid=1)]
cats.xtra(uid=2)
c.uid = 2
try: await cats.update(c)  # Should fail - c has id=1 which has uid=1, not uid=2
except NotFoundError as e: print('Correctly blocked:', e)
Correctly blocked: cat[[1, 2]]
cats.xtra()
Table "cat"

delete

delete removes a row by primary key, returning the deleted row (using RETURNING *). Like the other methods, it respects xtra constraints — attempting to delete a row that doesn’t match the filter raises NotFoundError.


source

Table.delete


def delete(
    pk_values
):

Delete row by primary key, returning the deleted row

Let’s verify delete works — first check what cats we have:

await cats()
[Cat(id=2, name='purr', weight=4.0, uid=1),
 Cat(id=1, name='moo', weight=6.0, uid=1)]

Delete returns the deleted row, so you can see exactly what was removed:

await cats.delete(c.id)
Cat(id=1, name='moo', weight=6.0, uid=1)
await cats()
[Cat(id=2, name='purr', weight=4.0, uid=1)]

The xtra filter also applies to deletes. If you try to delete a row that doesn’t match the constraint, you get NotFoundError:

cats.xtra(uid=99)
try: await cats.delete(2)  # Should fail - cat 2 has uid=1, not uid=99
except NotFoundError as e: print('Correctly blocked:', e)
Correctly blocked: cat[2]
cats.xtra()
Table "cat"

delete_where is the bulk version of delete — it removes all rows matching a WHERE clause (or all rows if none given), returning the deleted rows as a list. Like delete, it respects xtra constraints and uses RETURNING * to give back what was removed. This is useful for cleanup operations like removing all rows above a threshold.


source

Table.delete_where


def delete_where(
    where:NoneType=None, where_args:NoneType=None
):
await cats.insert(name='Cat McCat Face', weight=9)
await cats.insert(name='Skitter', weight=2)
Cat(id=4, name='Skitter', weight=2.0, uid=None)
await cats.delete_where('weight > $1', [3])
[Cat(id=2, name='purr', weight=4.0, uid=1),
 Cat(id=3, name='Cat McCat Face', weight=9.0, uid=None)]
await cats()
[Cat(id=4, name='Skitter', weight=2.0, uid=None)]

Create

To create tables from Python classes, we need a reverse mapping from Python types to PostgreSQL types. We generate it from pg_to_py and override some entries for cleaner defaults (e.g., TEXT instead of character varying).

col_def builds a column definition for CREATE TABLE. If the column is the primary key and has type int, it becomes SERIAL PRIMARY KEY (PostgreSQL’s auto-incrementing integer). Otherwise it maps the Python type to PostgreSQL and adds NOT NULL if specified.


source

col_def


def col_def(
    name, typ, pk, not_null
):

Build column definition SQL for CREATE TABLE

col_def('id', int, 'id', None)  # 'id' is pk and int -> SERIAL PRIMARY KEY
'"id" SERIAL PRIMARY KEY'
col_def('name', str, 'id', {'name'})  # not pk, in not_null -> TEXT NOT NULL
'"name" TEXT NOT NULL'
col_def('age', int, 'id', None)  # not pk -> INTEGER
'"age" INTEGER'

db.create creates a table from a Python class (or dataclass). It extracts field names and types, builds column definitions, and executes the CREATE TABLE statement. The replace=True option drops any existing table first (with CASCADE to handle dependencies).


source

Database.create


def create(
    cls:NoneType=None, name:NoneType=None, pk:str='id', foreign_keys:NoneType=None, defaults:NoneType=None,
    column_order:NoneType=None, not_null:NoneType=None, if_not_exists:bool=False, replace:bool=False
):

Create table from cls

drop removes a table from the database. The cascade=True option also drops any dependent objects (like foreign key references from other tables).


source

Table.drop


def drop(
    cascade:bool=False
):

Drop this table

await db.t.dog.drop(cascade=True)

Now let’s test create with a simple Dog class:

class Dog: id:int; name:str; age:int

dogs = await db.create(Dog, replace=True)
dogs.cols
{'id': 'integer', 'name': 'text', 'age': 'integer'}

The auto-generated SERIAL primary key handles auto-increment automatically:

d = await dogs.insert(name='Rex', age=5)
d
Dog(id=1, name='Rex', age=5)

Foreign keys are specified as a dict mapping column names to (table, column) tuples:

class Toy: id:int; name:str; dog_id:int

toys = await db.create(Toy, replace=True, foreign_keys={'dog_id': ('dog', 'id')})
toys.cols
{'id': 'integer', 'name': 'text', 'dog_id': 'integer'}

The foreign key constraint is enforced by PostgreSQL — inserting a toy with an invalid dog_id would raise an error:

t = await toys.insert(name='Ball', dog_id=d.id)
t
Toy(id=1, name='Ball', dog_id=1)

Upsert

upsert performs an “insert or update” operation using PostgreSQL’s ON CONFLICT ... DO UPDATE clause. If a row with the same primary key exists, it updates it; otherwise it inserts a new row. Like insert, it uses _prep_row for row processing and _exec_returning for result handling, and respects xtra constraints.
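
The generated statement has roughly this shape (a sketch of the SQL only; pk columns are excluded from the update list):

def upsert_sql_sketch(table, d, pks):
    cols = ', '.join(f'"{k}"' for k in d)
    ph   = ', '.join(f'${i + 1}' for i in range(len(d)))
    pkc  = ', '.join(f'"{k}"' for k in pks)
    sets = ', '.join(f'"{k}" = EXCLUDED."{k}"' for k in d if k not in pks)
    return (f'INSERT INTO "{table}" ({cols}) VALUES ({ph}) '
            f'ON CONFLICT ({pkc}) DO UPDATE SET {sets} RETURNING *')

upsert_sql_sketch('dog', {'id': 1, 'name': 'Rex', 'age': 6}, ['id'])
# 'INSERT INTO "dog" ("id", "name", "age") VALUES ($1, $2, $3) ON CONFLICT ("id") DO UPDATE SET "name" = EXCLUDED."name", "age" = EXCLUDED."age" RETURNING *'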


source

Table.upsert


def upsert(
    record:NoneType=None, kwargs:VAR_KEYWORD
):

Insert or update a row and return it

Let’s test upsert — first check current state:

await dogs()
[Dog(id=1, name='Rex', age=5)]

Updating an existing row — change Rex’s age from 5 to 6:

d.age = 6
await dogs.upsert(d)
Dog(id=1, name='Rex', age=6)
await dogs()
[Dog(id=1, name='Rex', age=6)]

Inserting a new row — upsert without an existing id creates a new record:

await dogs.upsert(name='Spot', age=3)
Dog(id=2, name='Spot', age=3)
await dogs()
[Dog(id=1, name='Rex', age=6), Dog(id=2, name='Spot', age=3)]

With xtra set, upsert merges those constraints into the row:

dogs.xtra(age=6)
d.name = 'Rexy'
await dogs.upsert(d)  # Should set age=6 from xtra
Dog(id=1, name='Rexy', age=6)

Connection pool

For production use, you’ll want a connection pool instead of a single connection. asyncpg.Pool has the same query methods (fetch, execute, etc.) as a connection, so our Database wrapper works with both. The key difference is that JSON codecs must be registered via the init callback (which runs on each new connection in the pool).
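
A sketch of the pool version, assuming asyncpg.create_pool’s init callback is used to register the JSON codecs on each new connection:

async def create_pool_sketch(*args, **kwargs):
    kwargs.setdefault('record_class', FRecord)
    pool = await asyncpg.create_pool(*args, init=setup_json, **kwargs)
    res = Database(pool, refresh=False)
    await res.refresh()
    return res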


source

create_pool


def create_pool(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
await db.close()

Let’s test that the pool works the same as a single connection:

db = await create_pool(user=user, database='chinook', host='127.0.0.1')
await db.t.artist(limit=3)
[<FRecord artist_id=1 name='AC/DC'>,
 <FRecord artist_id=2 name='Accept'>,
 <FRecord artist_id=3 name='Aerosmith'>]
str(db)
'postgresql://jhoward@127.0.0.1:5432/chinook'