import gc
import os
import time
import numpy as np
import pandas as pd
from pyarrow.compat import guid
import pyarrow as pa
import pyarrow.parquet as pq
import fastparquet as fp
import snappy
def generate_floats(n, pct_null, repeats=1):
    nunique = int(n / repeats)
    unique_values = np.random.randn(nunique)

    num_nulls = int(nunique * pct_null)
    null_indices = np.random.choice(nunique, size=num_nulls, replace=False)
    unique_values[null_indices] = np.nan

    return unique_values.repeat(repeats)
DATA_GENERATORS = {
    'float64': generate_floats
}
def generate_data(nrows, ncols, pct_null=0.1, repeats=1, dtype='float64'):
    type_ = np.dtype('float64')
    datagen_func = DATA_GENERATORS[dtype]

    data = {
        'c' + str(i): datagen_func(nrows, pct_null, repeats)
        for i in range(ncols)
    }
    return pd.DataFrame(data)
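As a quick smoke test (a hypothetical call added for illustration, not part of the original benchmark):

df = generate_data(1000, 4, pct_null=0.1, repeats=10)
# Each column holds 100 unique values repeated 10 times; int(100 * 0.1) = 10
# of the uniques are NaN, so exactly 10% of each column is null.
df.shape                   # (1000, 4)
df['c0'].isnull().mean()   # 0.1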
def write_to_parquet(df, out_path, use_dictionary=True,
                     compression='SNAPPY'):
    arrow_table = pa.Table.from_pandas(df)
    if compression.lower() == 'uncompressed':
        compression = None
    pq.write_table(arrow_table, out_path, use_dictionary=use_dictionary,
                   compression=compression)
def read_fastparquet(path):
return fp.ParquetFile(path).to_pandas()
def read_pyarrow(path, nthreads=1):
return pq.read_table(path, nthreads=nthreads).to_pandas()
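One caveat for anyone rerunning this: the nthreads keyword reflects the pyarrow API at the time this notebook was written; later pyarrow releases replaced it with a boolean use_threads flag. A hedged equivalent for a newer pyarrow would be:

def read_pyarrow_threaded(path, use_threads=True):
    # Assumes a pyarrow version where read_table takes use_threads rather
    # than nthreads; the worker count is set globally via pa.set_cpu_count().
    return pq.read_table(path, use_threads=use_threads).to_pandas()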
def get_timing(f, path, niter):
    start = time.clock_gettime(time.CLOCK_MONOTONIC)
    # gc.disable()
    for i in range(niter):
        f(path)
    elapsed = time.clock_gettime(time.CLOCK_MONOTONIC) - start
    # gc.enable()
    return elapsed
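time.clock_gettime(time.CLOCK_MONOTONIC) is POSIX-only; a portable variant of the same monotonic timer (an alternative, not what the original used) is:

def get_timing_portable(f, path, niter):
    # time.perf_counter() is also monotonic and high-resolution, and works
    # on non-POSIX platforms.
    start = time.perf_counter()
    for _ in range(niter):
        f(path)
    return time.perf_counter() - start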
MEGABYTE = 1 << 20
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
NROWS = DATA_SIZE / NCOLS / np.dtype('float64').itemsize
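# For scale (arithmetic added for clarity): DATA_SIZE is 1 GiB of float64
# data, so NROWS = 1073741824 / 16 / 8 = 8388608.0. True division leaves
# this a float; int(NROWS) or DATA_SIZE // NCOLS // 8 gives the integer
# row count.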
cases = {
    'low_entropy_dict': {
        'pct_null': 0.1,
        'repeats': 1000,
        'use_dictionary': True
    },
    'low_entropy': {
        'pct_null': 0.1,
        'repeats': 1000,
        'use_dictionary': False
    },
    'high_entropy_dict': {
        'pct_null': 0.1,
        'repeats': 1,
        'use_dictionary': True
    },
    'high_entropy': {
        'pct_null': 0.1,
        'repeats': 1,
        'use_dictionary': False
    }
}
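# The 'low entropy' cases repeat each unique value 1000 times (few distinct
# values, so Parquet dictionary encoding pays off); the 'high entropy' cases
# draw all-unique values. The *_dict variants enable dictionary encoding at
# write time.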
NITER = 5

results = []
readers = [
    # ('fastparquet', lambda path: read_fastparquet(path)),
    ('pyarrow', lambda path: read_pyarrow(path)),
    ('pyarrow 2 threads', lambda path: read_pyarrow(path, nthreads=2)),
    ('pyarrow 4 threads', lambda path: read_pyarrow(path, nthreads=4))
]
COMPRESSIONS = ['UNCOMPRESSED', 'SNAPPY']  # , 'GZIP']
case_files = {}
for case, params in cases.items():
    for compression in COMPRESSIONS:
        path = '{0}_{1}.parquet'.format(case, compression or 'UNCOMPRESSED')
        # df = generate_data(NROWS, NCOLS, repeats=params['repeats'])
        # write_to_parquet(df, path, compression=compression,
        #                  use_dictionary=params['use_dictionary'])
        # df = None
        case_files[case, compression] = path
for case, params in cases.items():
    for compression in COMPRESSIONS:
        path = case_files[case, compression]
        compression = compression if compression != 'UNCOMPRESSED' else None

        # prime the file cache
        read_pyarrow(path)
        read_pyarrow(path)

        for reader_name, f in readers:
            elapsed = get_timing(f, path, NITER) / NITER
            result = case, compression, reader_name, elapsed
            print(result)
            results.append(result)
('low_entropy', None, 'pyarrow', 0.6907784113980597)
('low_entropy', None, 'pyarrow 2 threads', 0.47830765520047863)
('low_entropy', None, 'pyarrow 4 threads', 0.3837389957974665)
('low_entropy', 'SNAPPY', 'pyarrow', 0.7614571548008826)
('low_entropy', 'SNAPPY', 'pyarrow 2 threads', 0.5111582818004535)
('low_entropy', 'SNAPPY', 'pyarrow 4 threads', 0.4159896053984994)
('low_entropy_dict', None, 'pyarrow', 0.4947156649985118)
('low_entropy_dict', None, 'pyarrow 2 threads', 0.34644180460018104)
('low_entropy_dict', None, 'pyarrow 4 threads', 0.2585927422012901)
('low_entropy_dict', 'SNAPPY', 'pyarrow', 0.49566616139782127)
('low_entropy_dict', 'SNAPPY', 'pyarrow 2 threads', 0.33864884299982806)
('low_entropy_dict', 'SNAPPY', 'pyarrow 4 threads', 0.2592433379992144)
('high_entropy', None, 'pyarrow', 1.215046143598738)
('high_entropy', None, 'pyarrow 2 threads', 0.744592010800261)
('high_entropy', None, 'pyarrow 4 threads', 0.49727433520019987)
('high_entropy', 'SNAPPY', 'pyarrow', 1.2913883829984116)
('high_entropy', 'SNAPPY', 'pyarrow 2 threads', 0.8023010949982563)
('high_entropy', 'SNAPPY', 'pyarrow 4 threads', 0.5566735885979142)
('high_entropy_dict', None, 'pyarrow', 1.2103678311978001)
('high_entropy_dict', None, 'pyarrow 2 threads', 0.7619444314012072)
('high_entropy_dict', None, 'pyarrow 4 threads', 0.48606574420118703)
('high_entropy_dict', 'SNAPPY', 'pyarrow', 1.3105062498012559)
('high_entropy_dict', 'SNAPPY', 'pyarrow 2 threads', 0.8178394393995404)
('high_entropy_dict', 'SNAPPY', 'pyarrow 4 threads', 0.5823997781990329)
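For scale, the first result above works out to about 1024 MB / 0.691 s, roughly 1482 MB/s, for a single-threaded read of the 1 GiB table; the same conversion to MB/s is applied before plotting below.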
results = [('low_entropy', 'UNCOMPRESSED', 'pyarrow', 0.720387073198799),
           ('low_entropy', 'UNCOMPRESSED', 'pyarrow 2 threads', 0.4857448926020879),
           ('low_entropy', 'UNCOMPRESSED', 'pyarrow 4 threads', 0.3761960049974732),
           ('low_entropy', 'SNAPPY', 'pyarrow', 0.7959197070013033),
           ('low_entropy', 'SNAPPY', 'pyarrow 2 threads', 0.5189196020015515),
           ('low_entropy', 'SNAPPY', 'pyarrow 4 threads', 0.4060830580012407),
           ('low_entropy_dict', 'UNCOMPRESSED', 'pyarrow', 0.5112200214003678),
           ('low_entropy_dict', 'UNCOMPRESSED', 'pyarrow 2 threads', 0.33887453260249456),
           ('low_entropy_dict', 'UNCOMPRESSED', 'pyarrow 4 threads', 0.24363951059931424),
           ('low_entropy_dict', 'SNAPPY', 'pyarrow', 0.5163240254012635),
           ('low_entropy_dict', 'SNAPPY', 'pyarrow 2 threads', 0.34845937459904236),
           ('low_entropy_dict', 'SNAPPY', 'pyarrow 4 threads', 0.2493806065991521),
           ('high_entropy', 'UNCOMPRESSED', 'pyarrow', 1.215046143598738),
           ('high_entropy', 'UNCOMPRESSED', 'pyarrow 2 threads', 0.744592010800261),
           ('high_entropy', 'UNCOMPRESSED', 'pyarrow 4 threads', 0.49727433520019987),
           ('high_entropy', 'SNAPPY', 'pyarrow', 1.2913883829984116),
           ('high_entropy', 'SNAPPY', 'pyarrow 2 threads', 0.8023010949982563),
           ('high_entropy', 'SNAPPY', 'pyarrow 4 threads', 0.5566735885979142),
           ('high_entropy_dict', 'UNCOMPRESSED', 'pyarrow', 1.2103678311978001),
           ('high_entropy_dict', 'UNCOMPRESSED', 'pyarrow 2 threads', 0.7619444314012072),
           ('high_entropy_dict', 'UNCOMPRESSED', 'pyarrow 4 threads', 0.48606574420118703),
           ('high_entropy_dict', 'SNAPPY', 'pyarrow', 1.3105062498012559),
           ('high_entropy_dict', 'SNAPPY', 'pyarrow 2 threads', 0.8178394393995404),
           ('high_entropy_dict', 'SNAPPY', 'pyarrow 4 threads', 0.5823997781990329)]
results = [('low_entropy', 'UNCOMPRESSED', 'fastparquet', 1.2280616964009823),
           ('low_entropy', 'UNCOMPRESSED', 'pyarrow', 0.720387073198799),
           ('low_entropy', 'UNCOMPRESSED', 'pyarrow 2 threads', 0.4857448926020879),
           ('low_entropy', 'UNCOMPRESSED', 'pyarrow 4 threads', 0.3761960049974732),
           ('low_entropy', 'SNAPPY', 'fastparquet', 1.5866433696006426),
           ('low_entropy', 'SNAPPY', 'pyarrow', 0.7959197070013033),
           ('low_entropy', 'SNAPPY', 'pyarrow 2 threads', 0.5189196020015515),
           ('low_entropy', 'SNAPPY', 'pyarrow 4 threads', 0.4060830580012407),
           ('low_entropy_dict', 'UNCOMPRESSED', 'fastparquet', 1.0114025997987484),
           ('low_entropy_dict', 'UNCOMPRESSED', 'pyarrow', 0.5112200214003678),
           ('low_entropy_dict', 'UNCOMPRESSED', 'pyarrow 2 threads', 0.33887453260249456),
           ('low_entropy_dict', 'UNCOMPRESSED', 'pyarrow 4 threads', 0.24363951059931424),
           ('low_entropy_dict', 'SNAPPY', 'fastparquet', 0.995060551800998),
           ('low_entropy_dict', 'SNAPPY', 'pyarrow', 0.5163240254012635),
           ('low_entropy_dict', 'SNAPPY', 'pyarrow 2 threads', 0.34845937459904236),
           ('low_entropy_dict', 'SNAPPY', 'pyarrow 4 threads', 0.2493806065991521)]
fp
<module 'fastparquet' from '/home/wesm/anaconda3/lib/python3.5/site-packages/fastparquet/__init__.py'>
%matplotlib inline
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
results = pd.DataFrame.from_records(results, columns=['test_case', 'compression',
                                                      'library', 'timing'])
results['MB/s'] = DATA_SIZE / results['timing'] / MEGABYTE

results

results['type'] = results['test_case'] + ' ' + results['compression']

results = results[~(results.library == 'fastparquet')]
results['mode'] = results.library
fig = plt.figure(figsize=(12, 6))

title = 'Parquet read performance (longer bars = faster)'
g = sns.factorplot(y='type', x='MB/s', hue='mode', data=results, kind='bar',
                   orient='h', size=10, legend=False)
g.despine(left=True)
# g.fig.get_axes()[0].set_xscale('log', basex=2)
g.fig.set_size_inches(12, 6)
plt.title('Parquet read performance (longer bars = faster)')
g.fig.set_tight_layout(True)
# g.fig.legend(loc='best')
g.fig.axes[0].legend(loc='best')

plt.savefig('parquet_benchmarks.png')
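An aside for anyone rerunning this: sns.factorplot was later renamed in seaborn. Under seaborn >= 0.9 the equivalent call (an assumption about the newer API, not part of the original) would be:

g = sns.catplot(y='type', x='MB/s', hue='mode', data=results, kind='bar',
                orient='h', height=10, legend=False)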
[figure: parquet_benchmarks.png, horizontal bars of MB/s per test case and compression, grouped by reader]
/home/wesm/anaconda3/envs/arrow-test/lib/python3.5/site-packages/matplotlib/figure.py:1742: UserWarning: This figure includes Axes that are not compatible with tight_layout, so its results might be incorrect.
warnings.warn("This figure includes Axes that are not "
%timeit t = read_pyarrow('low_entropy.parquet', nthreads=1)
pf = fp.ParquetFile('low_entropy.parquet').to_pandas()
import ibis
hdfs = ibis.hdfs_connect('localhost', port=5070)

# hdfs.delete('/tmp/parquet-test-1', True)
# hdfs.mkdir('/tmp/parquet-test-1')
# hdfs.put('/tmp/parquet-test-1/low_entropy.parquet', 'low_entropy.parquet')

hdfs.put('/tmp/parquet-test-1/dict_encoded.parquet',
         '/home/wesm/code/wesm-blog/notebooks/dict_encoded.parquet')
'/tmp/parquet-test-1/dict_encoded.parquet'
hdfs.ls('/tmp/parquet-test-1')
['dict_encoded.parquet']
pq.read_table('dict_encoded.parquet').to_pandas().c0.sum()
-5173.725333513341
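The column sum serves as a cheap content checksum: getting the identical value back from the different readers and engines below indicates the files decode to the same data.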
import ibis
hdfs = ibis.hdfs_connect('localhost', port=5070)
con = ibis.impala.connect('localhost', port=21050, hdfs_client=hdfs)
hdfs.mkdir('/tmp/parquet-test-3')
hdfs.put('/tmp/parquet-test-3/data.parquet', 'dict_encoded.parquet')

hdfs.mkdir('/tmp/parquet-test-4')
hdfs.put('/tmp/parquet-test-4/data.parquet', 'dict_encoded_impala.parq')
'/tmp/parquet-test-4/data.parquet'
con.parquet_file('/tmp/parquet-test-3').c0.sum().execute()
-5173.725333513341
con.parquet_file('/tmp/parquet-test-4').c0.sum().execute()
-5173.725333513341
hdfs.delete('/tmp/parquet-test-2', True)
hdfs.mkdir('/tmp/parquet-test-2')
con.create_table('pq_testing_5', pf, location='/tmp/parquet-test-2', format='parquet')

con.table('pq_testing_5').c0.sum().execute()
hdfs.ls('/tmp/parquet-test-2')
['75479cc886794383-4ae97b96d00adb95_463262430_data.0.parq',
'_impala_insert_staging']
hdfs.get('/tmp/parquet-test-2/75479cc886794383-4ae97b96d00adb95_463262430_data.0.parq',
         'dict_encoded_impala.parq')
'/home/wesm/code/wesm-blog/notebooks/dict_encoded_impala.parq'
# pq.read_table('dict_encoded_impala.parq').to_pandas().c0.sum()
pq.read_table('dict_encoded.parquet').to_pandas().c0.sum()
-5173.725333513341
import pandas.util.testing as tm
import pyarrow.parquet as pq
df1 = pq.read_table('dict_encoded_impala.parq').to_pandas()
df2 = pq.read_table('dict_encoded.parquet').to_pandas()
tm.assert_frame_equal(df1, df2)
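assert_frame_equal raises on any mismatch, so passing silently here confirms that pyarrow decodes the Impala-written file and its own output to identical frames.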
fp.ParquetFile('dict_encoded.parquet').to_pandas()
[output elided: an empty DataFrame of 131000 rows × 0 columns; fastparquet opens the file but recovers none of the columns]
fp.ParquetFile('dict_encoded_impala.parq').to_pandas()
ValueError: cannot assign slice from input of different size
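So fastparquet, at the version used here, recovers no columns from dict_encoded.parquet and raises outright on dict_encoded_impala.parq, while pyarrow reads both and produces the matching sums above.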
%debug
> /home/wesm/code/ibis/ibis/impala/client.py(275)_wait_synchronous()
273 print('Canceling query')
274 self.cancel()
--> 275 raise
276
277 def is_finished(self):
ipdb> u
> /home/wesm/code/ibis/ibis/impala/client.py(244)execute()
242 return
243 else:
--> 244 self._wait_synchronous()
245
246 def _wait_synchronous(self):
ipdb> u
> /home/wesm/code/ibis/ibis/impala/client.py(133)execute()
131 self.error('Exception caused by {0}: {1}'.format(query,
132 buf.getvalue()))
--> 133 raise
134
135 return cursor
ipdb> p query
'SELECT *\nFROM __ibis_tmp.`__ibis_tmp_edbaa986e05e484283634da60f8556b0`\nLIMIT 1000'
ipdb> q