import random
import sys
import timeit

import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import SparkSession

# Number of partitions used when generating benchmark data in parallel.
numPartition = 8
def scala_object(jpkg, obj):
    """Return the singleton instance of a Scala object from a JVM package."""
    return jpkg.__getattr__(obj + "$").__getattr__("MODULE$")
def time(spark, df, repeat, number):
    # Benchmark 1: collect the query result as Spark internal rows on the driver.
    print("collect as internal rows")
    times = timeit.repeat(lambda: df._jdf.queryExecution().executedPlan().executeCollect(),
                          repeat=repeat, number=number)
    time_df = pd.Series(times)
    print(time_df.describe())

    # Benchmark 2: convert the collected internal rows into an Arrow record batch
    # using the JVM-side converter, driven through Py4J.
    print("internal rows to arrow record batch")
    arrow = scala_object(spark._jvm.org.apache.spark.sql, "Arrow")
    root_allocator = spark._jvm.org.apache.arrow.memory.RootAllocator(sys.maxsize)
    internal_rows = df._jdf.queryExecution().executedPlan().executeCollect()
    jschema = df._jdf.schema()

    def internalRowsToArrowRecordBatch():
        rb = arrow.internalRowsToArrowRecordBatch(internal_rows, jschema, root_allocator)
        rb.close()

    times = timeit.repeat(internalRowsToArrowRecordBatch, repeat=repeat, number=number)
    root_allocator.close()
    time_df = pd.Series(times)
    print(time_df.describe())

    # Benchmark 3: end-to-end toPandas with Arrow enabled (the prototype
    # toPandas takes a useArrow flag).
    print("toPandas with arrow")
    times = timeit.repeat(lambda: df.toPandas(True), repeat=repeat, number=number)
    time_df = pd.Series(times)
    print(time_df.describe())

    # Benchmark 4: end-to-end toPandas with the default row-by-row conversion.
    print("toPandas without arrow")
    times = timeit.repeat(lambda: df.toPandas(False), repeat=repeat, number=number)
    time_df = pd.Series(times)
    print(time_df.describe())
def next_long():
    """Generate a random long in [0, 10000]."""
    return random.randint(0, 10000)

def next_double():
    """Generate a random double in [0, 1)."""
    return random.random()
def genDataLocal(spark, size, columns):
    """Build a DataFrame from locally generated rows; `columns` is a list of generator functions."""
    data = [[fn() for fn in columns] for _ in range(size)]
    df = spark.createDataFrame(data)
    return df
def genData(spark, size, columns):
    """Build a DataFrame of `size` rows generated in parallel across numPartition partitions."""
    rdd = spark.sparkContext \
        .parallelize(range(size), numPartition) \
        .map(lambda _: [fn() for fn in columns])
    df = spark.createDataFrame(rdd)
    return df
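
# Illustrative driver for the helpers above (a sketch, not part of the original
# script): it assumes a Spark build patched with the Arrow collect path, since
# time() calls df.toPandas(useArrow). Data size and column generators are
# arbitrary choices for demonstration.
if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("arrow-benchmark") \
        .getOrCreate()

    # Generate random longs and doubles across numPartition partitions, cache,
    # and force materialization so the timings below exclude data generation.
    df = genData(spark, 1000 * 1000, [next_long, next_double])
    df.cache()
    df.count()

    time(spark, df, repeat=5, number=1)
    spark.stop()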
# Parquet read/write benchmark helpers.
import gc
import os
import time

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow.compat import guid
def generate_floats(n, pct_null, repeats=1):
    """Generate n float64 values, with a pct_null fraction set to NaN, repeated `repeats` times."""
    nunique = int(n / repeats)
    unique_values = np.random.randn(nunique)

    num_nulls = int(nunique * pct_null)
    null_indices = np.random.choice(nunique, size=num_nulls, replace=False)
    unique_values[null_indices] = np.nan

    return unique_values.repeat(repeats)
DATA_GENERATORS = {
    'float64': generate_floats
}
def generate_data(total_size, ncols, pct_null=0.1, repeats=1, dtype='float64'):
    """Generate a DataFrame of roughly total_size bytes spread over ncols columns."""
    type_ = np.dtype(dtype)
    nrows = int(total_size / ncols / type_.itemsize)

    datagen_func = DATA_GENERATORS[dtype]
    data = {
        'c' + str(i): datagen_func(nrows, pct_null, repeats)
        for i in range(ncols)
    }
    return pd.DataFrame(data)
def write_to_parquet(df, out_path, use_dictionary=True, compression='SNAPPY'):
    """Convert a pandas DataFrame to an Arrow table and write it out as Parquet."""
    arrow_table = pa.Table.from_pandas(df)
    pq.write_table(arrow_table, out_path, use_dictionary=use_dictionary,
                   compression=compression)
def read_parquet(path, nthreads=1):
    """Read a Parquet file with the given number of threads and convert to pandas."""
    return pq.read_table(path, nthreads=nthreads).to_pandas()
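
# Illustrative usage of the Parquet helpers above (a sketch, not part of the
# original script): writes a ~1 GB float64 table to a temporary file and
# compares single- and multi-threaded reads. Sizes, path, and thread counts
# are arbitrary choices for demonstration.
if __name__ == "__main__":
    df = generate_data(total_size=1 << 30, ncols=16)
    out_path = os.path.join('/tmp', guid() + '.parquet')

    write_to_parquet(df, out_path)
    for nthreads in (1, 4):
        gc.collect()
        start = time.time()
        read_parquet(out_path, nthreads=nthreads)
        print('nthreads=%d: %.2f s' % (nthreads, time.time() - start))

    os.remove(out_path)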