import gc
import os
import time

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import fastparquet as fp

def generate_floats(n, pct_null, repeats=1):
    # Draw n / repeats unique values, null out pct_null of them, then tile
    # the array so each unique value appears `repeats` times.
    nunique = int(n / repeats)
    unique_values = np.random.randn(nunique)

    num_nulls = int(nunique * pct_null)
    null_indices = np.random.choice(nunique, size=num_nulls, replace=False)
    unique_values[null_indices] = np.nan

    return unique_values.repeat(repeats)

DATA_GENERATORS = {
    'float64': generate_floats
}
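# A hedged sketch of how DATA_GENERATORS could be extended to another dtype.
# generate_strings and the 'str' key are hypothetical additions for
# illustration; the benchmark below only exercises 'float64'.
def generate_strings(n, pct_null, repeats=1):
    nunique = int(n / repeats)
    unique_values = np.array(['v{0}'.format(i) for i in range(nunique)],
                             dtype=object)
    num_nulls = int(nunique * pct_null)
    null_indices = np.random.choice(nunique, size=num_nulls, replace=False)
    unique_values[null_indices] = None
    return unique_values.repeat(repeats)

DATA_GENERATORS['str'] = generate_strings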

def generate_data(nrows, ncols, pct_null=0.1, repeats=1, dtype='float64'):
    datagen_func = DATA_GENERATORS[dtype]

    data = {
        'c' + str(i): datagen_func(nrows, pct_null, repeats)
        for i in range(ncols)
    }
    return pd.DataFrame(data)
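# Illustrative only (not part of the timed benchmark): a small frame where
# each column has ~10% nulls and each unique value appears 4 times.
demo_df = generate_data(1000, 2, pct_null=0.1, repeats=4)
demo_df.head()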

def write_to_parquet(df, out_path, use_dictionary=True,
                     compression='SNAPPY'):
    arrow_table = pa.Table.from_pandas(df)
    
    if compression.lower() == 'uncompressed':
        compression = None
    
    pq.write_table(arrow_table, out_path, use_dictionary=use_dictionary,
                   compression=compression)
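# Illustrative round trip of the helper above; 'demo.parquet' is a throwaway
# path, not one of the benchmark files.
write_to_parquet(demo_df, 'demo.parquet', compression='SNAPPY')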

def read_fastparquet(path):
    return fp.ParquetFile(path).to_pandas()

def read_pyarrow(path, nthreads=1):
    return pq.read_table(path, nthreads=nthreads).to_pandas()

def get_timing(f, path, niter):
    # Disable the garbage collector while timing to reduce run-to-run noise.
    gc.disable()
    start = time.clock_gettime(time.CLOCK_MONOTONIC)
    for i in range(niter):
        f(path)
    elapsed = time.clock_gettime(time.CLOCK_MONOTONIC) - start
    gc.enable()
    return elapsed
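# Example: average per-call wall time over 3 iterations against the demo file
# written above (purely illustrative).
get_timing(read_pyarrow, 'demo.parquet', 3) / 3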

MEGABYTE = 1 << 20
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
NROWS = DATA_SIZE // NCOLS // np.dtype('float64').itemsize
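# Sizing arithmetic: 1024 MiB of float64 values spread across 16 columns at
# 8 bytes per value gives 2**30 // 16 // 8 = 8,388,608 rows per column.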

cases = {
    'low_entropy_dict': {
        'pct_null': 0.1,
        'repeats': 1000,
        'use_dictionary': True
    },
    'low_entropy': {
        'pct_null': 0.1,
        'repeats': 1000,
        'use_dictionary': False
    },
    'high_entropy_dict': {
        'pct_null': 0.1,
        'repeats': 1,
        'use_dictionary': True
    },
    'high_entropy': {
        'pct_null': 0.1,
        'repeats': 1,
        'use_dictionary': False
    }
}
NITER = 5

results = []

readers = [
    # ('fastparquet', read_fastparquet),
    ('pyarrow', read_pyarrow),
    ('pyarrow 2 threads', lambda path: read_pyarrow(path, nthreads=2)),
    ('pyarrow 4 threads', lambda path: read_pyarrow(path, nthreads=4))
]

COMPRESSIONS = ['UNCOMPRESSED', 'SNAPPY'] # , 'GZIP']

case_files = {}

for case, params in cases.items():
    for compression in COMPRESSIONS:
        path = '{0}_{1}.parquet'.format(case, compression)
#         df = generate_data(NROWS, NCOLS, repeats=params['repeats'])
#         write_to_parquet(df, path, compression=compression, 
#                          use_dictionary=params['use_dictionary'])
#         df = None
        case_files[case, compression] = path

        
for case, params in cases.items():
    for compression in COMPRESSIONS:
        path = case_files[case, compression]
        compression = compression if compression != 'UNCOMPRESSED' else None
        
        # prime the file cache
        read_pyarrow(path)
        read_pyarrow(path)
        
        for reader_name, f in readers:
            elapsed = get_timing(f, path, NITER) / NITER
            result = case, compression, reader_name, elapsed
            print(result)
            results.append(result)
('low_entropy', None, 'pyarrow', 0.6907784113980597)
('low_entropy', None, 'pyarrow 2 threads', 0.47830765520047863)
('low_entropy', None, 'pyarrow 4 threads', 0.3837389957974665)
('low_entropy', 'SNAPPY', 'pyarrow', 0.7614571548008826)
('low_entropy', 'SNAPPY', 'pyarrow 2 threads', 0.5111582818004535)
('low_entropy', 'SNAPPY', 'pyarrow 4 threads', 0.4159896053984994)
('low_entropy_dict', None, 'pyarrow', 0.4947156649985118)
('low_entropy_dict', None, 'pyarrow 2 threads', 0.34644180460018104)
('low_entropy_dict', None, 'pyarrow 4 threads', 0.2585927422012901)
('low_entropy_dict', 'SNAPPY', 'pyarrow', 0.49566616139782127)
('low_entropy_dict', 'SNAPPY', 'pyarrow 2 threads', 0.33864884299982806)
('low_entropy_dict', 'SNAPPY', 'pyarrow 4 threads', 0.2592433379992144)
('high_entropy', None, 'pyarrow', 1.215046143598738)
('high_entropy', None, 'pyarrow 2 threads', 0.744592010800261)
('high_entropy', None, 'pyarrow 4 threads', 0.49727433520019987)
('high_entropy', 'SNAPPY', 'pyarrow', 1.2913883829984116)
('high_entropy', 'SNAPPY', 'pyarrow 2 threads', 0.8023010949982563)
('high_entropy', 'SNAPPY', 'pyarrow 4 threads', 0.5566735885979142)
('high_entropy_dict', None, 'pyarrow', 1.2103678311978001)
('high_entropy_dict', None, 'pyarrow 2 threads', 0.7619444314012072)
('high_entropy_dict', None, 'pyarrow 4 threads', 0.48606574420118703)
('high_entropy_dict', 'SNAPPY', 'pyarrow', 1.3105062498012559)
('high_entropy_dict', 'SNAPPY', 'pyarrow 2 threads', 0.8178394393995404)
('high_entropy_dict', 'SNAPPY', 'pyarrow 4 threads', 0.5823997781990329)
results = [('low_entropy', 'UNCOMPRESSED', 'pyarrow', 0.720387073198799),
('low_entropy', 'UNCOMPRESSED', 'pyarrow 2 threads', 0.4857448926020879),
('low_entropy', 'UNCOMPRESSED', 'pyarrow 4 threads', 0.3761960049974732),
('low_entropy', 'SNAPPY', 'pyarrow', 0.7959197070013033),
('low_entropy', 'SNAPPY', 'pyarrow 2 threads', 0.5189196020015515),
('low_entropy', 'SNAPPY', 'pyarrow 4 threads', 0.4060830580012407),
('low_entropy_dict', 'UNCOMPRESSED', 'pyarrow', 0.5112200214003678),
('low_entropy_dict', 'UNCOMPRESSED', 'pyarrow 2 threads', 0.33887453260249456),
('low_entropy_dict', 'UNCOMPRESSED', 'pyarrow 4 threads', 0.24363951059931424),
('low_entropy_dict', 'SNAPPY', 'pyarrow', 0.5163240254012635),
('low_entropy_dict', 'SNAPPY', 'pyarrow 2 threads', 0.34845937459904236),
('low_entropy_dict', 'SNAPPY', 'pyarrow 4 threads', 0.2493806065991521),
('high_entropy', 'UNCOMPRESSED', 'pyarrow', 1.215046143598738),
('high_entropy', 'UNCOMPRESSED', 'pyarrow 2 threads', 0.744592010800261),
('high_entropy', 'UNCOMPRESSED', 'pyarrow 4 threads', 0.49727433520019987),
('high_entropy', 'SNAPPY', 'pyarrow', 1.2913883829984116),
('high_entropy', 'SNAPPY', 'pyarrow 2 threads', 0.8023010949982563),
('high_entropy', 'SNAPPY', 'pyarrow 4 threads', 0.5566735885979142),
('high_entropy_dict', 'UNCOMPRESSED', 'pyarrow', 1.2103678311978001),
('high_entropy_dict', 'UNCOMPRESSED', 'pyarrow 2 threads', 0.7619444314012072),
('high_entropy_dict', 'UNCOMPRESSED', 'pyarrow 4 threads', 0.48606574420118703),
('high_entropy_dict', 'SNAPPY', 'pyarrow', 1.3105062498012559),
('high_entropy_dict', 'SNAPPY', 'pyarrow 2 threads', 0.8178394393995404),
('high_entropy_dict', 'SNAPPY', 'pyarrow 4 threads', 0.5823997781990329)]
results = [('low_entropy', 'UNCOMPRESSED', 'fastparquet', 1.2280616964009823),
('low_entropy', 'UNCOMPRESSED', 'pyarrow', 0.720387073198799),
('low_entropy', 'UNCOMPRESSED', 'pyarrow 2 threads', 0.4857448926020879),
('low_entropy', 'UNCOMPRESSED', 'pyarrow 4 threads', 0.3761960049974732),
('low_entropy', 'SNAPPY', 'fastparquet', 1.5866433696006426),
('low_entropy', 'SNAPPY', 'pyarrow', 0.7959197070013033),
('low_entropy', 'SNAPPY', 'pyarrow 2 threads', 0.5189196020015515),
('low_entropy', 'SNAPPY', 'pyarrow 4 threads', 0.4060830580012407),
('low_entropy_dict', 'UNCOMPRESSED', 'fastparquet', 1.0114025997987484),
('low_entropy_dict', 'UNCOMPRESSED', 'pyarrow', 0.5112200214003678),
('low_entropy_dict', 'UNCOMPRESSED', 'pyarrow 2 threads', 0.33887453260249456),
('low_entropy_dict', 'UNCOMPRESSED', 'pyarrow 4 threads', 0.24363951059931424),
('low_entropy_dict', 'SNAPPY', 'fastparquet', 0.995060551800998),
('low_entropy_dict', 'SNAPPY', 'pyarrow', 0.5163240254012635),
('low_entropy_dict', 'SNAPPY', 'pyarrow 2 threads', 0.34845937459904236),
('low_entropy_dict', 'SNAPPY', 'pyarrow 4 threads', 0.2493806065991521)]
fp
<module 'fastparquet' from '/home/wesm/anaconda3/lib/python3.5/site-packages/fastparquet/__init__.py'>
%matplotlib inline
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

results = pd.DataFrame.from_records(results, columns=['test_case', 'compression', 'library', 'timing'])
results['MB/s'] = DATA_SIZE / results['timing'] / MEGABYTE
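# Throughput is measured against the uncompressed in-memory size, so e.g.
# reading the 1 GiB dataset in 0.5 s scores 1024 / 0.5 = 2048 MB/s.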
results
results['type'] = results['test_case'] + ' ' + results['compression']

results = results[results.library != 'fastparquet']
results['mode'] = results.library

title = 'Parquet read performance (longer bars = faster)'
g = sns.factorplot(y='type', x='MB/s', hue='mode', data=results,
                   kind='bar', orient='h', size=10, legend=False)
g.despine(left=True)
g.fig.set_size_inches(12, 6)
plt.title(title)
g.fig.set_tight_layout(True)
g.fig.axes[0].legend(loc='best')

plt.savefig('parquet_benchmarks.png')
/home/wesm/anaconda3/envs/arrow-test/lib/python3.5/site-packages/matplotlib/figure.py:1742: UserWarning: This figure includes Axes that are not compatible with tight_layout, so its results might be incorrect.
  warnings.warn("This figure includes Axes that are not "

%timeit t = read_pyarrow('low_entropy.parquet', nthreads=1)
pf = fp.ParquetFile('low_entropy.parquet').to_pandas()
import ibis

hdfs = ibis.hdfs_connect('localhost', port=5070)
#hdfs.delete('/tmp/parquet-test-1', True)

#hdfs.mkdir('/tmp/parquet-test-1')
# hdfs.put('/tmp/parquet-test-1/low_entropy.parquet', 'low_entropy.parquet')
hdfs.put('/tmp/parquet-test-1/dict_encoded.parquet', 
         '/home/wesm/code/wesm-blog/notebooks/dict_encoded.parquet')
'/tmp/parquet-test-1/dict_encoded.parquet'
hdfs.ls('/tmp/parquet-test-1')
['dict_encoded.parquet']
pq.read_table('dict_encoded.parquet').to_pandas().c0.sum()
-5173.725333513341
import ibis

hdfs = ibis.hdfs_connect('localhost', port=5070)
con = ibis.impala.connect('localhost', port=21050, hdfs_client=hdfs)

hdfs.mkdir('/tmp/parquet-test-3')
hdfs.put('/tmp/parquet-test-3/data.parquet', 'dict_encoded.parquet')

hdfs.mkdir('/tmp/parquet-test-4')
hdfs.put('/tmp/parquet-test-4/data.parquet', 'dict_encoded_impala.parq')
'/tmp/parquet-test-4/data.parquet'
con.parquet_file('/tmp/parquet-test-3').c0.sum().execute()
-5173.725333513341
con.parquet_file('/tmp/parquet-test-4').c0.sum().execute()
-5173.725333513341
hdfs.delete('/tmp/parquet-test-2', True)
hdfs.mkdir('/tmp/parquet-test-2')
con.create_table('pq_testing_5', pf, location='/tmp/parquet-test-2', format='parquet')
con.table('pq_testing_5').c0.sum().execute()
hdfs.ls('/tmp/parquet-test-2')
['75479cc886794383-4ae97b96d00adb95_463262430_data.0.parq',
 '_impala_insert_staging']
hdfs.get('/tmp/parquet-test-2/75479cc886794383-4ae97b96d00adb95_463262430_data.0.parq', 'dict_encoded_impala.parq')
'/home/wesm/code/wesm-blog/notebooks/dict_encoded_impala.parq'
# pq.read_table('dict_encoded_impala.parq').to_pandas().c0.sum()
pq.read_table('dict_encoded.parquet').to_pandas().c0.sum()
-5173.725333513341
import pandas.util.testing as tm
import pyarrow.parquet as pq

df1 = pq.read_table('dict_encoded_impala.parq').to_pandas()
df2 = pq.read_table('dict_encoded.parquet').to_pandas()

tm.assert_frame_equal(df1, df2)
fp.ParquetFile('dict_encoded.parquet').to_pandas()
131000 rows × 0 columns

fp.ParquetFile('dict_encoded_impala.parq').to_pandas()
ValueError: cannot assign slice from input of different size
%debug
> /home/wesm/code/ibis/ibis/impala/client.py(275)_wait_synchronous()
    273             print('Canceling query')
    274             self.cancel()
--> 275             raise
    276 
    277     def is_finished(self):

ipdb> u
> /home/wesm/code/ibis/ibis/impala/client.py(244)execute()
    242             return
    243         else:
--> 244             self._wait_synchronous()
    245 
    246     def _wait_synchronous(self):

ipdb> u
> /home/wesm/code/ibis/ibis/impala/client.py(133)execute()
    131             self.error('Exception caused by {0}: {1}'.format(query,
    132                                                              buf.getvalue()))
--> 133             raise
    134 
    135         return cursor

ipdb> p query
'SELECT *\nFROM __ibis_tmp.`__ibis_tmp_edbaa986e05e484283634da60f8556b0`\nLIMIT 1000'
ipdb> q