# Spark

```python
import pyspark

def sort_and_replace_columns():
    """Sort by several columns with a separate direction for each, then pick the output columns.

    Rows are ordered by ``user`` ascending and ``score`` descending; the final
    ``select`` keeps only the needed columns, in the order they should be shown.
    """
    builder = pyspark.sql.SparkSession.builder
    spark = builder.config('spark.sql.shuffle.partitions', 4).getOrCreate()

    column_names = ('user', 'score', 'other column')
    rows = [
        (1, 1, 'a'),
        (2, 3, 'b'),
        (1, 2, 'd'),
        (1, 7, 's'),
        (2, 3, 'q'),
        (1, 5, 'e'),
        (2, 2, 'a')
    ]
    source = spark.createDataFrame(rows, column_names)
    source.show()
    # >>>
    # +----+-----+------------+
    # |user|score|other column|
    # +----+-----+------------+
    # |   1|    1|           a|
    # |   2|    3|           b|
    # |   1|    2|           d|
    # |   1|    7|           s|
    # |   2|    3|           q|
    # |   1|    5|           e|
    # |   2|    2|           a|
    # +----+-----+------------+

    # One flag per sort key: 1 -> ascending ('user'), 0 -> descending ('score').
    ordered = source.orderBy(['user', 'score'], ascending=[1, 0])
    reordered = ordered.select('user', 'other column', 'score')
    reordered.show()
    # >>>
    # +----+------------+-----+
    # |user|other column|score|
    # +----+------------+-----+
    # |   1|           s|    7|
    # |   1|           e|    5|
    # |   1|           d|    2|
    # |   1|           a|    1|
    # |   2|           b|    3|
    # |   2|           q|    3|
    # |   2|           a|    2|
    # +----+------------+-----+
```

```python
import pyspark
from pyspark.sql import Window, Row, functions as f
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

def read_with_schema():
    """Load a pipe-separated CSV file using an explicitly declared schema.

    The schema is passed to the reader instead of being inferred, then the
    resulting schema and the data are printed.
    """
    builder = pyspark.sql.SparkSession.builder
    spark = builder.config('spark.sql.shuffle.partitions', 4).getOrCreate()

    # KEY is declared non-nullable; SKU and NAME keep the default nullability.
    fields = [
        StructField('KEY', IntegerType(), nullable=False),
        StructField('SKU', IntegerType()),
        StructField('NAME', StringType()),
    ]
    schema = StructType(fields)

    # Schema-less alternative kept for reference:
    #   spark.read.csv('/home/vagrant/bra/data/56.csv', sep='|', header=True)
    loaded = spark.read.load(
        '/home/vagrant/bra/data/56.csv',
        format='csv',
        schema=schema,
        sep='|',
        header=True,
        escape='"',
    )
    loaded.printSchema()
    loaded.show()
```

```python
import urllib.parse

def get_df(path: str, ss: "pyspark.sql.SparkSession", token: str, parquet=False) -> "pyspark.sql.DataFrame":
    """
    Download DataFrame from Azure Blob

    When ``path`` has the ``<container>@<account-host>`` authority form, the SAS
    token is registered in the session configuration before reading. Paths
    without such an authority (local paths, other URL schemes) are read as-is,
    without touching the configuration.

    :param path: path to blob with prefix
    for example "wasbs://perf-account@rmxonngiie.blob.core.windows.net/MAT_train.csv"
    :param ss: SparkSession
    :param token: SAS token to blob
    :param parquet: if True read the path as parquet, otherwise as CSV with a header row
    :return: DataFrame read from ``path``
    """
    # The annotations are quoted so that defining the function does not
    # require ``pyspark`` to be imported by this snippet.
    netloc = urllib.parse.urlparse(path).netloc

    # partition() never raises: an authority without '@' simply yields an
    # empty separator, in which case there is no container to configure.
    container_name, separator, full_account_name = netloc.partition('@')
    if separator:
        # Text before the first dot is the account name; a host without any
        # dot is taken whole (slicing with find() == -1 would cut its last char).
        account_name = full_account_name.partition('.')[0]
        key = "fs.azure.sas.{}.{}.blob.core.windows.net".format(container_name, account_name)
        ss.conf.set(key, token)

    if parquet:
        return ss.read.parquet(path)
    return ss.read.csv(path, header=True)
```

```python
def sdf_to_list(sdf: pyspark.sql.DataFrame, field=None, drop_duplicates=False):
    """
    Collect the values of a single DataFrame column into a plain Python list.

    :param sdf: DataFrame to read the values from.
    :param field: Field to use. If None, will use first column of dataframe.
    :param drop_duplicates: If true, duplicate values are dropped before collecting.
    :return: list with the values of the chosen column.
    """
    if not field:
        field = sdf.columns[0]

    column = sdf.select(field)
    if drop_duplicates:
        column = column.drop_duplicates([field])

    # collect() brings every remaining row to the driver.
    rows = column.collect()
    return [record[field] for record in rows]
```

```python
import pyspark

def dataframe_to_dict():
    """Collect a two-column DataFrame into a Python dict and print it.

    The first selected column (``user``) becomes the key and the second
    (``Products``) the value. The result type and every key/value pair are
    printed.
    """
    session = pyspark.sql.SparkSession.builder.config('spark.sql.shuffle.partitions', 4).getOrCreate()
    user_field = 'user'
    product_field = 'Products'
    # createDataFrame() already returns a Spark DataFrame. Passing that result
    # to createDataFrame() a second time is rejected by PySpark
    # ("data is already a DataFrame"), so the frame is used directly.
    sdf = session.createDataFrame(
        [
            ('user_1', [1, 2, 3, 4, 5]),
            ('user_2', [11, 22, 33, 44, 55]),
            ('user_3', [1, 2, 3, 4, 5]),
            ('user_4', [1, 2, 3, 4, 5]),
            ('user_5', [1, 2, 3, 4, 5]),
            ('user_6', [1, 2, 3, 4, 5])],
        (user_field, product_field))
    sdf.show()
    # Exactly two columns are selected so that each row is a (key, value) pair
    # for collectAsMap().
    res = sdf.select(user_field, product_field).rdd.collectAsMap()
    print(type(res))
    for user, values in res.items():
        print("{} --> {}".format(user, values))
```

```python
import pyspark
from pyspark.sql import Window, Row, functions as f

def pivote_spark_columns():
    """Pivot per-user rows into one wide row per user.

    For every user the ``Product``, ``Score`` and ``Link`` values are gathered
    in ``sort`` order and spread over numbered columns
    (``_PRODUCT_1``, ``_SCORE_1``, ``_LINK_1``, ``_PRODUCT_2``, ...).
    """
    builder = pyspark.sql.SparkSession.builder
    spark = builder.config('spark.sql.shuffle.partitions', 4).getOrCreate()
    USER_FIELD = 'user'
    SORTING_FIELD = 'sort'
    nrec = 3
    rows = [
        ('user_1', 2, 0.1, '', 1),
        ('user_1', 4, 0.2, 'link4', 2),
        ('user_1', 5, 0.3, 'link5', 3),
        ('user_4', 6, 0.2, 'link6', 5),
        ('user_4', 7, 0.1, 'link7', 4),
        ('user_4', 9, 0.3, '', 6)
    ]
    sdf = spark.createDataFrame(rows, (USER_FIELD, 'Product', 'Score', 'Link', SORTING_FIELD))
    sdf.show()
    # >>>
    # +------+-------+-----+-----+----+
    # |  user|Product|Score| Link|sort|
    # +------+-------+-----+-----+----+
    # |user_1|      2|  0.1|     |   1|
    # |user_1|      4|  0.2|link4|   2|
    # |user_1|      5|  0.3|link5|   3|
    # |user_4|      6|  0.2|link6|   5|
    # |user_4|      7|  0.1|link7|   4|
    # |user_4|      9|  0.3|     |   6|
    # +------+-------+-----+-----+----+

    FIELDS_TO_TRANSPOSE = ['Product', 'Score', 'Link']
    new_fields = ['_PRODUCT', '_SCORE', '_LINK']
    per_user_sorted = Window.partitionBy(USER_FIELD).orderBy(SORTING_FIELD)

    # Over an ordered window collect_list() accumulates row by row, so each row
    # carries the values gathered so far within its user partition.
    collected = [
        f.collect_list(source_name).over(per_user_sorted).alias(target_name)
        for source_name, target_name in zip(FIELDS_TO_TRANSPOSE, new_fields)
    ]
    sdf = sdf.select(USER_FIELD, *collected)
    # Keep only the rows whose list already holds all `nrec` values.
    sdf = sdf.filter(f.size(f.col(new_fields[0])) == nrec)

    # Position-major order: every field for slot 1, then every field for slot 2, ...
    pivoted_columns = []
    for position in range(1, nrec + 1):
        for list_field in new_fields:
            element = f.col(list_field)[position - 1]
            pivoted_columns.append(element.alias('{}_{}'.format(list_field, position)))

    sdf = sdf.select(USER_FIELD, *pivoted_columns)
    sdf.show()
    # >>>
    # +------+----------+--------+-------+----------+--------+-------+----------+--------+-------+
    # |  user|_PRODUCT_1|_SCORE_1|_LINK_1|_PRODUCT_2|_SCORE_2|_LINK_2|_PRODUCT_3|_SCORE_3|_LINK_3|
    # +------+----------+--------+-------+----------+--------+-------+----------+--------+-------+
    # |user_4|         7|     0.1|  link7|         6|     0.2|  link6|         9|     0.3|       |
    # |user_1|         2|     0.1|       |         4|     0.2|  link4|         5|     0.3|  link5|
    # +------+----------+--------+-------+----------+--------+-------+----------+--------+-------+
```

```python
import pyspark

def find_top_value():
    """Show, for every user, the model that occurs most often in that user's rows.

    Rows are counted per (user, model); within each user the largest count is
    attached to every row and only the rows matching it are kept. One row per
    user is then shown.
    """
    # Imported here because this snippet only imports ``pyspark`` at the top,
    # while the body needs ``Window`` and the ``functions`` module.
    from pyspark.sql import Window, functions as f

    session = pyspark.sql.SparkSession.builder.config('spark.sql.shuffle.partitions', 4).getOrCreate()
    second_sdf = session.createDataFrame(
        [
            ('user_1', 1, 0.5, 'cf'),
            ('user_1', 1, 0.5, 'cf'),
            ('user_1', 1, 0.5, 'cf'),
            ('user_1', 2, 0.4, 'fb'),
            ('user_4', 5, 0.1, 'fb'),
            ('user_4', 6, 0.1, 'fb'),
            ('user_4', 5, 0.1, 'nf'),
            ('user_4', 5, 0.1, 'nf')],
        ('user', 'product', 'score', 'model')
    )
    second_sdf.show()
    # >>>
    # +------+-------+-----+-----+
    # |  user|product|score|model|
    # +------+-------+-----+-----+
    # |user_1|      1|  0.5|   cf|
    # |user_1|      1|  0.5|   cf|
    # |user_1|      1|  0.5|   cf|
    # |user_1|      2|  0.4|   fb|
    # |user_4|      5|  0.1|   fb|
    # |user_4|      6|  0.1|   fb|
    # |user_4|      5|  0.1|   nf|
    # |user_4|      5|  0.1|   nf|
    # +------+-------+-----+-----+
    users_and_top_models = second_sdf.groupBy('user', 'model').count()
    # Ordering by count descending makes the first value in each user's window
    # the largest count for that user.
    window = Window.partitionBy('user').orderBy(f.col('count').desc())
    users_and_top_models = users_and_top_models.withColumn('top_models', f.first('count').over(window))
    users_and_top_models = users_and_top_models.filter(f.col('count') == f.col('top_models'))
    # Several models can share the top count; dropDuplicates keeps a single
    # row per user (which of the tied rows is kept is not controlled here).
    users_and_top_models.dropDuplicates(['user']).show()
    # >>>
    # +------+-----+-----+----------+
    # |  user|model|count|top_models|
    # +------+-----+-----+----------+
    # |user_4|   nf|    2|         2|
    # |user_1|   cf|    3|         3|
    # +------+-----+-----+----------+
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://bigdata-2.gitbook.io/bd201notes/spark.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
