Spark
import pyspark
def sort_and_replace_columns():
    """Example: sort by several columns with different sort directions, then select only the needed columns in a new order."""
    session = pyspark.sql.SparkSession.builder.config('spark.sql.shuffle.partitions', 4).getOrCreate()
    df = session.createDataFrame(
        [
            (1, 1, 'a'),
            (2, 3, 'b'),
            (1, 2, 'd'),
            (1, 7, 's'),
            (2, 3, 'q'),
            (1, 5, 'e'),
            (2, 2, 'a')
        ],
        ('user', 'score', 'other column'))
    df.show()
    # >>>
    # +----+-----+------------+
    # |user|score|other column|
    # +----+-----+------------+
    # |   1|    1|           a|
    # |   2|    3|           b|
    # |   1|    2|           d|
    # |   1|    7|           s|
    # |   2|    3|           q|
    # |   1|    5|           e|
    # |   2|    2|           a|
    # +----+-----+------------+
    ndf = df.orderBy(['user', 'score'], ascending=[1, 0])  # 'user' ascending, 'score' descending
    ndf.select('user', 'other column', 'score').show()
    # >>>
    # +----+------------+-----+
    # |user|other column|score|
    # +----+------------+-----+
    # |   1|           s|    7|
    # |   1|           e|    5|
    # |   1|           d|    2|
    # |   1|           a|    1|
    # |   2|           b|    3|
    # |   2|           q|    3|
    # |   2|           a|    2|
    # +----+------------+-----+
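# A minimal alternative sketch, not from the original notes: the same ordering can
# be expressed with explicit column objects instead of the `ascending` flag list
# (assumes a DataFrame like `df` above).
import pyspark
from pyspark.sql import functions as f

def sort_with_column_expressions(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    """Hypothetical helper: 'user' ascending, 'score' descending via asc()/desc()."""
    return df.orderBy(f.col('user').asc(), f.col('score').desc())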
import pyspark
from pyspark.sql import Window, Row, functions as f
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
def read_with_schema():
    """Read a CSV file with an explicit schema."""
    session = pyspark.sql.SparkSession.builder.config('spark.sql.shuffle.partitions', 4).getOrCreate()
    schema = StructType([
        StructField('KEY', IntegerType(), nullable=False),
        StructField('SKU', IntegerType()),
        StructField('NAME', StringType())])
    # sdf = session.read.csv('/home/vagrant/bra/data/56.csv', sep='|', header=True)
    sdf = session.read.load('/home/vagrant/bra/data/56.csv', sep='|',
                            header=True, schema=schema, escape='"', format='csv')
    sdf.printSchema()
    sdf.show()
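# A hedged alternative sketch, not from the original notes: in Spark 2.3+ the schema
# can usually be passed as a DDL-formatted string; the path reuses the placeholder
# file from above.
def read_with_ddl_schema():
    session = pyspark.sql.SparkSession.builder.config('spark.sql.shuffle.partitions', 4).getOrCreate()
    sdf = session.read.csv('/home/vagrant/bra/data/56.csv', sep='|', header=True,
                           schema='KEY INT, SKU INT, NAME STRING')
    sdf.printSchema()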
import urllib.parse
def get_df(path: str, ss: pyspark.sql.SparkSession, token: str, parquet=False) -> pyspark.sql.DataFrame:
    """
    Download a DataFrame from Azure Blob Storage.

    :param path: path to the blob with prefix,
        for example "wasbs://perf-account@rmxonngiie.blob.core.windows.net/MAT_train.csv"
    :param ss: SparkSession
    :param token: SAS token for the blob
    :param parquet: read the blob as Parquet instead of CSV
    :return: DataFrame read from the blob
    """
    parsed = urllib.parse.urlparse(path)
    netloc = parsed.netloc
    if netloc:
        container_name, full_account_name = netloc.split('@')
        account_name = full_account_name[:full_account_name.find('.')]
        key = "fs.azure.sas.{}.{}.blob.core.windows.net".format(container_name, account_name)
        ss.conf.set(key, token)
    if parquet:
        return ss.read.parquet(path)
    return ss.read.csv(path, header=True)
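# Usage sketch, not from the original notes; the URL is the docstring example and
# the token below is a placeholder, not a real credential.
# session = pyspark.sql.SparkSession.builder.getOrCreate()
# sas_token = '<sas-token>'
# train_sdf = get_df('wasbs://perf-account@rmxonngiie.blob.core.windows.net/MAT_train.csv',
#                    session, sas_token)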
def sdf_to_list(sdf: pyspark.sql.DataFrame, field=None, drop_duplicates=False):
    """
    Helper function that converts one field of a Spark DataFrame into a plain Python list of values.

    :param field: field to use; if None, the first column of the DataFrame is used
    :param drop_duplicates: drop duplicate values before collecting
    """
    field = field or sdf.columns[0]
    sdf = sdf.select(field)
    if drop_duplicates:
        sdf = sdf.drop_duplicates([field, ])
    return [row[field] for row in sdf.collect()]
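# Usage sketch, not from the original notes, for a DataFrame like the `df` built in
# sort_and_replace_columns() above; collect() order is not guaranteed.
# users = sdf_to_list(df, field='user', drop_duplicates=True)  # e.g. [1, 2]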
import pyspark
def dataframe_to_dict():
    """Convert a two-column Spark DataFrame into a Python dict."""
    session = pyspark.sql.SparkSession.builder.config('spark.sql.shuffle.partitions', 4).getOrCreate()
    user_field = 'user'
    product_field = 'Products'
    sdf = session.createDataFrame(
        [
            ('user_1', [1, 2, 3, 4, 5]),
            ('user_2', [11, 22, 33, 44, 55]),
            ('user_3', [1, 2, 3, 4, 5]),
            ('user_4', [1, 2, 3, 4, 5]),
            ('user_5', [1, 2, 3, 4, 5]),
            ('user_6', [1, 2, 3, 4, 5])],
        (user_field, product_field))
    sdf.show()
    # collectAsMap() brings every (key, value) pair to the driver,
    # so it is only suitable for reasonably small DataFrames.
    res = sdf.select(user_field, product_field).rdd.collectAsMap()
    print(type(res))
    for user, values in res.items():
        print("{} --> {}".format(user, values))
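# An equivalent driver-side sketch, not from the original notes (assumes the same
# `sdf` as above): collect Rows and build the dict manually.
# res = {row[user_field]: row[product_field]
#        for row in sdf.select(user_field, product_field).collect()}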
import pyspark
from pyspark.sql import Window, Row, functions as f
def pivot_spark_columns():
    """Pivot several Spark columns into a fixed number of numbered columns per user."""
    session = pyspark.sql.SparkSession.builder.config('spark.sql.shuffle.partitions', 4).getOrCreate()
    USER_FIELD = 'user'
    SORTING_FIELD = 'sort'
    nrec = 3
    sdf = session.createDataFrame(
        [
            ('user_1', 2, 0.1, '', 1),
            ('user_1', 4, 0.2, 'link4', 2),
            ('user_1', 5, 0.3, 'link5', 3),
            ('user_4', 6, 0.2, 'link6', 5),
            ('user_4', 7, 0.1, 'link7', 4),
            ('user_4', 9, 0.3, '', 6)
        ], (USER_FIELD, 'Product', 'Score', 'Link', SORTING_FIELD))
    sdf.show()
    # >>>
    # +------+-------+-----+-----+----+
    # |  user|Product|Score| Link|sort|
    # +------+-------+-----+-----+----+
    # |user_1|      2|  0.1|     |   1|
    # |user_1|      4|  0.2|link4|   2|
    # |user_1|      5|  0.3|link5|   3|
    # |user_4|      6|  0.2|link6|   5|
    # |user_4|      7|  0.1|link7|   4|
    # |user_4|      9|  0.3|     |   6|
    # +------+-------+-----+-----+----+
    FIELDS_TO_TRANSPOSE = ['Product', 'Score', 'Link']
    new_fields = ['_PRODUCT', '_SCORE', '_LINK']
    window = Window.partitionBy(USER_FIELD).orderBy(SORTING_FIELD)
    # collect_list over an ordered window accumulates row by row, so only the last
    # row per user holds all nrec values; the size() filter below keeps that row.
    for field_name, new_field_name in zip(FIELDS_TO_TRANSPOSE, new_fields):
        sdf = sdf.withColumn(new_field_name, f.collect_list(field_name).over(window))
    sdf = sdf.select(USER_FIELD, *new_fields).filter(f.size(f.col(new_fields[0])) == nrec)

    def field_lambda(field, i):
        """Turn the i-th element of a collected list column into a flat column such as '_PRODUCT_1'."""
        return f.col(field)[i - 1].alias('_'.join([field, str(i)]))

    pivoted_columns = [field_lambda(field, i) for i in range(1, nrec + 1) for field in new_fields]
    sdf = sdf.select(USER_FIELD, *pivoted_columns)
    sdf.show()
    # >>>
    # +------+----------+--------+-------+----------+--------+-------+----------+--------+-------+
    # |  user|_PRODUCT_1|_SCORE_1|_LINK_1|_PRODUCT_2|_SCORE_2|_LINK_2|_PRODUCT_3|_SCORE_3|_LINK_3|
    # +------+----------+--------+-------+----------+--------+-------+----------+--------+-------+
    # |user_4|         7|     0.1|  link7|         6|     0.2|  link6|         9|     0.3|       |
    # |user_1|         2|     0.1|       |         4|     0.2|  link4|         5|     0.3|  link5|
    # +------+----------+--------+-------+----------+--------+-------+----------+--------+-------+
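# A hedged alternative sketch, not from the original notes: Spark's built-in
# groupBy().pivot() can produce a similar wide layout when the pivot values
# (here the per-user rank 1..nrec) are known in advance.
from pyspark.sql import Window, functions as f

def pivot_with_builtin_pivot(sdf, nrec=3):
    """Hypothetical variant of pivot_spark_columns() operating on the same input `sdf`."""
    window = Window.partitionBy('user').orderBy('sort')
    ranked = sdf.withColumn('rank', f.row_number().over(window))
    # Note: users with fewer than nrec rows are kept with nulls instead of being dropped.
    return (ranked.groupBy('user')
            .pivot('rank', list(range(1, nrec + 1)))
            .agg(f.first('Product').alias('PRODUCT'),
                 f.first('Score').alias('SCORE'),
                 f.first('Link').alias('LINK')))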
import pyspark
from pyspark.sql import Window, functions as f
def find_top_value():
    """For every user, find the model with the highest row count."""
    session = pyspark.sql.SparkSession.builder.config('spark.sql.shuffle.partitions', 4).getOrCreate()
    second_sdf = session.createDataFrame(
        [
            ('user_1', 1, 0.5, 'cf'),
            ('user_1', 1, 0.5, 'cf'),
            ('user_1', 1, 0.5, 'cf'),
            ('user_1', 2, 0.4, 'fb'),
            ('user_4', 5, 0.1, 'fb'),
            ('user_4', 6, 0.1, 'fb'),
            ('user_4', 5, 0.1, 'nf'),
            ('user_4', 5, 0.1, 'nf')],
        ('user', 'product', 'score', 'model')
    )
    second_sdf.show()
    # >>>
    # +------+-------+-----+-----+
    # |  user|product|score|model|
    # +------+-------+-----+-----+
    # |user_1|      1|  0.5|   cf|
    # |user_1|      1|  0.5|   cf|
    # |user_1|      1|  0.5|   cf|
    # |user_1|      2|  0.4|   fb|
    # |user_4|      5|  0.1|   fb|
    # |user_4|      6|  0.1|   fb|
    # |user_4|      5|  0.1|   nf|
    # |user_4|      5|  0.1|   nf|
    # +------+-------+-----+-----+
    users_and_top_models = second_sdf.groupBy('user', 'model').count()
    window = Window.partitionBy('user').orderBy(f.col('count').desc())
    # The first count in each window is the maximum; keep only rows that match it.
    users_and_top_models = users_and_top_models.withColumn('top_models', f.first('count').over(window))
    users_and_top_models = users_and_top_models.filter(f.col('count') == f.col('top_models'))
    users_and_top_models.dropDuplicates(['user']).show()
    # >>>
    # +------+-----+-----+----------+
    # |  user|model|count|top_models|
    # +------+-----+-----+----------+
    # |user_4|   nf|    2|         2|
    # |user_1|   cf|    3|         3|
    # +------+-----+-----+----------+
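# A hedged alternative sketch, not from the original notes: row_number() picks
# exactly one top model per user and breaks count ties deterministically by model
# name (assumption: alphabetical tie-breaking is acceptable).
def find_top_value_row_number(second_sdf):
    """Hypothetical variant of find_top_value() operating on the same input."""
    counts = second_sdf.groupBy('user', 'model').count()
    window = Window.partitionBy('user').orderBy(f.col('count').desc(), f.col('model'))
    return (counts.withColumn('rn', f.row_number().over(window))
            .filter(f.col('rn') == 1)
            .drop('rn'))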