Spark

import pyspark

def sort_and_replace_columns():
    """EXAMPLE, which creates new order (different order for different columns) and select only needed columns"""
    session = pyspark.sql.SparkSession.builder.config('spark.sql.shuffle.partitions', 4).getOrCreate()
    df = session.createDataFrame(
        [
            (1, 1, 'a'),
            (2, 3, 'b'),
            (1, 2, 'd'),
            (1, 7, 's'),
            (2, 3, 'q'),
            (1, 5, 'e'),
            (2, 2, 'a')
        ],
        ('user', 'score', 'other column'))
    df.show()
    # >>>
    # +----+-----+------------+
    # |user|score|other column|
    # +----+-----+------------+
    # |   1|    1|           a|
    # |   2|    3|           b|
    # |   1|    2|           d|
    # |   1|    7|           s|
    # |   2|    3|           q|
    # |   1|    5|           e|
    # |   2|    2|           a|
    # +----+-----+------------+

    ndf = df.orderBy(['user', 'score'], ascending=[1, 0])
    ndf.select('user', 'other column', 'score').show()
    # >>>
    # +----+------------+-----+
    # |user|other column|score|
    # +----+------------+-----+
    # |   1|           s|    7|
    # |   1|           e|    5|
    # |   1|           d|    2|
    # |   1|           a|    1|
    # |   2|           b|    3|
    # |   2|           q|    3|
    # |   2|           a|    2|
    # +----+------------+-----+
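    # A minimal alternative sketch: the same ordering expressed with column
    # expressions instead of the ascending=[1, 0] flags.
    from pyspark.sql import functions as f
    df.orderBy(f.col('user').asc(), f.col('score').desc()).select(
        'user', 'other column', 'score').show()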
import pyspark
from pyspark.sql import Window, Row, functions as f
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

def read_with_schema():
    """Read csv with schema providing"""
    session = pyspark.sql.SparkSession.builder.config('spark.sql.shuffle.partitions', 4).getOrCreate()
    schema = StructType([
        StructField('KEY', IntegerType(), nullable=False),
        StructField('SKU', IntegerType()),
        StructField('NAME', StringType())])

    # sdf = session.read.csv('/home/vagrant/bra/data/56.csv', sep='|', header=True)
    sdf = session.read.load('/home/vagrant/bra/data/56.csv', sep='|',
                            header=True, schema=schema, escape='"', format='csv')
    sdf.printSchema()
    sdf.show()
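    # A minimal alternative sketch: the csv reader also accepts the schema as a
    # DDL-formatted string (same file path and delimiter assumed as above).
    ddl_sdf = session.read.csv('/home/vagrant/bra/data/56.csv', sep='|', header=True,
                               escape='"', schema='KEY INT, SKU INT, NAME STRING')
    ddl_sdf.printSchema()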
import urllib.parse

def get_df(path: str, ss: pyspark.sql.SparkSession, token: str, parquet=False) -> pyspark.sql.DataFrame:
    """
    Download DataFrame from Azure Blob

    :param path: path to blob with prefix
    for example "wasbs://perf-account@rmxonngiie.blob.core.windows.net/MAT_train.csv"
    :param ss: SparkSession
    :param parquet: convert flag
    :param token: SAS token to blob
    :return:
    """
    parsed = urllib.parse.urlparse(path)
    netloc = parsed.netloc
    if netloc:
        container_name, full_account_name = netloc.split('@')
        account_name = full_account_name[:full_account_name.find('.')]
        key = "fs.azure.sas.{}.{}.blob.core.windows.net".format(container_name, account_name)
        ss.conf.set(key, token)

    if parquet:
        return ss.read.parquet(path)
    return ss.read.csv(path, header=True)
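
A hypothetical usage sketch for get_df; the container, account, file name, and SAS token below are placeholders, not real values:

def get_df_usage_example():
    ss = pyspark.sql.SparkSession.builder.getOrCreate()
    sas_token = '<SAS-token-for-the-container>'  # placeholder
    path = 'wasbs://my-container@myaccount.blob.core.windows.net/train.csv'  # placeholder
    sdf = get_df(path, ss, token=sas_token)
    sdf.show(5)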
def sdf_to_list(sdf: pyspark.sql.DataFrame, field=None, drop_duplicates=False):
    """
    Helper function to convert sdf's field values to raw python list of values.

    :param field: Field to use. If None, will use first column of dataframe.
    """
    field = field or sdf.columns[0]
    sdf = sdf.select(field)
    if drop_duplicates:
        sdf = sdf.drop_duplicates([field, ])
    return [row[field] for row in sdf.collect()]
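
A small usage sketch for sdf_to_list, with a throwaway DataFrame made up for illustration:

def sdf_to_list_example():
    session = pyspark.sql.SparkSession.builder.getOrCreate()
    sdf = session.createDataFrame([(1, 'a'), (2, 'b'), (2, 'c')], ('id', 'letter'))
    print(sdf_to_list(sdf))                                    # [1, 2, 2]
    print(sdf_to_list(sdf, field='id', drop_duplicates=True))  # [1, 2] (order not guaranteed)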
import pyspark

def dataframe_to_dict():
    session = pyspark.sql.SparkSession.builder.config('spark.sql.shuffle.partitions', 4).getOrCreate()
    user_field = 'user'
    product_field = 'Products'
    df = session.createDataFrame(
        [
            ('user_1', [1,2,3,4,5]),
            ('user_2', [11,22,33,44,55]),
            ('user_3', [1,2,3,4,5]),
            ('user_4', [1,2,3,4,5]),
            ('user_5', [1,2,3,4,5]),
            ('user_6', [1,2,3,4,5])],
        (user_field, product_field))
    df.show()
    # collectAsMap() treats each two-column Row as a (key, value) pair
    res = df.select(user_field, product_field).rdd.collectAsMap()
    print(type(res))
    for user, values in res.items():
        print("{} --> {}".format(user, values))
import pyspark
from pyspark.sql import Window, Row, functions as f

def pivot_spark_columns():
    """Pivot each user's rows into columns: one output row per user, nrec records wide."""
    session = pyspark.sql.SparkSession.builder.config('spark.sql.shuffle.partitions', 4).getOrCreate()
    USER_FIELD = 'user'
    SORTING_FIELD = 'sort'
    nrec = 3
    sdf = session.createDataFrame(
        [
            ('user_1', 2, 0.1, '', 1),
            ('user_1', 4, 0.2, 'link4', 2),
            ('user_1', 5, 0.3, 'link5', 3),
            ('user_4', 6, 0.2, 'link6', 5),
            ('user_4', 7, 0.1, 'link7', 4),
            ('user_4', 9, 0.3, '', 6)
        ], (USER_FIELD, 'Product', 'Score', 'Link', SORTING_FIELD))
    sdf.show()
    # >>>
    # +------+-------+-----+-----+----+
    # |  user|Product|Score| Link|sort|
    # +------+-------+-----+-----+----+
    # |user_1|      2|  0.1|     |   1|
    # |user_1|      4|  0.2|link4|   2|
    # |user_1|      5|  0.3|link5|   3|
    # |user_4|      6|  0.2|link6|   5|
    # |user_4|      7|  0.1|link7|   4|
    # |user_4|      9|  0.3|     |   6|
    # +------+-------+-----+-----+----+

    FIELDS_TO_TRANSPOSE = ['Product', 'Score', 'Link']
    new_fields = ['_PRODUCT', '_SCORE', '_LINK']
    window = Window.partitionBy(USER_FIELD).orderBy(SORTING_FIELD)
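    # With an orderBy and no explicit frame, the window frame is
    # "unbounded preceding .. current row", so collect_list accumulates values
    # in 'sort' order as it moves through each user's rows; the
    # size(...) == nrec filter below keeps the one row per user whose
    # accumulated list holds exactly nrec values (users with fewer rows drop out).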
    for field_name, new_field_name in zip(FIELDS_TO_TRANSPOSE, new_fields):
        sdf = sdf.withColumn(new_field_name, f.collect_list(field_name).over(window))
    sdf = sdf.select(USER_FIELD, *new_fields).filter(f.size(f.col(new_fields[0])) == nrec)

    def field_lambda(field, i):
        """Pivoted columns precessing."""
        return f.col(field)[i - 1].alias('_'.join([field, str(i)]))

    pivoted_columns = [field_lambda(field, i) for i in range(1, nrec + 1) for field in new_fields]
    sdf = sdf.select(USER_FIELD, *pivoted_columns)
    sdf.show()
    # >>>
    # +------+----------+--------+-------+----------+--------+-------+----------+--------+-------+
    # |  user|_PRODUCT_1|_SCORE_1|_LINK_1|_PRODUCT_2|_SCORE_2|_LINK_2|_PRODUCT_3|_SCORE_3|_LINK_3|
    # +------+----------+--------+-------+----------+--------+-------+----------+--------+-------+
    # |user_4|         7|     0.1|  link7|         6|     0.2|  link6|         9|     0.3|       |
    # |user_1|         2|     0.1|       |         4|     0.2|  link4|         5|     0.3|  link5|
    # +------+----------+--------+-------+----------+--------+-------+----------+--------+-------+
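
A related sketch (an assumed variant, not part of the snippet above): collecting over an explicitly unbounded frame puts each user's full, sort-ordered lists on every row, so one row per user can be kept with dropDuplicates; note that, unlike the version above, it does not truncate to nrec items.

def pivot_with_unbounded_frame(sdf):
    """Sketch: one row per user with the full, sort-ordered lists."""
    full_window = (Window.partitionBy('user').orderBy('sort')
                   .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
    for field_name, new_field_name in zip(['Product', 'Score', 'Link'],
                                          ['_PRODUCT', '_SCORE', '_LINK']):
        sdf = sdf.withColumn(new_field_name, f.collect_list(field_name).over(full_window))
    return sdf.select('user', '_PRODUCT', '_SCORE', '_LINK').dropDuplicates(['user'])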
import pyspark
from pyspark.sql import Window, functions as f

def find_top_value():
    session = pyspark.sql.SparkSession.builder.config('spark.sql.shuffle.partitions', 4).getOrCreate()
    second_sdf = session.createDataFrame(
        [
            ('user_1', 1, 0.5, 'cf'),
            ('user_1', 1, 0.5, 'cf'),
            ('user_1', 1, 0.5, 'cf'),
            ('user_1', 2, 0.4, 'fb'),
            ('user_4', 5, 0.1, 'fb'),
            ('user_4', 6, 0.1, 'fb'),
            ('user_4', 5, 0.1, 'nf'),
            ('user_4', 5, 0.1, 'nf')],
        ('user', 'product', 'score', 'model')
    )
    second_sdf.show()
    # >>>
    # +------+-------+-----+-----+
    # |  user|product|score|model|
    # +------+-------+-----+-----+
    # |user_1|      1|  0.5|   cf|
    # |user_1|      1|  0.5|   cf|
    # |user_1|      1|  0.5|   cf|
    # |user_1|      2|  0.4|   fb|
    # |user_4|      5|  0.1|   fb|
    # |user_4|      6|  0.1|   fb|
    # |user_4|      5|  0.1|   nf|
    # |user_4|      5|  0.1|   nf|
    # +------+-------+-----+-----+
    users_and_top_models = second_sdf.groupBy('user', 'model').count()
    window = Window.partitionBy('user').orderBy(f.col('count').desc())
    users_and_top_models = users_and_top_models.withColumn('top_models', f.first('count').over(window))
    users_and_top_models = users_and_top_models.filter(f.col('count') == f.col('top_models'))
    users_and_top_models.dropDuplicates(['user']).show()
    # >>>
    # +------+-----+-----+----------+
    # |  user|model|count|top_models|
    # +------+-----+-----+----------+
    # |user_4|   nf|    2|         2|
    # |user_1|   cf|    3|         3|
    # +------+-----+-----+----------+
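    # A minimal alternative sketch: rank the (user, model) counts with
    # row_number() and keep the top row per user; adding 'model' to the
    # ordering makes the tie-break deterministic instead of arbitrary.
    rank_window = Window.partitionBy('user').orderBy(f.col('count').desc(), 'model')
    second_sdf.groupBy('user', 'model').count() \
        .withColumn('rn', f.row_number().over(rank_window)) \
        .filter(f.col('rn') == 1).drop('rn').show()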
