from IPython.display import Markdown
from fastcore.test import test_fail, test_eq
import logging
from import urlsave
= ''
url = Path('chinook.sqlite')
path if not path.exists(): urlsave(url, path)
logging.getLogger(= Database("chinook.sqlite") db
Database.t ()
Exported source
class _Getter:
"Abstract class with dynamic attributes providing access to DB objects"
def __init__(self, db): self.db = db
# NB: Define `__dir__` in subclass to get list of objects
def __repr__(self): return ", ".join(dir(self))
def __contains__(self, s): return (s if isinstance(s,str) else in dir(self)
def __iter__(self): return iter(self[dir(self)])
def __getitem__(self, idxs):
if isinstance(idxs,str): return self.db.table(idxs)
return [self.db.table(o) for o in idxs]
def __getattr__(self, k):
if k[0]=='_': raise AttributeError
return self.db[k]
class _TablesGetter(_Getter):
def __dir__(self): return [o for o in self.db.table_names() if not o.startswith('sqlite_')]
def t(self:Database): return _TablesGetter(self)
By returning a _TablesGetter
we get a repr and auto-complete that shows all tables in the DB.
= db.t
dt dt
Album, Artist, Customer, Employee, Genre, Invoice, InvoiceLine, MediaType, Playlist, PlaylistTrack, Track
= dt.Artist
artist artist
<Table Artist (ArtistId, Name)>
This also can be used to get multiple tables at once.
'Album','Artist'] dt[
[<Table Album (AlbumId, Title, ArtistId)>, <Table Artist (ArtistId, Name)>]
You can iterate through it:
for tbl in dt:
if[0]=='A': print(tbl)
<Table Album (AlbumId, Title, ArtistId)>
<Table Artist (ArtistId, Name)>
assert 'Artist' in dt
assert artist in dt
assert 'foo' not in dt
View.c ()
Exported source
class _Col:
def __init__(self, t, c): self.t,self.c = t,c
def __str__(self): return f'"{self.t}"."{self.c}"'
def __repr__(self): return self.c
def __iter__(self): return iter(self.c)
class _ColsGetter:
def __init__(self, tbl): self.tbl = tbl
def __dir__(self): return map(repr, self())
def __call__(self): return [_Col(, for o in self.tbl.columns]
def __contains__(self, s): return (s if isinstance(s,str) else s.c) in self.tbl.columns_dict
def __repr__(self): return ", ".join(dir(self))
def __getattr__(self, k):
if k[0]=='_': raise AttributeError
return _Col(, k)
def c(self:Table): return _ColsGetter(self)
def c(self:View): return _ColsGetter(self)
Table.c ()
Column auto-complete and repr are much the same as tables.
= artist.c
ac ac
ArtistId, Name
Columns stringify in a format suitable for including in SQL statements.
print(f"select {ac.Name} ...")
select "Artist"."Name" ...
View.__str__ ()
Return str(self).
Exported source
def __str__(self:Table): return f'"{}"'
def __str__(self:View): return f'"{}"'
Table.__str__ ()
Return str(self).
Tables and views do the same.
print(f"select {ac.Name} from {artist}")
select "Artist"."Name" from "Artist"
assert 'Name' in ac
assert ac.Name in ac
assert 'foo' not in ac
Queries and views
Database.q (sql:str, params=None)
Exported source
def q(self:Database, sql: str, params=None):
return list(self.query(sql, params=params))
This is a minor shortcut for interactive use.
= db.q(f"select * from {artist} where {ac.Name} like 'AC/%'")
acdc acdc
[{'ArtistId': 1, 'Name': 'AC/DC'}]
Exported source
def _get_flds(tbl):
return [(k, v|None, field(default=tbl.default_values.get(k,None)))
for k,v in tbl.columns_dict.items()]
def _dataclass(self:Table, store=True, suf='')->type:
"Create a `dataclass` with the types and defaults of this table"
= make_dataclass(, _get_flds(self))
flexiclass(res)if store: self.cls = res
return res
= _dataclass Table.dataclass
= artist.dataclass()
artist_dc = artist_dc(**acdc[0])
art1_obj art1_obj
Artist(ArtistId=1, Name='AC/DC')
You can get the definition of the dataclass using fastcore’s dataclass_src
= dataclass_src(artist_dc)
src 'python') hl_md(src,
class Artist:
int | None = None
ArtistId: str | None = None Name:
all_dcs (db, with_views=False, store=True, suf='')
dataclasses for all objects in db
Exported source
def all_dcs(db, with_views=False, store=True, suf=''):
"dataclasses for all objects in `db`"
return [o.dataclass(store=store, suf=suf) for o in list(db.t) + (db.views if with_views else [])]
create_mod (db, mod_fn, with_views=False, store=True, suf='')
Create module for dataclasses for db
Exported source
def create_mod(db, mod_fn, with_views=False, store=True, suf=''):
"Create module for dataclasses for `db`"
= str(mod_fn)
mod_fn if not mod_fn.endswith('.py'): mod_fn+='.py'
with open(mod_fn, 'w') as f:
print('from dataclasses import dataclass', file=f)
for o in all_dcs(db, with_views, store=store, suf=suf): print(dataclass_src(o), file=f)
'db_dc') create_mod(db,
from db_dc import *
**dt.Track.get(1)) Track(
Track(TrackId=1, Name='For Those About To Rock (We Salute You)', AlbumId=1, MediaTypeId=1, GenreId=1, Composer='Angus Young, Malcolm Young, Brian Johnson', Milliseconds=343719, Bytes=11170334, UnitPrice=0.99)
*Built-in mutable sequence.
If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.*
Exported source
def __call__(
str|None=None, # SQL where fragment to use, for example `id > ?`
where:|dict|NoneType=None, # Parameters to use with `where`; iterable for `id>?`, or dict for `id>:id`
where_args: Iterablestr|None=None, # Column or fragment of SQL to order by
order_by: int|None=None, # Number of rows to limit to
limit:int|None=None, # SQL offset
offset:str = "*", # Comma-separated list of columns to select
select:bool=False, # Return tuple of (pk,row)?
with_pk:bool=True, # Convert returned dict to stored dataclass?
as_cls:dict|None=None, # Extra constraints
xtra:bool=False, # Only fetch one result
"Shortcut for `rows_where` or `pks_and_rows_where`, depending on `with_pk`"
= getattr(self, 'pks_and_rows_where' if with_pk else 'rows_where')
f if not xtra: xtra = getattr(self, 'xtra_id', {})
if xtra:
= ' and '.join(f"[{k}] = {v!r}" for k,v in xtra.items())
xw = f'{xw} and {where}' if where else xw
where = f(where=where, where_args=where_args, order_by=order_by, limit=limit, offset=offset, select=select, **kwargs)
res if as_cls and hasattr(self,'cls'):
if with_pk: res = ((k,self.cls(**v)) for k,v in res)
else: res = (self.cls(**o) for o in res)
return next(res) if fetchone else list(res)
This calls either rows_where
(if with_pk
) or with_pk
(otherwise). If dataclass(store=True)
has been called, then if as_cls
rows will be returned as dataclass objects.
=2) artist(limit
[Artist(ArtistId=1, Name='AC/DC'), Artist(ArtistId=2, Name='Accept')]
If with_pk
then tuples are returns with PKs 1st.
=True, limit=2) artist(with_pk
[(1, Artist(ArtistId=1, Name='AC/DC')), (2, Artist(ArtistId=2, Name='Accept'))]
1) artist.get(
{'ArtistId': 1, 'Name': 'AC/DC'}
*Built-in mutable sequence.
If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.*
Exported source
def fetchone(
str|None=None, # SQL where fragment to use, for example `id > ?`
where:|dict|NoneType=None, # Parameters to use with `where`; iterable for `id>?`, or dict for `id>:id`
where_args: Iterablestr = "*", # Comma-separated list of columns to select
select:bool=True, # Convert returned dict to stored dataclass?
as_cls:dict|None=None, # Extra constraints
"Shortcut for `__call__` that returns one item"
return self(where=where, where_args=where_args, select=select, as_cls=as_cls, xtra=xtra, fetchone=True)
'Name=?', ('AC/DC',)) artist.fetchone(
Artist(ArtistId=1, Name='AC/DC')
Database.set_classes (glb)
Add set all table dataclasses using types in namespace glb
Exported source
def set_classes(self:Database, glb):
"Add set all table dataclasses using types in namespace `glb`"
for tbl in self.t: tbl.cls = glb[]
Album, Artist, Customer, Employee, Genre, Invoice, InvoiceLine, MediaType, Playlist, PlaylistTrack, Track
Database.get_tables (glb)
Add objects for all table objects to namespace glb
Exported source
def get_tables(self:Database, glb):
"Add objects for all table objects to namespace `glb`"
for tbl in self.t: glb['s'] = tbl
db.set_classes(globals()) db.get_tables(
=1) albums(limit
[Album(AlbumId=1, Title='For Those About To Rock We Salute You', ArtistId=1)]
= dt.Album
album = f"""select {album}.*
acca_sql from {album} join {artist} using (ArtistId)
where {ac.Name} like 'AC/%'"""
'sql') hl_md(acca_sql,
select "Album".*
from "Album" join "Artist" using (ArtistId)
where "Artist"."Name" like 'AC/%'
[{'AlbumId': 1,
'Title': 'For Those About To Rock We Salute You',
'ArtistId': 1},
{'AlbumId': 4, 'Title': 'Let There Be Rock', 'ArtistId': 1}]
"AccaDaccaAlbums", acca_sql, replace=True) db.create_view(
<Database <apsw.Connection object "/Users/jhoward/git/fastlite/nbs/chinook.sqlite">>
Database.v ()
Exported source
class _ViewsGetter(_Getter):
def __dir__(self): return self.db.view_names()
def v(self:Database): return _ViewsGetter(self)
= db.v
dv dv
[{'AlbumId': 1,
'Title': 'For Those About To Rock We Salute You',
'ArtistId': 1},
{'AlbumId': 4, 'Title': 'Let There Be Rock', 'ArtistId': 1}]
Exported source
def _parse_typ(t): return t if not (_args:= get_args(t)) else first(_args, bool)
int, None]) _parse_typ(Union[
get_typ (t)
Get the underlying type.
int, None]) get_typ(Union[
int) get_typ(
If you have an Enum
where all the fields are the same type, then _get_typ
will return that type.
class _Test(Enum): foo='val1'; bar=2
class _Test2(Enum): foo='val3'; bar='val4'
# fields are not the same type
<enum '_Test'>
# fields are all of type `str`
Database.create (cls=None, name=None, pk='id', foreign_keys=None, defaults=None, column_order=None, not_null=None, hash_id=None, hash_id_columns=None, extracts=None, if_not_exists=False, replace=False, ignore=True, transform=False, strict=False)
Create table from cls
, default name to snake-case version of class name
Type | Default | Details | |
cls | NoneType | None | Dataclass to create table from |
name | NoneType | None | Name of table to create |
pk | str | id | Column(s) to use as a primary key |
foreign_keys | NoneType | None | Foreign key definitions |
defaults | NoneType | None | Database table defaults |
column_order | NoneType | None | Which columns should come first |
not_null | NoneType | None | Columns that should be created as NOT NULL |
hash_id | NoneType | None | Column to be used as a primary key using hash |
hash_id_columns | NoneType | None | Columns used when calculating hash |
extracts | NoneType | None | Columns to be extracted during inserts |
if_not_exists | bool | False | Use CREATE TABLE IF NOT EXISTS |
replace | bool | False | Drop and replace table if it already exists |
ignore | bool | True | Silently do nothing if table already exists |
transform | bool | False | If table exists transform it to fit schema |
strict | bool | False | Apply STRICT mode to table |
The class you pass to create
is converted to a dataclass where any fields missing a default are defaulted to None
class Nm(Enum): fn='meow'; ln='prr'
class Cat: id: int; name:Nm|None; age: int|None; city: str = "Unknown"
= db.create(Cat)
cats 1) Cat(
Cat(id=1, name=UNSET, age=UNSET, city='Unknown')
[name] TEXT,
[age] INTEGER,
[city] TEXT
<Table cat (id, name, age, city)>
To transform a table after creation, use the .create()
method again, this time with the transform
keyword set to True
class Cat: id: int; name: str; age: int; city: str = "Unknown"; breed: str = "Unknown"
= db.create(Cat, transform=True)
cats cats
<Table cat (id, name, age, city, breed)>
1) Cat(
Cat(id=1, name=UNSET, age=UNSET, city='Unknown', breed='Unknown')
[name] TEXT,
[age] INTEGER,
[city] TEXT,
[breed] TEXT
Database.import_file (table_name, file, format=None, pk=None, alter=False)
Import path or handle file
to new table table_name
This uses sqlite_utils.utils.rows_from_file
to load the file.
= Database(":memory:")
db = "id,name,age\n1,Alice,30\n2,Bob,25"
csv1 = "id,name,age\n3,Charlie,35\n4,David,40"
csv2 = "id,name,age,city\n5,Eve,45,New York"
# import file to new table
= db.import_file("people", csv1)
tbl assert len(tbl()) == 2
# import file to existing table (same schema)
= db.import_file("people", csv2)
tbl assert len(tbl()) == 4
# import file to existing table (schema change fails)
lambda: db.import_file("people", csv3),contains='city')
# import file to existing table (schema change succeeds)
assert 'city' not in tbl.c
= db.import_file("people", csv3, alter=True)
tbl assert 'city' in tbl.c
[{'id': 1, 'name': 'Alice', 'age': 30, 'city': None}, {'id': 2, 'name': 'Bob', 'age': 25, 'city': None}, {'id': 3, 'name': 'Charlie', 'age': 35, 'city': None}, {'id': 4, 'name': 'David', 'age': 40, 'city': None}, {'id': 5, 'name': 'Eve', 'age': 45, 'city': 'New York'}]
Database diagrams
(Requires graphviz.)
= album.foreign_keys[0]
fk fk
ForeignKey(table='Album', column='ArtistId', other_table='Artist', other_column='ArtistId')
diagram (tbls, ratio=0.7, size='10', neato=False, render=True)
Exported source
def _edge(tbl):
return "\n".join(f"{fk.table}:{fk.column} -> {fk.other_table}:{fk.other_column};"
for fk in tbl.foreign_keys)
def _row(col):
= " 🔑" if col.is_pk else ""
xtra = ' bgcolor="#ffebcd"' if col.is_pk else ""
bg return f' <tr><td port="{}"{bg}>{}{xtra}</td></tr>'
def _tnode(tbl):
= "\n".join(_row(o) for o in tbl.columns)
rows = f"""<table cellborder="1" cellspacing="0">
res <tr><td bgcolor="lightgray">{}</td></tr>
return f"{} [label=<{res}>];\n"
Exported source
def diagram(tbls, ratio=0.7, size="10", neato=False, render=True):
= "\nlayout=neato;\noverlap=prism;\noverlap_scaling=0.5;""" if neato else ""
layout = "\n".join(map(_edge, tbls))
edges = "\n".join(map(_tnode, tbls))
= f"""digraph G {{
res rankdir=LR;{layout}
node [shape=plaintext]
return Source(res) if render else res
= Database("chinook.sqlite") db