Recently, I was travelling for a couple of weeks with only an iPad and my phone for compute. The plan was to VPN home and ssh into my desktop to run the docker container that submits my predictions, every weekend.
The plan worked flawlessly for the first week. But from the 2nd week onwards, a dead IoT switch (Belkin Wemo) that I was using to turn my desktop on and off remotely, foiled the plan. Fortunately for me, my spouse was carrying her travel laptop, a 2017 Retina MacBook with a measly 8GB of RAM. I use git to version all my code and git lfs for versioning the model weights, so getting them on the laptop wasn’t a problem. But running my inference container with only ~5GB of usable memory was. My first thought was to spin up a big EC2 instance and run everything off of it. But then I thought it might be worth trying to run small batch inference on the tiny laptop. And it worked quite well. I managed to submit predictions for 2 weeks with it.
Now that I’m back home, I’ve cleaned up the code a bit and adapted it to run the example model. The code iterates over the tournament data one row at a time in a background thread and emits a DataFrame every time it crosses an era boundary. The output file is also appended to incrementally. I also profiled the code to measure its peak memory usage, comparing naive inference on the whole tournament dataset with era-wise batched inference. Peak memory usage goes down from 11662.2 MiB to 463.7 MiB (a 96% decrease), although the inference time goes up from 3:13.16 minutes to 10:46.79 minutes (a 234% increase). The inference time could perhaps be slightly improved by not counting the number of eras in the tournament dataset. Counting eras is necessary for displaying the tqdm progress bar, but it comes at the cost of having to read the compressed dataset once more.
I thought people running compute might find it useful, which is why I’m posting it here.
import csv
import lzma
import pathlib
import queue
import re
import threading
import numpy as np
import pandas as pd
import tqdm
import xgboost as xgb
from typing import Any, Callable, IO, List, Set
# Tournament identifier; the prediction column name and the submission file
# name below are both derived from it.
TOURNAMENT_NAME = "kazutsugi"
# Name given to the prediction column written to the submission file.
PREDICTION_NAME = f"prediction_{TOURNAMENT_NAME}"
# Path of the CSV file that main() writes the predictions to.
SUBMISSION_FILENAME = f"{TOURNAMENT_NAME}_submission.csv"
# Matches an era label: the literal "eraX", or "era" followed by digits.
ERA_RE = re.compile(r"(era(?:X|\d+))")
def open_file(file_path: pathlib.Path, mode: str = "rt") -> IO:
    """Open ``file_path``, transparently decompressing ``.xz`` files.

    Args:
        file_path: Path of the file to open. A ``.xz`` suffix selects
            ``lzma.open``; any other suffix selects the builtin ``open``.
        mode: Mode string passed through to the chosen opener. Defaults to
            text-mode reading.

    Returns:
        The open file object; the caller is responsible for closing it.
    """
    opener = lzma.open if file_path.suffix == '.xz' else open
    return opener(file_path, mode)
def maybe_float16(x: str) -> np.float16:
    """Convert a CSV cell to ``np.float16``, mapping an empty cell to NaN.

    Args:
        x: Cell text. Any falsy value (e.g. the empty string) is treated as
            missing.

    Returns:
        The parsed half-precision value, or NaN when ``x`` is falsy.
    """
    if not x:
        return np.float16(np.nan)
    return np.float16(x)
def build_conversions(column_names: List[str]) \
        -> List[Callable[[str], Any]]:
    """Choose a cell converter for each CSV column, based on its name.

    Args:
        column_names: Column names in file order.

    Returns:
        One callable per column, in the same order: ``np.float16`` for
        ``feature_*`` columns, ``maybe_float16`` for ``target_*`` columns
        (so empty cells become NaN), and ``str`` for everything else.
    """
    def _converter_for(name):
        if name.startswith('feature_'):
            return np.float16
        if name.startswith('target_'):
            return maybe_float16
        return str

    return [_converter_for(name) for name in column_names]
def distinct_eras(file_path: pathlib.Path) -> Set[str]:
    """Collect the distinct era labels that occur in a data file.

    The file is scanned as text in large chunks and every match of
    ``ERA_RE`` is recorded. Scanning stops as soon as the label ``eraX``
    has been seen, so labels that only appear after the first ``eraX``
    are not collected.

    Args:
        file_path: Path of the (optionally ``.xz``-compressed) file.

    Returns:
        The set of era labels found.
    """
    eras: Set[str] = set()
    # 10 Mi characters per read (the file is opened in text mode, so this is
    # only approximately 10 MiB).
    chunk_size = 10 * 1024 ** 2
    with open_file(file_path) as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            # Extend the chunk to the end of the current line. Without this
            # a label straddling two reads would be split, e.g. "era12"
            # would be recorded as the spurious label "era1".
            chunk += f.readline()
            eras.update(ERA_RE.findall(chunk))
            if 'eraX' in eras:
                break
    return eras
def _frame_from_rows(rows, column_names):
    """Build a DataFrame from ``{row_id: values}``, indexed by row id."""
    return pd.DataFrame.from_dict(rows, orient='index', columns=column_names)


def iter_csv(file_path: pathlib.Path):
    """Stream a CSV file as one DataFrame per run of consecutive equal eras.

    The first row is the header; it must contain an ``era`` column. The
    first column is used as the DataFrame index and the remaining columns
    become the DataFrame columns. Each cell is converted with the callable
    that ``build_conversions`` selects for its column, and the *converted*
    values are what end up in the DataFrame.

    Rows are assumed to be grouped by era: a new DataFrame is emitted every
    time the era value changes from one row to the next. Rows sharing the
    same first-column value within one batch overwrite each other.

    Args:
        file_path: Path of the (optionally ``.xz``-compressed) CSV file.

    Yields:
        A ``pandas.DataFrame`` for each batch of rows; nothing for a file
        without data rows.

    Raises:
        ValueError: If the header has no ``era`` column.
    """
    with open_file(file_path) as f:
        reader = csv.reader(f)
        header = next(reader, None)
        if header is None:
            return
        era_col_idx = header.index('era')
        conversions = build_conversions(header)
        column_names = header[1:]

        current_era = None
        rows = {}
        for raw in reader:
            if not raw:
                # csv.reader yields an empty list for a blank line.
                continue
            values = [conversions[idx](cell) for idx, cell in enumerate(raw)]
            row_era = values[era_col_idx]
            if current_era is not None and row_era != current_era:
                # Era boundary: hand over the finished batch and start anew.
                yield _frame_from_rows(rows, column_names)
                rows = {}
            current_era = row_era
            # Store the converted values (not the raw strings) so that the
            # per-column conversions actually reach the DataFrame.
            rows[values[0]] = values[1:]
        if rows:
            yield _frame_from_rows(rows, column_names)
class Prefetcher(threading.Thread):
    """Consume an iterator in a background thread and buffer its items.

    After ``start()`` has been called, iterating over the instance yields
    the items of ``iterator`` in order. At most ``queue_length`` items are
    buffered ahead of the consumer. An exception raised by ``iterator`` is
    re-raised in the consuming thread once the items produced before it
    have been delivered.

    The thread is a daemon thread so that a producer blocked on a full
    queue cannot keep the process alive after the consumer has given up.

    Args:
        iterator: The iterable to consume in the background.
        queue_length: Maximum number of items buffered ahead.
    """

    # A unique end-of-stream marker. It must not be ``()``: CPython shares a
    # single empty-tuple object, so an item ``()`` coming from the iterator
    # would be indistinguishable from the marker under an ``is`` check.
    SENTINEL = object()

    def __init__(self, iterator, queue_length=2):
        super().__init__(daemon=True)
        self.iterator = iterator
        self.queue = queue.Queue(queue_length)
        self._prefetch_error = None
        self._prefetch_done = False

    def run(self):
        """Feed the queue from the iterator, then signal end-of-stream."""
        try:
            for data in self.iterator:
                self.queue.put(data)
        except Exception as exc:
            # Handed to the consumer in __next__ instead of being lost in
            # this thread.
            self._prefetch_error = exc
        finally:
            # Always wake the consumer, even on failure; otherwise it would
            # block forever on queue.get().
            self.queue.put(self.SENTINEL)

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next buffered item, blocking until one is available.

        Raises:
            StopIteration: When the iterator is exhausted (and on every
                later call).
            Exception: Whatever the wrapped iterator raised, once.
        """
        if self._prefetch_done:
            raise StopIteration
        data = self.queue.get()
        if data is self.SENTINEL:
            self._prefetch_done = True
            error, self._prefetch_error = self._prefetch_error, None
            if error is not None:
                raise error
            raise StopIteration
        return data
def main():
    """Run era-wise batched inference and write the submission CSV.

    Loads the example model, counts the eras in the tournament file (only
    needed for the progress bar total), then streams the file one batch at
    a time through a background prefetcher. The predictions of each batch
    are written to ``SUBMISSION_FILENAME``: the first batch creates the
    file with a header, every later batch is appended without one.
    """
    # The numerai example model
    # https://github.com/numerai/example-scripts/blob/master/example_model.py
    model = xgb.XGBRegressor()
    model.load_model("example_model.xgb")

    # Downloaded from:
    # https://numerai-public-datasets.s3-us-west-2.amazonaws.com/
    data_path = pathlib.Path("latest_numerai_tournament_data.csv.xz")
    era_count = len(distinct_eras(data_path))

    prefetcher = Prefetcher(iter_csv(data_path))
    prefetcher.start()

    feature_cols = None
    batches = tqdm.tqdm(enumerate(prefetcher), total=era_count, disable=None)
    for batch_no, era_df in batches:
        if feature_cols is None:
            # The feature columns are the same for every batch; find them once.
            feature_cols = [
                col for col in era_df.columns if col.startswith('feature_')
            ]
        predictions = model.predict(era_df[feature_cols].values)
        submission = pd.Series(predictions, index=era_df.index)
        submission = submission.to_frame(PREDICTION_NAME)
        if batch_no > 0:
            submission.to_csv(SUBMISSION_FILENAME, mode='a', header=False)
        else:
            submission.to_csv(SUBMISSION_FILENAME, index_label="id")


if __name__ == '__main__':
    main()