Data pipeline with PySpark and AWS

This article was published as part of the Data Science Blogathon

Introduction

Apache Spark is a framework used in cluster-computing environments to analyze big data. It can run in a distributed fashion across a group of computers in a cluster to process large data sets more effectively. The open-source Spark engine supports a wide range of programming languages, including Scala, Java, R, and Python.

In this article, I will show you how to install PySpark on your Ubuntu machine and then build a basic ETL pipeline to extract, transform, and load data from a remote RDBMS system to an AWS S3 bucket.

This ETL architecture can be used to transfer hundreds of gigabytes of data from any RDBMS database server (in this article we use MS SQL Server) to an Amazon S3 bucket.

Key benefits of using Apache Spark:

  • Run workloads up to 100 times faster than Hadoop
  • Compatible with Java, Scala, Python, R, and SQL
[Image: ETL architecture diagram]

Source: This is an original image.

Requirements

To start, we must have the following prerequisites:

  • A system running Ubuntu 18.04 or Ubuntu 20.04
  • A user account with sudo privileges
  • An AWS account with upload access to the S3 bucket

Before downloading and configuring Spark, you must install the necessary package dependencies. Make sure the following packages are already configured on your system.

Confirm that the dependencies are installed by running these commands:

java -version; git --version; python --version

Install PySpark

Download the version of Spark you want from the official Apache website. We will download Spark 3.0.3 with Hadoop 2.7, the current version at the time of writing. Then, use the wget command with the direct URL to download the Spark package.

Create /opt/spark if it does not already exist, change your working directory to it, and download the package.

sudo mkdir -p /opt/spark
cd /opt/spark
sudo wget https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz

Extract the saved package using the tar command. Once the extraction process is complete, the output lists the files that have been unpacked from the archive.

tar xvf spark-*
ls -lrt spark-*

Set up the Spark environment

Before starting a Spark server, we need to set some environment variables. There are some Spark directories that we need to add to the default profile. Use the vi editor or any other editor to add these three lines to .profile:

vi ~/.profile

Insert these 3 lines at the end of the .profile file.

export SPARK_HOME=/opt/spark 
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin 
export PYSPARK_PYTHON=/usr/bin/python3

Save the changes and exit the editor. When you finish editing the file, load .profile in your current shell by running the command below. Alternatively, log out of the server and log back in for the changes to take effect.

source ~/.profile

Start / Stop Spark Master & Worker

Go to the Spark installation directory /opt/spark/spark-*. It contains all the scripts needed to start and stop the Spark services.

Run this command to start Spark Master.

start-master.sh

To view the Spark web interface, open a web browser and enter the localhost IP address on port 8080 (this is the default port Spark uses; if you need to change it, do so in the start-master.sh script). Alternatively, you can replace 127.0.0.1 with the actual network IP address of your host machine.

http://127.0.0.1:8080/

The web page shows the Spark master URL, worker nodes, CPU and memory utilization, running applications, etc.


Now, run this command to start a Spark worker instance.

start-slave.sh spark://0.0.0.0:8082

Or

start-slave.sh spark://waplgmdalin_lab01:8082

The worker's own web UI runs on http://127.0.0.1:8084/, but it must be linked to the master. This is why we pass the Spark master URL as a parameter to the start-slave.sh script. To confirm that the worker is correctly linked to the master, open the link in a browser.


Assigning Resources to the Spark Worker

By default, when you start a worker instance, it uses all the available cores on the machine. However, for practical reasons, you may want to limit the number of cores and the amount of RAM allocated to each worker.

 start-slave.sh spark://0.0.0.0:8082 -c 4 -m 512M

Here, we have assigned 4 cores and 512 MB of RAM to the worker. Let's confirm this by restarting the worker instance.


To stop the master instance that was started with the script above, run:

stop-master.sh

To stop a running worker process, enter this command:

stop-slave.sh

Configure MS SQL connection

In this PySpark ETL, we will connect to an MS SQL Server instance as the source system and run SQL queries to fetch the data. First, we have to download the necessary dependencies.

Download the MS SQL JDBC jar file (mssql-jdbc-9.2.1.jre8.jar) from the Microsoft website and copy it to the “/opt/spark/jars” directory.

https://www.microsoft.com/en-us/download/details.aspx?id=11774

Download the Spark SQL jar file (spark-sql_2.12-3.0.3.jar) from the Apache download site and copy it to the “/opt/spark/jars” directory.

https://jar-download.com/?search_box=org.apache.spark+spark.sql

Edit the .profile, add PySpark and Py4J classes to Python path:

export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.10.9-src.zip:$PYTHONPATH
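After sourcing .profile again, you can confirm that the system Python picks up PySpark. This is a minimal sanity check, assuming the paths above match your installation:

# Minimal check that PySpark is importable from the system Python.
import pyspark
print(pyspark.__version__)  # should print 3.0.3 for this installation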

Configure the AWS S3 connection

To connect to an AWS instance, we need to download three jar files and copy them to the “/opt/spark/jars” directory. Check the version of Hadoop you are currently using; you can get it from any Hadoop jar present in your Spark installation. If the Hadoop version is 2.7.4, download the jar files for that same version. For the AWS Java SDK, you must download the version that was used to build the hadoop-aws package.
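A quick way to find the bundled Hadoop version is to list the Hadoop jars that ship with Spark. This is a minimal check, assuming the /opt/spark/jars path used in this article:

# List Hadoop jars bundled with Spark; the version suffix (e.g. 2.7.4)
# tells you which hadoop-aws jar to download.
import glob
print(glob.glob('/opt/spark/jars/hadoop-common-*.jar'))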

Make sure the versions you download match each other, as listed below.

  • hadoop-aws-2.7.4.jar
  • aws-java-sdk-1.7.4.jar
  • jets3t-0.9.4.jar
sudo wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar
sudo wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.4/hadoop-aws-2.7.4.jar
sudo wget https://repo1.maven.org/maven2/net/java/dev/jets3t/jets3t/0.9.4/jets3t-0.9.4.jar

Python development

Create a working directory called “scripts” to store all the Python scripts and configuration files. Create a file called “sqlfile.py” that will contain the SQL queries we want to execute on the remote database server.

vi sqlfile.py

Insert the following SQL query in the sqlfile.py file to extract the data. Before this step, it is recommended to test-run the query on the server to get an idea of the number of records returned.

query1 = """(select * from sales-data where date >= '2021-01-01' and status="Completed")"""

Save and exit the file.

Create a configuration file called “config.ini” which will store login credentials and database parameters.

vi config.ini

Insert the following AWS and MSSQL connection parameters in the file. Note that we have created separate sections to store the AWS and MSSQL connection parameters. You can create as many DB connection instances as required, as long as each one is kept in its own section (mssql1, mssql2, aws1, aws2, etc.).

[aws]

ACCESS_KEY=BBIYYTU6L4U47BGGG&^CF
SECRET_KEY=Uy76BBJabczF7h6vv+BFssqTVLDVkKYN/f31puYHtG
BUCKET_NAME=s3-bucket-name
DIRECTORY=sales-data-directory

[mssql]
url = jdbc:sqlserver://PPOP24888S08YTA.APAC.PAD.COMPANY-DSN.COM:1433;databaseName=Transactions
database = Transactions
user = MSSQL-USER
password = MSSQL-Password
dbtable = sales-data
filename = data_extract.csv

Save and exit the file.
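Before wiring this into the main script, you can quickly confirm that ConfigParser sees both sections. This is a minimal sketch, assuming config.ini sits in the current “scripts” directory:

# Quick check: list the sections defined in config.ini.
from configparser import ConfigParser
config = ConfigParser()
config.read('config.ini')
print(config.sections())  # expected: ['aws', 'mssql']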

Create a Python script called “Data-Extraction.py”.

Import libraries for Spark and Boto3

Spark is implemented in Scala, a language that runs on the JVM, but since we are working with Python we will use PySpark. We installed Spark 3.0.3, so we will use the matching version of PySpark. You can think of PySpark as a Python-based wrapper on top of the Scala API.

Here, we use the AWS SDK for Python (Boto3) to create, configure, and manage AWS services, such as Amazon EC2 and Amazon S3. The SDK provides an object-oriented API, as well as low-level access to AWS services.

Import the Python libraries to start a Spark session, query1 from sqlfile.py and boto3.

from pyspark.sql import SparkSession
import shutil
import os
import glob
import boto3
from sqlfile import query1
from configparser import ConfigParser

Create a SparkSession

SparkSession provides a single entry point to interact with the underlying Spark engine and enables Spark programming with the DataFrame and Dataset APIs. Most importantly, it reduces the number of concepts and constructs a developer has to work with while interacting with Spark. At this point, you can use the 'spark' variable as your instance object to access its public methods and instances for the duration of your Spark job. Give the application a name.

appName = "PySpark ETL Example - via MS-SQL JDBC"
master = "local"
spark = SparkSession 
    .builder 
    .master(master) 
    .appName(appName) 
    .config("spark.driver.extraClassPath","/opt/spark/jars/mssql-jdbc-9.2.1.jre8.jar") 
    .getOrCreate()
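Optionally, a quick check confirms the session is up and that the JDBC driver jar was registered on the driver classpath as configured above:

# Optional check: Spark version and the extra driver classpath we configured.
print(spark.version)
print(spark.sparkContext.getConf().get("spark.driver.extraClassPath"))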

Read the configuration file

We have stored the parameters in the “config.ini” file to separate the static parameters from the Python code. This helps write cleaner code without any hardcoded values. The configparser module implements a basic configuration language that provides a structure similar to what we see in Microsoft Windows .ini files.

config = ConfigParser()
config.read('config.ini')

url = config.get('mssql', 'url')
user = config.get('mssql', 'user')
password = config.get('mssql', 'password')
dbtable = config.get('mssql', 'dbtable')
filename = config.get('mssql', 'filename')
ACCESS_KEY = config.get('aws', 'ACCESS_KEY')
SECRET_KEY = config.get('aws', 'SECRET_KEY')
BUCKET_NAME = config.get('aws', 'BUCKET_NAME')
DIRECTORY = config.get('aws', 'DIRECTORY')

Run data extraction

Spark includes a data source that can read data from other databases using JDBC. Run SQL on the remote database by connecting with the Microsoft SQL Server JDBC driver and the connection parameters. If you want to read a whole table, provide the table name through the “dbtable” option; if you want to run a SELECT query instead, pass it through the “query” option. The data returned by the query is stored in a Spark DataFrame.

jdbcDF = spark.read.format("jdbc") \
    .option("url", url) \
    .option("query", query1) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()
jdbcDF.show(5)
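As an optional sanity check, compare the number of rows pulled into the DataFrame with the count you saw when test-running the query directly on SQL Server, and inspect the schema Spark inferred through JDBC:

# Optional sanity checks on the extracted DataFrame.
print(jdbcDF.count())   # compare with the count from the test run on SQL Server
jdbcDF.printSchema()    # column names and types inferred via JDBC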

Save data frame as CSV file

The data frame can be stored on the server as a CSV file. This step is optional: if you want to write the data frame directly to the S3 bucket, it can be skipped. By default, PySpark creates multiple partitions; to avoid this, we save the output as a single file using coalesce(1). Then, we move the file to the designated output folder and remove the temporary output directory.

path="output"
jdbcDF.coalesce(1).write.option("header","true").option("sep",",").mode("overwrite").csv(path)
shutil.move(glob.glob(os.getcwd() + '/' + path + '/' + r '*. csv')[0], os.getcwd()+ '/' + filename )
shutil.rmtree(os.getcwd() + '/' + path)

Copy data frame to S3 bucket

First, create a boto3 session using the AWS access key and secret key values. Retrieve the S3 bucket name and the subdirectory where you want to upload the file. The upload_file() method accepts a file name, a bucket name, and an object name; it handles large files by splitting them into smaller chunks and uploading each chunk in parallel.

session = boto3.Session(
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
)
bucket_name=BUCKET_NAME
s3_output_key = DIRECTORY + '/' + filename
s3 = session.resource('s3')
# Filename - File to upload
# Bucket - Bucket to upload to (the top level directory under AWS S3)
# Key - S3 object name (can contain subdirectories). If not specified then file_name is used
s3.meta.client.upload_file(Filename=filename, Bucket=bucket_name, Key=s3_output_key)
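Optionally, you can verify that the object landed in the bucket before cleaning up. This is a minimal check using the same boto3 session; head_object raises an error if the key does not exist:

# Optional: confirm the uploaded object exists and print its size in bytes.
response = s3.meta.client.head_object(Bucket=bucket_name, Key=s3_output_key)
print(response['ContentLength'])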

Cleaning files

After uploading the file to the S3 bucket, delete the local copy left on the server; if the file is not found, print an error.

if os.path.isfile(filename):
    os.remove(filename)
else:
    print("Error: %s file not found" % filename)

Conclusion

Apache Spark is an open-source cluster-computing framework with in-memory processing capabilities. It was developed in the Scala programming language. Spark offers many features and capabilities that make it an efficient big data framework. Performance and speed are its main benefits: you can load terabytes of data and process it smoothly by setting up a multi-node cluster. This article gives an idea of how to write a Python-based ETL pipeline.

The media shown in this article is not the property of DataPeaker and is used at the author's discretion.
