core
Introduction
When building AI-based tooling and packages we often call LLMs while prototyping and testing our code. A single LLM call can take hundreds of milliseconds to run and the output isn’t deterministic. This can really slow down development, especially if our notebook contains many LLM calls 😞.
While LLMs are new, working with external APIs in our code isn’t. Plenty of tooling already exists that makes working with APIs much easier. For example, Python’s unittest mock object is commonly used to simulate or mock an API call so that it returns a hardcoded response. This works really well in the traditional Python development workflow and can make our tests fast and predictable.
However, it doesn’t work well in the nbdev workflow, where oftentimes we’ll want to quickly run all cells in our notebook while we’re developing our code. While we can use mocks in our test cells, we don’t want our exported code cells to be mocked. This leaves us with two choices:
- we temporarily mock our exported code cells but undo the mocking before we export these cells.
- we do nothing and just live with notebooks that take a long time to run.
Both options are pretty terrible as they pull us out of our flow state and slow down development 😞.
In reclm we build on the underlying idea of mocks but adapt them to the nbdev workflow.
Mocks
We’ve talked about mocks a lot but what do they actually look like? Here’s an example of how to mock a call to OpenAI’s chat completion endpoint so that it returns the hardcoded response "Hello, world!".
from unittest.mock import patch
from openai import OpenAI
with patch('openai.resources.chat.completions.Completions.create') as mc:
    mc.return_value.choices[0].message.content = "Hello, world!"
    client = OpenAI(api_key='api_key')
    r = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role":"user", "content":"Say hello"}]
    )
    assert r.choices[0].message.content == "Hello, world!"
Writing a mock isn’t too complicated but it does involve additional code and forces us to think of some hardcoded response. Instead, it would be great if we could somehow call our LLM (e.g. OpenAI) once and just re-use the response we get whenever we rerun the same cell while we’re building and testing our code 🔥
How could we call an LLM and save the response? The naive approach would be something like this.
import json
from openai import OpenAI
client = OpenAI(api_key='api_key')
r = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role":"user", "content":"Say hello"}]
)
with open('llm_calls.json', 'w') as f: json.dump(r.to_dict(), f)
While this works, having to consciously run with open('llm_calls.json', 'w') as f: json.dump(r.to_dict(), f) or some helper method after each LLM call is a little tedious. Instead, it would be great if we could do some kind of one-time setup at the beginning of our notebook so that any downstream LLM calls are automatically saved.
Custom HTTP Client
One way we could do this is by creating a custom http client when we initialise our LLM SDK client.
from openai import OpenAI
client = OpenAI(api_key='api_key', http_client=RecLMClient())
Whenever we run client.chat.completions.create in a downstream cell it will use our custom client RecLMClient() when calling the LLM.
Note: we can use the exact same approach and http_client for the Anthropic SDK.
from anthropic import Anthropic
client = Anthropic(api_key='api_key', http_client=RecLMClient())
Ok, let’s start building our _RecLMClient.
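The class definition itself isn’t shown in this extract. Presumably it’s little more than a subclass of httpx.Client (the async section below mirrors this with httpx.AsyncClient); a minimal sketch might be:

import httpx

# Minimal sketch (assumption): _RecLMClient extends httpx.Client so it can be passed
# as `http_client` to the OpenAI/Anthropic SDKs; its behaviour is added incrementally
# below using fastcore's @patch.
class _RecLMClient(httpx.Client): pass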
We’ll be building the _RecLMClient class incrementally. To help us do that we’ll be using @patch from fastcore. This decorator allows us to easily add new methods to classes.
For example, in the cell below we use @patch to add an init method to the _RecLMClient.

@patch
def __init__(self:_RecLMClient, *args, **kwargs):
    super(_RecLMClient, self).__init__(*args, **kwargs)
    self.cpth = Path.cwd() / 'reclm.json'
    self.cpth.touch(exist_ok=True)

The first thing we want our _RecLMClient to do is simply save the LLM response to a file. To do this we’ll need to customize the http client’s send method.
@patch
def send(self:_RecLMClient, req, **kwargs):
    "Run the LLM request `req` and save the response."
    resp = super(_RecLMClient, self).send(req, **kwargs)
    self.cpth.write_text(json.dumps(resp.json()))
    return resp
Great! The next thing we would like to do is to reuse our saved LLM response whenever we re-run the same LLM call. To do this we’ll need to change the structure of our cache file to a simple key/value store, where the key is a hash of the request and the value is the response.
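The send method below relies on two small helpers, _req_data and _hash, whose implementations aren’t shown in this extract. A plausible sketch (an assumption, not necessarily reclm’s actual code) is to serialise the request’s method, URL and body, then hash that:

import hashlib, json

@patch
def _req_data(self:_RecLMClient, req):
    # Assumption: capture just enough of the request to identify it (method, URL, body).
    # Assumes the request body is already in memory, which is true for the JSON bodies the SDKs send.
    return {'method': req.method, 'url': str(req.url), 'body': req.content.decode()}

@patch
def _hash(self:_RecLMClient, req):
    # Assumption: a stable hash of the serialised request data.
    return hashlib.sha256(json.dumps(self._req_data(req), sort_keys=True).encode()).hexdigest()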
Now, let’s update our send method.
@patch
def send(self:_RecLMClient, req, **kwargs):
    "Fetch `req` from the cache. If it doesn't exist, call the LLM and cache the response."
    hash = self._hash(req)
    cache = json.loads(self.cpth.read_text() or '{}')
    if resp := cache.get(hash): return httpx.Response(status_code=resp['status_code'], json=resp['response'], request=req)
    resp = super(_RecLMClient, self).send(req, **kwargs)
    cache[hash] = {'request':self._req_data(req), 'response':resp.json(), 'status_code':resp.status_code}
    self.cpth.write_text(json.dumps(cache))
    return resp
Great, _RecLMClient now re-uses the previous LLM call if it exists.
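To sanity-check the behaviour, here’s a hypothetical demo (it assumes the OPENAI_API_KEY environment variable is set): the first call hits the API, and the second identical call is served from reclm.json.

client = OpenAI(http_client=_RecLMClient())  # api_key picked up from OPENAI_API_KEY
r1 = client.chat.completions.create(model="gpt-4o", messages=[{"role":"user", "content":"Say hello"}])
r2 = client.chat.completions.create(model="gpt-4o", messages=[{"role":"user", "content":"Say hello"}])
# the second call is replayed from the cache, so the two responses are identical
assert r1.choices[0].message.content == r2.choices[0].message.content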
Context Dependent HTTP Client
The next thing we need to figure out is how to use the RecLMClient
only when we’re working in our notebooks or running our test suite.
One way we can do this is by adding a cell to the top of our notebook which overrides the LLM SDK client.
client = OpenAI(api_key='api_key')
if os.getenv("IN_NOTEBOOK") or os.getenv("IN_TEST"): client = OpenAI(api_key='api_key', http_client=RecLMClient())
This is a little clunky so let’s create a little helper method that patches the OpenAI
and/or Anthropic
clients instead.
def enable_reclm():
    "Set the OpenAI and Anthropic `http_client` to the `_RecLMClient`."
    def _init(pkg, cls): return getattr(import_module(pkg), cls).__init__
    def _inject_http_client(oinit): return lambda *args, **kws: oinit(*args, **kws, http_client=_RecLMClient())
    sdks = {'openai': 'OpenAI', 'anthropic': 'Anthropic'}
    patches = [mpatch(f'{pkg}.{cls}.__init__', _inject_http_client(_init(pkg, cls))) for pkg, cls in sdks.items() if ilib_util.find_spec(pkg)]
    for p in patches: p.start()
    return
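This assumes a few imports from earlier cells that aren’t shown in this extract. Presumably they look something like:

# Presumed imports (assumption: the aliases match the names used in this notebook)
import json, httpx
from pathlib import Path
from importlib import import_module, util as ilib_util
from unittest.mock import patch as mpatch
from fastcore.basics import patch
from fastcore.foundation import Config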
To use the RecLMClient
while in our notebooks or when running our tests all we need to do is add the following line to the top of our notebook.
enable_reclm()
Note: We don’t need to use any env vars like IN_TEST
because enable_reclm
won’t be used when we export our code.
Concurrent cache access
nbdev_test tests each notebook in parallel. Depending on the user’s OS, each notebook might be run in a separate process or thread. We need to update the _RecLMClient so that cache reads/writes can be done concurrently.
One way we can do this is by creating a lock file. Whenever a thread or process wants to access our cache, it first checks if a lock file exists. If the file doesn’t exist, the thread/process will create the lock file and then access the cache. Once it is finished with the cache the lock file is deleted.
If a thread/process checks for a lock file and one already exists it must wait for a little while and check again. It will keep checking until the file no longer exists and at that point it can go ahead and access the cache.
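The _CacheLock used in the code below isn’t shown in this extract. A minimal sketch of such a lock, assuming it’s a context manager built around a .lock file next to the cache, might be:

import os, time
from pathlib import Path

class _CacheLock:
    "Sketch (assumption): a cross-process lock based on creating a `.lock` file next to the cache."
    def __init__(self, cpth, poll=0.01): self.lpth,self.poll = Path(str(cpth)+'.lock'),poll
    def __enter__(self):
        while True:
            try:
                # O_CREAT|O_EXCL makes creation atomic: it fails if the lock file already
                # exists, so only one thread/process can hold the lock at a time.
                os.close(os.open(self.lpth, os.O_CREAT | os.O_EXCL))
                return self
            except FileExistsError: time.sleep(self.poll)  # someone else holds the lock: wait and retry
    def __exit__(self, *exc): self.lpth.unlink(missing_ok=True)  # release the lock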
Now let’s update _RecLMClient
to use our cache lock.
@patch
def __init__(self:_RecLMClient, *args, **kwargs):
    super(_RecLMClient,self).__init__(*args, **kwargs)
    self.cpth = Path.cwd() / 'reclm.json'
    with _CacheLock(self.cpth): self.cpth.touch(exist_ok=True)
@patch
def send(self:_RecLMClient, req, **kwargs):
    "Fetch `req` from the cache. If it doesn't exist, call the LLM and cache the response."
    hash = self._hash(req)
    with _CacheLock(self.cpth): cache = json.loads(self.cpth.read_text() or '{}')
    if resp := cache.get(hash): return httpx.Response(status_code=resp['status_code'], json=resp['response'], request=req)
    resp = super(_RecLMClient,self).send(req, **kwargs)
    cache[hash] = {'request':self._req_data(req), 'response':resp.json(), 'status_code':resp.status_code}
    with _CacheLock(self.cpth): self.cpth.write_text(json.dumps(cache))
    return resp
Cache Path
The default cache path uses the current working directory: Path.cwd() / 'reclm.json'. This opens up the possibility of accidentally creating multiple cache files for a single project if we run enable_reclm from different locations within a project.
reclm is most commonly used within nbdev projects. We can use fastcore’s Config.find
to locate the root directory.
def _cache_dir(): return Config.find('settings.ini').config_path
We should also support non-nbdev projects. In this case let’s fall back to the current working directory, as sketched below.
More generally, users should be able to set any directory as their cache directory.
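Here’s one way _cache_dir might handle that fallback (a sketch; it assumes Config.find either returns a falsy value or raises when no settings.ini is found):

def _cache_dir():
    "Sketch (assumption): use the nbdev project root if there is one, else the current working directory."
    try:
        cfg = Config.find('settings.ini')      # locate the nbdev project root
        if cfg: return cfg.config_path         # nbdev project: keep the cache at the repo root
    except Exception: pass                     # not an nbdev project
    return Path.cwd()                          # fallback: current working directory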
@patch
def __init__(self:_RecLMClient, *args, cache_dir, **kwargs):
    super(_RecLMClient,self).__init__(*args, **kwargs)
    self.cpth = cache_dir / 'reclm.json'
    with _CacheLock(self.cpth): self.cpth.touch(exist_ok=True)
def enable_reclm(cache_dir=None):
    "Set the OpenAI and Anthropic `http_client` to the `_RecLMClient`."
    if cache_dir and not Path(cache_dir).is_dir(): raise ValueError('`cache_dir` must be a directory.')
    def _init(pkg, cls): return getattr(import_module(pkg), cls).__init__
    def _inject_http_client(oinit):
        return lambda *args,**kws: oinit(*args,**kws,http_client=_RecLMClient(cache_dir=cache_dir or _cache_dir()))
    sdks = {'openai': 'OpenAI', 'anthropic': 'Anthropic'}
    patches = [mpatch(f'{pkg}.{cls}.__init__', _inject_http_client(_init(pkg, cls))) for pkg, cls in sdks.items() if ilib_util.find_spec(pkg)]
    for p in patches: p.start()
    return
Streaming
Both OpenAI and Anthropic allow users to stream the response using server-sent events (SSE).
The response is sent back in chunks, which can be processed chunk-by-chunk in a for loop as shown below.
import anthropic
client = anthropic.Anthropic()

with client.messages.stream(
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}],
    model="claude-3-7-sonnet-20250219"
) as stream:
    for chunk in stream.text_stream: print(chunk, end="", flush=True)
Caching a streamed response is a little tricky because we only receive one chunk at a time.
One way to cache the stream is to convert the multi-chunk response into a single-chunk response.
To convert the response we need to do the following:
- consume the entire stream in _RecLMClient.send
- aggregate the chunks into a single chunk
- cache the chunk
- return the streamed response using the single chunk as content
Note: The downside of this approach is that the user will no longer receive multi-chunk responses during development.
_RecLMClient.send
_RecLMClient.send (req, **kwargs)
Fetch req
from the cache. If it doesn’t exist, call the LLM and cache the response.
Clean the cache
To make our cache easy to maintain we need a straightforward way to remove any stale requests from old code that no longer exists.
One way we can do this is by creating a script that runs through all our notebooks and stores all responses in an updated cache file. Once we’ve run through every notebook, we can simply replace the old cache with the updated cache.
The script that iterates over all notebooks will be added in the cli notebook.
For now we need to figure out which cache to use in _RecLMClient: the old cache or the updated cache? Maybe our script could set an env var like CLEAN_RECLM_CACHE. When this env var is set, _RecLMClient should use the updated cache (e.g. updated_reclm.json).
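In code, that might look something like the following (a sketch; _cache_file is a hypothetical helper, and the real logic may differ):

import os

def _cache_file(cache_dir):
    # Hypothetical helper: while the cache-cleaning script runs it sets CLEAN_RECLM_CACHE,
    # so responses are written to a fresh file that later replaces reclm.json.
    fname = 'updated_reclm.json' if os.getenv('CLEAN_RECLM_CACHE') else 'reclm.json'
    return cache_dir / fname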
_RecLMClient.init
_RecLMClient.__init__ (*args, cache_dir, **kwargs)
Initialize self. See help(type(self)) for accurate signature.
Double Patching
If enable_reclm() is called twice, the following error is thrown:
TypeError: anthropic._client.Anthropic.__init__() got multiple values for keyword argument 'http_client'
You might be asking, why would I call enable_reclm()
twice? This can happen unintentionally if your current notebook or file runs enable_reclm()
and it imports code from another notebook or file that also runs enable_reclm()
.
Let’s update enable_reclm so that it only patches the Anthropic/OpenAI SDKs the first time it runs.
_ispatched = False

def enable_reclm(cache_dir=None):
    "Set the OpenAI and Anthropic `http_client` to the `_RecLMClient`."
    if cache_dir and not Path(cache_dir).is_dir(): raise ValueError('`cache_dir` must be a directory.')
    def _init(pkg, cls): return getattr(import_module(pkg), cls).__init__
    def _inject_http_client(oinit):
        return lambda *args,**kws: oinit(*args,**kws,http_client=_RecLMClient(cache_dir=cache_dir or _cache_dir()))
    sdks = {'openai': 'OpenAI', 'anthropic': 'Anthropic'}
    global _ispatched
    if _ispatched: return # do not double-patch
    _ispatched = True
    patches = [mpatch(f'{pkg}.{cls}.__init__', _inject_http_client(_init(pkg, cls))) for pkg, cls in sdks.items() if ilib_util.find_spec(pkg)]
    for p in patches: p.start()
    return
Async Support
The Anthropic and OpenAI SDKs have async clients. Unfortunately, _RecLMClient
doesn’t support these clients. Let’s create an async version of _RecLMClient
now. This will be almost identical to _RecLMClient
. The main changes are that we’ll be subclassing httpx.AsyncClient
and we’ll be making the send method async.
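A sketch of that async variant, under the same assumptions as the sync client (it reuses json, httpx and _CacheLock, and assumes the _hash/_req_data helpers are also available on this class):

# Sketch (assumption): async twin of _RecLMClient, for AsyncOpenAI / AsyncAnthropic.
# Assumes the `_hash`/`_req_data` helpers and `_CacheLock` from the sync client are
# available here as well (e.g. patched onto this class too).
class _RecLMAsyncClient(httpx.AsyncClient):
    def __init__(self, *args, cache_dir, **kwargs):
        super().__init__(*args, **kwargs)
        self.cpth = cache_dir / 'reclm.json'
        with _CacheLock(self.cpth): self.cpth.touch(exist_ok=True)

    async def send(self, req, **kwargs):
        "Fetch `req` from the cache. If it doesn't exist, call the LLM and cache the response."
        hash = self._hash(req)
        with _CacheLock(self.cpth): cache = json.loads(self.cpth.read_text() or '{}')
        if resp := cache.get(hash): return httpx.Response(status_code=resp['status_code'], json=resp['response'], request=req)
        resp = await super().send(req, **kwargs)
        cache[hash] = {'request':self._req_data(req), 'response':resp.json(), 'status_code':resp.status_code}
        with _CacheLock(self.cpth): self.cpth.write_text(json.dumps(cache))
        return resp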
Next, we need to update enable_reclm to patch the Anthropic and OpenAI async clients.
enable_reclm
enable_reclm (cache_dir=None)
Set the OpenAI and Anthropic http_client
to a reclm client.