import numpy
import pyspark
import timeit
import random
import sys
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd

numPartition = 8

def scala_object(jpkg, obj):
    return jpkg.__getattr__(obj + "$").__getattr__("MODULE$")

def time(spark, df, repeat, number):
    print("collect as internal rows")
    time = timeit.repeat(lambda: df._jdf.queryExecution().executedPlan().executeCollect(), repeat=repeat, number=number)
    time_df = pd.Series(time)
    print(time_df.describe())

    print("internal rows to arrow record batch")
    arrow = scala_object(spark._jvm.org.apache.spark.sql, "Arrow")
    root_allocator = spark._jvm.org.apache.arrow.memory.RootAllocator(sys.maxsize)
    internal_rows = df._jdf.queryExecution().executedPlan().executeCollect()
    jschema = df._jdf.schema()
    def internalRowsToArrowRecordBatch():
        rb = arrow.internalRowsToArrowRecordBatch(internal_rows, jschema, root_allocator)
        rb.close()

    time = timeit.repeat(internalRowsToArrowRecordBatch, repeat=repeat, number=number)
    root_allocator.close()
    time_df = pd.Series(time)
    print(time_df.describe())

    print("toPandas with arrow")
    time = timeit.repeat(lambda: df.toPandas(True), repeat=repeat, number=number)
    time_df = pd.Series(time)
    print(time_df.describe())

    print("toPandas without arrow")
    time = timeit.repeat(lambda: df.toPandas(False), repeat=repeat, number=number)
    time_df = pd.Series(time)
    print(time_df.describe())

def next_long():
    return random.randint(0, 10000)

def next_double():
    return random.random()

def genDataLocal(spark, size, columns):
    data = [list([fn() for fn in columns]) for x in range(0, size)]
    df = spark.createDataFrame(data)
    return df

def genData(spark, size, columns):
    rdd = spark.sparkContext\
        .parallelize(range(0, size), numPartition)\
        .map(lambda _: [fn() for fn in columns])
    df = spark.createDataFrame(rdd)
    return df

import gc
import os
import time

import numpy as np
import pandas as pd
from pyarrow.compat import guid
import pyarrow as pa
import pyarrow.parquet as pq

def generate_floats(n, pct_null, repeats=1):
    nunique = int(n / repeats)
    unique_values = np.random.randn(nunique)
    
    num_nulls = int(nunique * pct_null)
    null_indices = np.random.choice(nunique, size=num_nulls, replace=False)
    unique_values[null_indices] = np.nan
    
    return unique_values.repeat(repeats)

DATA_GENERATORS = {
    'float64': generate_floats
}

def generate_data(total_size, ncols, pct_null=0.1, repeats=1, dtype='float64'):
    type_ = np.dtype('float64')
    nrows = total_size / ncols / np.dtype(type_).itemsize
    
    datagen_func = DATA_GENERATORS[dtype]
    
    data = {
        'c' + str(i): datagen_func(nrows, pct_null, repeats)
        for i in range(ncols)
    }
    return pd.DataFrame(data)

def write_to_parquet(df, out_path, use_dictionary=True,
                     compression='SNAPPY'):
    arrow_table = pa.Table.from_pandas(df)
    pq.write_table(arrow_table, out_path, use_dictionary=use_dictionary,
                   compression=compression)
    
def read_parquet(path, nthreads=1):
    return pq.read_table(path, nthreads=nthreads).to_pandas()
import pyarrow as pa
import pyarrow.parquet as pq

data_size = 64 * (1 << 20)
df = generate_data(data_size, 8)

arrow_table = pa.Table.from_pandas(df)
pq.write_table(arrow_table, 'example.parquet', compression='snappy')
spark = (SparkSession.builder
         .config('spark.executor.memory', '4g')
         .config('spark.driver.memory', '4g')
         .config('spark.driver.extraClassPath', '/home/wesm/jars/slf4j-simple-1.7.16.jar')
         .appName("ArrowBenchmark")).getOrCreate()
spark.sparkContext.setLogLevel("INFO")
sqlContext = pyspark.SQLContext(spark)
!ls -l
total 184788
-rw-rw-r-- 1 wesm wesm      2212 Jan 30 16:33 benchmark.py
-rw-r--r-- 1 wesm wesm 125330983 Jan 30 22:40 example2.parquet
-rw-r--r-- 1 wesm wesm  63855263 Jan 30 22:40 example.parquet
-rw-rw-r-- 1 wesm wesm     15449 Feb  9 10:28 PerformanceAnalysis.ipynb
drwxrwxr-x 2 wesm wesm      4096 Feb  8 21:42 spark_parquet_example
drwxrwxr-x 2 wesm wesm      4096 Jan 30 17:08 spark-warehouse
df = sqlContext.read.parquet('/home/wesm/code/wesm-blog/notebooks/20170130Spark/example2.parquet')
df = df.cache()
df.count()
2097152
df.write.parquet('spark_parquet_example')
%%prun -s cumulative
dfs = [df.toPandas() for i in range(5)]
 
%%prun -s cumulative
dfs = [df.toPandas(useArrow=True) for i in range(5)]
 
%%timeit
df = pq.read_table('example2.parquet').to_pandas()
10 loops, best of 3: 175 ms per loop
%memit
gc.collect()
peak memory: 858.83 MiB, increment: 0.00 MiB
0
%memit?
%memit -i 0.001 pdf = df.toPandas()
peak memory: 714.43 MiB, increment: 573.43 MiB
%timeit table = pq.read_table('example2.parquet', nthreads=1)
10 loops, best of 3: 162 ms per loop
import gc
%%memit -i 0.0001
pdf = None
pdf = df.toPandas()
gc.collect()
peak memory: 1223.16 MiB, increment: 1018.20 MiB
%%memit -i 0.0001
pdf = None
pdf = df.toPandas(useArrow=True)
gc.collect()
peak memory: 334.08 MiB, increment: 258.31 MiB
%memit
peak memory: 209.00 MiB, increment: 0.00 MiB
 
pandas_df.memory_usage().sum()
67108944
df = genData(spark, 1000 * 1000, [next_double])
df.cache()
DataFrame[_1: double]
df.count()
1000000
%prun -s cumulative pdf = df.toPandas()
 
%%prun -s cumulative 

frames = [df.toPandas(useArrow=True) for i in range(10)]
 
%time pdf = df.toPandas(useArrow=True)
CPU times: user 68 ms, sys: 12 ms, total: 80 ms
Wall time: 1.38 s
pdf.shape
(1000000, 1)
df.collect()

time(spark, df, 50, 1)
df.unpersist()