Spark est une plateforme open-source de traitement de données volumineuses. Au fil des années, Spark s'est imposé comme l'outil de référence pour l'ingénierie de données. Dans ce guide ultime, je vous présente PySpark, l'API Python de Spark.
import warnings warnings.filterwarnings("ignore") # Ignore warnings coming from Arrow optimizations.
import pyspark
pyspark.__version__
'3.5.1'
Nous devons importer la classe SparkSession depuis le module pyspark.sql
SparkSession
pyspark.sql
from pyspark.sql import SparkSession
# Créer une session Spark spark = SparkSession.builder \ .appName("Get Started") \ .getOrCreate()
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 24/05/12 17:51:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 24/05/12 17:51:07 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Nous allons préparer une petit dataset, la liste des colonnes et les enregistrements comme liste de tuples
columns = ["id", "product_name", "product_category", "amount", "quantity"] data = [ (123, "Product1", "Category1", 45.67, 8), (124, "Product2", "Category2", 78.23, 3), (125, "Product3", "Category3", 32.45, 6), (126, "Product4", "Category1", 91.12, 5), (127, "Product5", "Category2", 64.78, 7) ]
Nous allons créer le dataframe à partir du dataset
df = spark.createDataFrame(data, schema=columns)
On fait appel à la méthode .show() pour afficher le dataframe.
.show()
df.show()
+---+------------+----------------+------+--------+ | id|product_name|product_category|amount|quantity| +---+------------+----------------+------+--------+ |123| Product1| Category1| 45.67| 8| |124| Product2| Category2| 78.23| 3| |125| Product3| Category3| 32.45| 6| |126| Product4| Category1| 91.12| 5| |127| Product5| Category2| 64.78| 7| +---+------------+----------------+------+--------+
df.select("id").show()
+---+ | id| +---+ |123| |124| |125| |126| |127| +---+
df.sort("quantity").show()
+---+------------+----------------+------+--------+ | id|product_name|product_category|amount|quantity| +---+------------+----------------+------+--------+ |124| Product2| Category2| 78.23| 3| |126| Product4| Category1| 91.12| 5| |125| Product3| Category3| 32.45| 6| |127| Product5| Category2| 64.78| 7| |123| Product1| Category1| 45.67| 8| +---+------------+----------------+------+--------+
df.filter("product_category = 'Category1'").show()
+---+------------+----------------+------+--------+ | id|product_name|product_category|amount|quantity| +---+------------+----------------+------+--------+ |123| Product1| Category1| 45.67| 8| |126| Product4| Category1| 91.12| 5| +---+------------+----------------+------+--------+
Il existe cette synthaxe
df.filter(df["product_category"] == "Category1").show()
agg_df = df.groupby("product_category").agg({"id": "count", "amount": "sum", "quantity": "sum"}) agg_df.show()
+----------------+------------------+-------------+---------+ |product_category| sum(amount)|sum(quantity)|count(id)| +----------------+------------------+-------------+---------+ | Category1|136.79000000000002| 13| 2| | Category2| 143.01| 10| 2| | Category3| 32.45| 6| 1| +----------------+------------------+-------------+---------+
Spark offre la possibilité de manipuler les données avec SQL, c'est l'une de ses forces. Les professionnels des données qui utilisent SQL au quotidien s'y retrouvent assez facilement.
df.createOrReplaceTempView("lu_products") sql_query = "SELECT * FROM lu_products WHERE id > 125" result_df = spark.sql(sql_query) result_df.show()
+---+------------+----------------+------+--------+ | id|product_name|product_category|amount|quantity| +---+------------+----------------+------+--------+ |126| Product4| Category1| 91.12| 5| |127| Product5| Category2| 64.78| 7| +---+------------+----------------+------+--------+
Une fois notre travail terminé, nous devons fermer la session.
spark.stop()
Dans la prochaine section, nous verrons comment importer des données depuis des fichiers.