Impaktor

Inject Emacs intravenously

Conserving memory when reading SQL tables in python pandas
Published on Apr 22, 2021 by Impaktor.

For surely the food that memory gives to eat is bitter to the taste, and it
is only with the teeth of hope that we can bear to bite it.

“She”, H. Rider Haggard

Introduction

Herein, I outline the several “gotchas” I have stumbled upon in my quest to read data from an SQL table using python pandas in a memory-efficient way. In my case, the table had 52M rows and 23 columns, and occupied 900 MB of RAM once loaded, but fetching it from SQL used north of 12 GB, which was more than the production environment could comfortably handle. In this post I detail different solutions, each of which reduces memory consumption.

Below is the generic starting code, which I want to improve. All other snippets are additions to, or modifications of, this code. (It is assumed you already have a working connection to SQL from python. Here, we use luigi to read in the needed parameters.)

import pandas as pd
import pyodbc

# We use luigi to get the SQL user, password, database, and driver.
# It's assumed you can conjure up correct information to establish a
# pyodbc connection to SQL
import luigi

def get_db():
    """Returns connection object to SQL"""
    config = luigi.configuration.get_config()
    kwargs = dict(config.items("database"))
    return pyodbc.connect(**kwargs)

# Table name to be read
table = "my_table"

# Get connection to SQL database
conn = get_db()

# Select only the columns we actually need, from SQL
columns = ["COLUMN01",
           "COLUMN02",
           # ....
           "COLUMN23"]

query = f"""SELECT {', '.join(columns)} FROM {table}"""

df = pd.read_sql(query, conn).reset_index(drop=True)

Since the end goal is to run this in a docker image in a production environment, we do all benchmarking, such as RAM usage, by monitoring the running docker image, as described in a previous post. We see that it reaches 12 GB of RAM use in the figure below (we sample the RAM usage once per second). For interest, we also include CPU usage.

After all the data is loaded from SQL, it is validated using Great Expectations, which is the second half of activity seen below. When this is done, it returns, and the docker image stops, which here takes under 2 minutes.

base.png

Figure 1: Our starting base case. RAM usage hits 12 GB

Improvement 1: Read SQL database in chunks

Pandas’ read_sql() has an argument chunksize; if given, the data is returned in chunks, each as its own data frame. We then have to concatenate the data frames corresponding to the chunks.

Since we are appending, we need to initialize a data frame, and specify the columns.

df = pd.DataFrame(columns=columns)

chunksize = 100_000
for i, df_chunk in enumerate(pd.read_sql(query, conn, chunksize=chunksize)):
    df = pd.concat((df, df_chunk), axis=0, copy=False)
    print(f"{i} Got dataframe with {len(df_chunk):,} rows")

df = df.reset_index(drop=True)

With this modification, RAM usage goes down and now peaks at 8 GB. But this is nowhere near the reduction we would expect from handling 100k rows at a time instead of all 52M rows at once. Why?

chunks.png

Figure 2: Reading SQL in chunks of size 100k.

Improvement 2: Read SQL database in chunks (properly!)

It turns out that setting chunksize only makes pandas return the data in chunks; the table is still read and loaded all at once, as detailed in this blog post.

This is fixed by using sqlalchemy instead of pyodbc to establish the connection, and setting stream_results=True (there is an open issue on pandas to do this by default).

from sqlalchemy import create_engine
import luigi

def get_db():
    """Returns connection object to SQL"""
    config = luigi.configuration.get_config()
    kwargs = dict(config.items("database"))

    engine_stmt = ("mssql+pyodbc://%s:%s@%s/%s?driver=%s" % \
                   (kwargs['uid'],
                    kwargs['pwd'],
                    kwargs['server'],
                    kwargs['database'],
                    kwargs['driver'])) # 'ODBC+DRIVER+17+for+SQL+Server'

    # Query in streaming chunk size, to save RAM.
    engine = create_engine(engine_stmt)
    conn = engine.connect().execution_options(stream_results=True)
    return conn

However, this did not get me below 6 GB of RAM usage (results not shown).
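As a side note on the connection string above: if the user name or password contains characters that are special in a URL (such as @ or /), they need to be escaped before being placed in the string. Below is a minimal sketch using urllib.parse.quote_plus from the standard library. All values in kwargs are made up and merely stand in for what luigi would return.

```python
from urllib.parse import quote_plus

# Hypothetical values, standing in for what luigi reads from the config
kwargs = {
    "uid": "reader",
    "pwd": "p@ss/word",  # contains characters that are special in a URL
    "server": "db.example.com",
    "database": "my_database",
    "driver": "ODBC Driver 17 for SQL Server",
}

# Escape the parts that may contain special characters or spaces
engine_stmt = (
    f"mssql+pyodbc://{quote_plus(kwargs['uid'])}:{quote_plus(kwargs['pwd'])}"
    f"@{kwargs['server']}/{kwargs['database']}"
    f"?driver={quote_plus(kwargs['driver'])}"
)
print(engine_stmt)
```

Here quote_plus also turns the spaces in the driver name into plus signs, which matches the form shown in the comment in the code above.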

Improvement 3: Make ’category’ columns work with concatenate

Depending on the data, significant RAM can be saved by casting each column to a more suitable data type, especially when a column consists of a few values repeated many times, so that it can be cast to the category type. However, since we concatenate the chunks as they are read, the “top”/original df and the “bottom”/appended one must agree on which category maps, internally, to which placeholder index. We address this here.

Strategy to find best dtype for each column

Columns of small integers need not be represented as the default np.int64. E.g. a person’s “age” is a number between 0 and 127, so np.int8 or np.uint8, which represent numbers in the interval [-128, 127] or [0, 255], respectively, is enough. Similarly for bool, datetime64, etc.
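As a small illustration of the integer ranges, NumPy can report the limits of each integer type, and pandas can downcast a column to the smallest integer type that fits its values. The ages series below is made-up sample data.

```python
import numpy as np
import pandas as pd

# Limits of the small integer types
print(np.iinfo(np.int8).min, np.iinfo(np.int8).max)
print(np.iinfo(np.uint8).min, np.iinfo(np.uint8).max)

# Hypothetical column of ages, stored as the default int64
ages = pd.Series([0, 17, 42, 99, 127], dtype=np.int64)

# Let pandas pick the smallest signed integer type that holds all values
small = pd.to_numeric(ages, downcast="integer")

# Memory in bytes: 8 bytes per value before, 1 byte per value after
print(ages.dtype, ages.memory_usage(index=False))
print(small.dtype, small.memory_usage(index=False))
```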

For columns of object type, consisting of only a few unique repeated values, the best representation is as category type, such that they are internally represented by an index + lookup dictionary.
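To get a feeling for the saving, here is a minimal sketch comparing the same data stored as object and as category. The status values are invented for the example.

```python
import pandas as pd

# Hypothetical column: many strings, but only three distinct values
values = ["open", "closed", "pending"] * 100_000

as_object = pd.Series(values, dtype=object)
as_category = as_object.astype("category")

mem_obj = as_object.memory_usage(deep=True, index=False)
mem_cat = as_category.memory_usage(deep=True, index=False)

print(f"object:   {mem_obj / 1000**2:,.2f} MB")
print(f"category: {mem_cat / 1000**2:,.2f} MB")

# The integer codes and the lookup table behind the category column
print(as_category.cat.codes.dtype)
print(list(as_category.cat.categories))
```

With this few categories, each row only costs one small integer code, plus a single shared copy of each distinct string.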

Use df.info() to see the full RAM usage of the data frame, and df.memory_usage(deep=True, index=False) to break it down per column, and combine this with df.nunique().

Below is a function I wrote to serve this purpose, that returns a df with needed information to make a decision.

def mem_usage(df):
    """Display each column's memory usage, dtype, and number of unique values,
    e.g. for assessing optimal data type for each column.

    Parameters
    ----------
    df : pd.DataFrame
        A pandas data frame, with data to be probed

    Returns
    -------
    pd.DataFrame
        Data frame summarizing the stats of input df.
    """
    mem = df.memory_usage(deep=True, index=False).rename('mem')
    typ = df.dtypes.rename('dtype')
    unq = df.nunique().rename('nunique')
    nan = df.isna().sum().rename('Nans')

    # If nunique < N_class, get the relative value count distribution:
    N_class = 5
    stat = pd.Series(index=mem.index, dtype=str).fillna("-").rename('stats')
    for col in unq[unq < N_class].index:
        v = df[col].value_counts() / len(df)
        print(col)
        stat[col] = ", ".join([f"{i:.2}" for i in v])

    print(f"Total memory usage: {mem.sum()/1000**2:,.2f} MB")
    return pd.concat([mem, typ, unq, nan, stat], axis=1)

Applying on our example data

The table below shows a diagnosis of our 52M x 23 data frame. We see that much of the size originates from COLUMN07 to COLUMN11, which are all just repetitions of a few unique values (174, 10, or 31), making them prime candidates for the categorical type. The same holds for other columns, e.g. the last two.

Table 1: Columns with few unique values, but large size, are good candidates for category type
Column     Unique  Size (bytes)  dtype
COLUMN01  5252294     627963342  object
COLUMN02    21123     421612999  object
COLUMN03   383098     429001287  object
COLUMN04     9407      54018352  datetime64[ns]
COLUMN05      834      54018352  float64
COLUMN06    51882      54018352  float64
COLUMN07      174     422764016  object
COLUMN08       10     390175396  object
COLUMN09       10     459187154  object
COLUMN10       31     396693120  object
COLUMN11       31     445256806  object
COLUMN12     8101      54018352  float64
COLUMN13    11607      54018352  float64
COLUMN14     6081      54018352  float64
COLUMN15     5392      54018352  datetime64[ns]
COLUMN16     4851      54018352  datetime64[ns]
COLUMN17     1411      54018352  float64
COLUMN18      712     627950508  object
COLUMN19      712     422654617  object
COLUMN20       22      54018352  float64
COLUMN21        2      54018352  float64
COLUMN22        3     464442564  object
COLUMN23        5     445651704  object

Reading from SQL and concatenating categorical data frames

The main caveat when concatenating categorical columns is that we must also combine the underlying lookup dictionaries of known categories (which differ between data frames), since a category that only occurs in the new, appended df will not exist in the original df’s lookup dictionary. The code below demonstrates how to fix this:

from pandas.api.types import union_categoricals

# Example of columns that should be cast to categorical type
categorical = ["COLUMN07", "COLUMN08", "COLUMN22", "COLUMN23"]

df = pd.DataFrame(columns=columns).astype({col: 'category' for col in categorical})

chunksize = 100_000
for i, df_chunk in enumerate(pd.read_sql(query, conn, chunksize=chunksize)):

    # Cast to categorical, and prepare df for concatenation
    df_chunk = df_chunk.astype({col: 'category' for col in categorical})
    for col in categorical:
        uc = union_categoricals([df[col], df_chunk[col]])
        df[col] = pd.Categorical(df[col], categories=uc.categories)
        df_chunk[col] = pd.Categorical(df_chunk[col], categories=uc.categories)

    df = pd.concat((df, df_chunk), axis=0, copy=False)
    print(f"{i} Got dataframe with {len(df_chunk):,} rows")
df = df.reset_index(drop=True)

This code reduces RAM usage to below 5 GB, for our data.
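To see why the categories must be aligned first, here is a minimal sketch with two tiny, made-up chunks whose category sets differ. When concatenated naively, the column silently loses its category dtype (and with it the memory saving); after aligning the categories with union_categoricals, it is kept.

```python
import pandas as pd
from pandas.api.types import union_categoricals

# Two hypothetical chunks whose sets of categories differ
top = pd.DataFrame({"col": pd.Categorical(["a", "b", "a"])})
bottom = pd.DataFrame({"col": pd.Categorical(["b", "c"])})

# Naive concatenation: the result is no longer categorical
naive = pd.concat([top, bottom], ignore_index=True)
print(naive["col"].dtype)

# Give both chunks the same, combined categories before concatenating
uc = union_categoricals([top["col"], bottom["col"]])
top["col"] = pd.Categorical(top["col"], categories=uc.categories)
bottom["col"] = pd.Categorical(bottom["col"], categories=uc.categories)

fixed = pd.concat([top, bottom], ignore_index=True)
print(fixed["col"].dtype)
print(list(fixed["col"].cat.categories))
```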

cat.png

Figure 3: Reading SQL in chunks of size 100k, and casting 7 columns to category type, before concatenating

Improvement 4: Avoid using pandas concatenate!

It turns out that the real culprit is pd.concat(), which uses several times more RAM than the resulting df. The workaround is to not use it, e.g. by concatenating manually to disk instead of to RAM, as pd.concat() does. This is addressed in this post, where they use read_csv(), but since we are reading from SQL, we use the code snippet below.

Note: when incrementally fetching data with an OFFSET, it is important that the column we tell SQL to order by contains only unique values; otherwise rows may be repeated, or skipped, across the different chunks.
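Before the actual snippet, here is a small, self-contained sketch of this paging pattern, using the sqlite3 module from the standard library and a made-up table. Note that SQLite spells paging as LIMIT ... OFFSET ..., rather than the OFFSET ... FETCH NEXT syntax used further down, and that the helper fetch_paged is hypothetical. If no single column is unique, one option (my suggestion, not something measured here) is to append a unique column as a tie-breaker to the ORDER BY.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_table (id INTEGER PRIMARY KEY, grp TEXT)")
conn.executemany(
    "INSERT INTO my_table (id, grp) VALUES (?, ?)",
    [(i, "x" if i % 2 else "y") for i in range(10)],
)

def fetch_paged(order_by, chunksize=3):
    """Fetch all rows page by page, ordered by the given column(s)."""
    rows, chunk = [], 0
    while True:
        page = conn.execute(
            f"SELECT id, grp FROM my_table ORDER BY {order_by} LIMIT ? OFFSET ?",
            (chunksize, chunksize * chunk),
        ).fetchall()
        rows.extend(page)
        if len(page) < chunksize:
            break
        chunk += 1
    return rows

# Ordering on a unique column: every row is returned exactly once
ids = [row[0] for row in fetch_paged("id")]
print(len(ids), len(set(ids)))

# 'grp' alone has ties, so add the unique 'id' as a tie-breaker
ids_tb = [row[0] for row in fetch_paged("grp, id")]
print(len(ids_tb), len(set(ids_tb)))
```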

import pyarrow
import pyarrow.parquet as pq

chunksize, chunk = 100_000, 0

# Note: values in ordering column (here: 'COLUMN01') must be all unique:
q = query + " ORDER BY COLUMN01 OFFSET {} ROWS FETCH NEXT {} ROWS ONLY"

while True:
    df_tmp = pd.read_sql_query(q.format(chunksize*chunk, chunksize), conn)

    # Save df to parquet by appending
    # (named arrow_table to avoid shadowing the SQL table name defined earlier)
    arrow_table = pyarrow.Table.from_pandas(df_tmp)
    pq.write_to_dataset(arrow_table, root_path="disk_data", use_deprecated_int96_timestamps=True)

    print(f"Got chunk {chunk} of size {len(df_tmp):,}")

    if len(df_tmp) < chunksize:
        break
    chunk += 1

df = pd.read_parquet("disk_data").reset_index(drop=True)

Note that "disk_data" here is the name of a folder. One can wrap the code in a with tempfile.TemporaryDirectory() as disk_data: block, and pass disk_data as the root_path, so that the folder is cleaned up afterwards.

This used 500 MiB of RAM while reading from SQL, followed by a spike just below 4 GB when reading the data back and validating it using Great Expectations, as mentioned in the Introduction.

disk.png

Figure 4: Reading SQL in chunks of size 100k, and appending to disk.

Conclusion

Comparing all methods together, it becomes clearer that there is also a trade-off with speed. Appending to disk is the slowest, but saves the most memory. The increase in RAM towards the end is due to Great Expectations reading in the full data and running validations on it.

compare.png

Figure 5: Reading data from SQL, comparison of all methods