Dans cette section, nous explorerons la capacité de Spark à se connecter à une base de données relationnelle à l'aide de JDBC, en mettant en lumière l'exemple de Postgres.
Afin d'accéder à la base de données Postgres, nous aurons besoin du driver Postgres. Rendez-vous sur la page de téléchargement Download pgJDBC. Une fois le téléchargement terminé, placer-le dans un dossier de votre choix facile d'accès depuis le notebook.
import warnings warnings.filterwarnings("ignore") # Ignore warnings coming from Arrow optimizations.
import pyspark from pyspark.sql import SparkSession
pyspark.__version__
'3.5.1'
# Créer une session Spark spark = SparkSession.builder \ .appName("Work with PostgreSQL") \ .config("spark.driver.extraClassPath", "/home/joekakone/spark/drivers/postgresql-42.7.3.jar") \ .config("spark.executor.extraClassPath", "/home/joekakone/spark/drivers/postgresql-42.7.3.jar") \ .getOrCreate()
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 24/05/12 17:32:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 24/05/12 17:32:20 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Faites ip route dans le terminal pour obtenir l'adresse
ip route
joekakone@UTILISA-GII29BR:~$ ip route default via 172.31.16.1 dev eth0 proto kernel 172.31.16.0/20 dev eth0 proto kernel scope link src 172.31.27.239
## Database Infos & Credentials HOST = "172.31.16.1" # Windows localhost (Ubuntu WSL) PORT = "5432" DATABASE = "postgres" USERNAME = "postgres" PASSWORD = "admin"
postgresql_url = f"jdbc:postgresql://{HOST}:{PORT}/{DATABASE}" postgresql_long_url = f"jdbc:postgresql://{HOST}:{PORT}/{DATABASE}?user={USERNAME}&password={PASSWORD}" connection_properties = { # "user": USERNAME, # "password": PASSWORD, "driver": "org.postgresql.Driver" } print(postgresql_url)
jdbc:postgresql://172.31.16.1:5432/postgres
Nous devons importer la classe SparkSession depuis le module pyspark.sql
SparkSession
pyspark.sql
table_name = "public.covid19_daily_kpi" df = spark.read.jdbc(url=postgresql_long_url, table=table_name, properties=connection_properties) df.show()
+----------+-------------+--------------------+--------------------+---------+------+---------+------+--------------------+ | dt| country| latitude| longitude|confirmed|deaths|recovered|active| load_datetime| +----------+-------------+--------------------+--------------------+---------+------+---------+------+--------------------+ |2021-06-18| Benin|9.307700000000000000|2.315800000000000000| 8140| 103| 7979| 58|2023-06-18 22:56:...| |2021-06-18| Burkina Faso|12.23830000000000...|-1.56160000000000...| 13460| 167| 13287| 6|2023-06-18 22:56:...| |2021-06-18| Cabo Verde|16.53880000000000...|-23.0418000000000...| 31858| 280| 30796| 782|2023-06-18 22:56:...| |2021-06-18|Cote d'Ivoire|7.540000000000000000|-5.54710000000000...| 47973| 306| 47346| 321|2023-06-18 22:56:...| |2021-06-18| Gambia|13.44320000000000...|-15.3101000000000...| 6024| 181| 5827| 16|2023-06-18 22:56:...| |2021-06-18| Ghana|7.946500000000000000|-1.02320000000000...| 94824| 790| 92806| 1228|2023-06-18 22:56:...| |2021-06-18| Guinea|9.945600000000000000|-9.69660000000000...| 23431| 167| 21488| 1776|2023-06-18 22:56:...| |2021-06-18|Guinea-Bissau|11.80370000000000...|-15.1804000000000...| 3819| 69| 3553| 197|2023-06-18 22:56:...| |2021-06-18| Liberia|6.428055000000000500|-9.42949900000000...| 2729| 95| 2105| 529|2023-06-18 22:56:...| |2021-06-18| Mali|17.57069200000000...|-3.99616600000000...| 14364| 523| 10001| 3840|2023-06-18 22:56:...| |2021-06-18| Niger|17.60778900000000...|8.081666000000000000| 5457| 193| 5178| 86|2023-06-18 22:56:...| |2021-06-18| Nigeria|9.082000000000000000|8.675300000000000000| 167142| 2117| 163535| 1490|2023-06-18 22:56:...| |2021-06-18| Senegal|14.49740000000000...|-14.4524000000000...| 42206| 1158| 40707| 341|2023-06-18 22:56:...| |2021-06-18| Sierra Leone|8.460555000000001000|-11.7798890000000...| 4553| 82| 3208| 1263|2023-06-18 22:56:...| |2021-06-18| Togo|8.619500000000000000|0.824800000000000000| 13682| 127| 13334| 221|2023-06-18 22:56:...| |2021-06-21| Benin|9.307700000000000000|2.315800000000000000| 8140| 103| 7979| 58|2023-06-21 04:42:...| |2021-06-21| Burkina Faso|12.23830000000000...|-1.56160000000000...| 13469| 167| 13293| 9|2023-06-21 04:42:...| |2021-06-21| Cabo Verde|16.53880000000000...|-23.0418000000000...| 32002| 283| 30988| 731|2023-06-21 04:42:...| |2021-06-21|Cote d'Ivoire|7.540000000000000000|-5.54710000000000...| 48044| 308| 47445| 291|2023-06-21 04:42:...| |2021-06-21| Gambia|13.44320000000000...|-15.3101000000000...| 6024| 181| 5827| 16|2023-06-21 04:42:...| +----------+-------------+--------------------+--------------------+---------+------+---------+------+--------------------+ only showing top 20 rows
table_name = "public.covid19_daily_kpi" df = spark.read.format("jdbc") \ .option("url", postgresql_long_url) \ .option("dbtable", table_name) \ .load() df.show()
sql_query = "(select * from public.covid19_daily_kpi) as sq" df = spark.read.format("jdbc") \ .option("url", postgresql_url) \ .option("dbtable", sql_query) \ .option("user", USERNAME) \ .option("password", PASSWORD) \ .load() df.show()
Uen fois vos mainupulations terminées, Spark offre la possibilité d'exporter les données dans une table dans la base de données pour un usage ultérieur.
destination_table = "public.saprk_table" colums_types = """ dt date, country varchar(200), latitude numeric, longitude numeric, confirmed integer, deaths integer, recovered integer, active integer, load_datetime timestamp """ df.write \ .option("createTableColumnTypes", colums_types) \ .jdbc(postgresql_long_url, destination_table)
Pour vérifier que l'exportation a été bien effectuée, on va importer la table
df = spark.read.format("jdbc") \ .option("url", postgresql_long_url) \ .option("dbtable", destination_table) \ .load() df.show()
+----------+-------------+--------+---------+---------+------+---------+------+--------------------+ | dt| country|latitude|longitude|confirmed|deaths|recovered|active| load_datetime| +----------+-------------+--------+---------+---------+------+---------+------+--------------------+ |2021-06-18| Benin| 9| 2| 8140| 103| 7979| 58|2023-06-18 22:56:...| |2021-06-18| Burkina Faso| 12| -2| 13460| 167| 13287| 6|2023-06-18 22:56:...| |2021-06-18| Cabo Verde| 17| -23| 31858| 280| 30796| 782|2023-06-18 22:56:...| |2021-06-18|Cote d'Ivoire| 8| -6| 47973| 306| 47346| 321|2023-06-18 22:56:...| |2021-06-18| Gambia| 13| -15| 6024| 181| 5827| 16|2023-06-18 22:56:...| |2021-06-18| Ghana| 8| -1| 94824| 790| 92806| 1228|2023-06-18 22:56:...| |2021-06-18| Guinea| 10| -10| 23431| 167| 21488| 1776|2023-06-18 22:56:...| |2021-06-18|Guinea-Bissau| 12| -15| 3819| 69| 3553| 197|2023-06-18 22:56:...| |2021-06-18| Liberia| 6| -9| 2729| 95| 2105| 529|2023-06-18 22:56:...| |2021-06-18| Mali| 18| -4| 14364| 523| 10001| 3840|2023-06-18 22:56:...| |2021-06-18| Niger| 18| 8| 5457| 193| 5178| 86|2023-06-18 22:56:...| |2021-06-18| Nigeria| 9| 9| 167142| 2117| 163535| 1490|2023-06-18 22:56:...| |2021-06-18| Senegal| 14| -14| 42206| 1158| 40707| 341|2023-06-18 22:56:...| |2021-06-18| Sierra Leone| 8| -12| 4553| 82| 3208| 1263|2023-06-18 22:56:...| |2021-06-18| Togo| 9| 1| 13682| 127| 13334| 221|2023-06-18 22:56:...| |2021-06-21| Benin| 9| 2| 8140| 103| 7979| 58|2023-06-21 04:42:...| |2021-06-21| Burkina Faso| 12| -2| 13469| 167| 13293| 9|2023-06-21 04:42:...| |2021-06-21| Cabo Verde| 17| -23| 32002| 283| 30988| 731|2023-06-21 04:42:...| |2021-06-21|Cote d'Ivoire| 8| -6| 48044| 308| 47445| 291|2023-06-21 04:42:...| |2021-06-21| Gambia| 13| -15| 6024| 181| 5827| 16|2023-06-21 04:42:...| +----------+-------------+--------+---------+---------+------+---------+------+--------------------+ only showing top 20 rows
Une fois notre travail terminé, nous devons fermer la session.
spark.stop()
Dans la prochaine section, nous verrons comment importer des données depuis des fichiers.