Tutorial: Integrate Spark SQL and Cassandra, Complete with Scala and Python Example Code
George Jen, Jen Tek LLC
Summary
This tutorial covers integration between Spark SQL and Cassandra, with Scala and Python code that runs Spark SQL against tables in the Cassandra NoSQL database. I hope it provides value to those who are new to integrating Spark with Cassandra and want end-to-end working examples for a quick start on their Spark/Cassandra applications.
Note: In the GitHub README of Spark-Cassandra-Connector, there are some examples of running Scala commands through interactive spark-shell and spark-submit, but with required options such as --packages and --conf.
What I demonstrate differently in this writing is a standalone Scala program that does not need external command line options like --packages and --conf to run under spark-submit or spark-shell. Only the main class is needed to run it with spark-submit. That, in my opinion, is a more helpful example for others to leverage quickly, as applications should be self-contained and self-sufficient without external command line arguments.
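For illustration, here is a minimal sketch (not the full program of this tutorial, which comes later) of what "self-contained" means: connector settings that would otherwise be passed as --conf options on the spark-submit command line are set on the SparkSession builder in code. The app name here is a placeholder, and this assumes the connector jar is already on Spark's classpath.

import org.apache.spark.sql.SparkSession

object SelfContainedApp {
  def main(args: Array[String]): Unit = {
    // Settings that would otherwise be --conf options on the
    // spark-submit command line are embedded in the application.
    val spark = SparkSession.builder
      .appName("SelfContainedApp") // hypothetical app name
      .master("local[*]")
      .config("spark.cassandra.connection.host", "localhost")
      .config("spark.sql.extensions",
        "com.datastax.spark.connector.CassandraSparkExtensions")
      .getOrCreate()
    // ... application logic goes here ...
    spark.stop()
  }
}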
I hope you find this writing helpful; thank you for your time.
I am a frequent data science/data engineering content contributor on medium.com; all of my writings there can be viewed at
https://medium.com/@jentekllc8888
Items referenced
· Apache Cassandra
· Apache Spark SQL
· Datastax Spark-Cassandra-Connector for Scala
· Pyspark-Cassandra-Connector (for Python), ported from Spark-Cassandra-Connector for Scala
· sbt, the Scala build tool
Introduction
Apache Cassandra is an open-source, distributed, wide-column store, NoSQL database management system designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure and linear scalability.
Cassandra offers robust support for clusters spanning multiple datacenters, with asynchronous masterless replication allowing low latency operations for all clients.
While Cassandra is a NoSQL DBMS, it offers something other NoSQL databases do not: a SQL-like query language. Strictly speaking, then, Cassandra resembles a SQL database: it has schemas called keyspaces, with tables and columns, and a SQL-like query language called CQL. However, Cassandra is NOT an RDBMS; you do not model schema objects on Cassandra the way you do on a typical RDBMS. In simple terms, tables in Cassandra should not be normalized to third normal form; they are more likely to be in second normal form.
Limitations of Cassandra CQL
1. You cannot perform join operations. That is why tables on Cassandra need to be denormalized, meaning every column you want to reference in a query is in a single table.
2. There is no support for window functions.
What if you really want to run joins and window functions on Cassandra tables? Logically, nothing should prevent you from running SQL queries (NOT CQL queries) against them. You can accomplish this by combining Apache Cassandra and Apache Spark SQL, as sketched below; this writing is going to show you how.
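To make the idea concrete before diving in, below is a minimal sketch of a join, which CQL cannot express, run through Spark SQL instead. The tables history.sales.orders and history.sales.customers (and their columns) are hypothetical placeholders; the catalog configuration and a complete working program are covered later in this tutorial.

import org.apache.spark.sql.SparkSession

object JoinSketch {
  def main(args: Array[String]): Unit = {
    // Assumes the spark-cassandra-connector jar is on Spark's classpath
    // and Cassandra is running on localhost.
    val spark = SparkSession.builder
      .appName("JoinSketch")
      .master("local[*]")
      .config("spark.sql.catalog.history",
        "com.datastax.spark.connector.datasource.CassandraCatalog")
      .config("spark.cassandra.connection.host", "localhost")
      .getOrCreate()

    // A join between two (hypothetical) Cassandra tables; CQL has no JOIN.
    spark.sql(
      """SELECT o.order_id, c.name, o.value
        |FROM history.sales.orders o
        |JOIN history.sales.customers c ON o.customer_key = c.key""".stripMargin
    ).show(false)

    spark.stop()
  }
}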
Cassandra Setup
To start with, if there is no Cassandra running, get one from the Apache Cassandra download page.
Specifically, download the beta of version 4. Why is that? cqlsh, the Cassandra client, is a Python script, and version 4 is the first to support Python 3; versions below 4 require Python 2.7.
Cassandra is a Java application, so it requires a JDK; this assumes you already have a JDK set up. I run Cassandra on macOS because I already have Apache Spark running on the same machine, but it would work the same on any supported Linux.
At the time of this writing, I downloaded it in a macOS terminal:
wget http://mirror.cogentco.com/pub/apache/cassandra/4.0-beta2/apache-cassandra-4.0-beta2-bin.tar.gz
tar -xvzf apache-cassandra-4.0-beta2-bin.tar.gz
Add the lines below to .bash_profile:
export CASSANDRA_HOME=/Users/user/cassandra/apache-cassandra-4.0-beta2
export PATH=$CASSANDRA_HOME/bin:$PATH
Log out and log back in to update the environment variables
Start Cassandra
To start Cassandra, simply enter on the command line:
cassandra
It will fork a background Java process:
501 33692 1 0 2:48PM ttys005 0:34.51 /Library/Java/JavaVirtualMachines/jdk1.8.0_202.jdk/Contents/Home/bin/java -ea -da:net.openhft... -…5.jar:/Users/user/cassandra/apache-cassandra-4.0-beta2/lib/jsr223/*/*.jar: org.apache.cassandra.service.CassandraDaemon
Start the client
Just enter:
cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 4.0-beta2 | CQL spec 3.4.5 | Native protocol v4]
Use HELP for help.
cqlsh>
Now you are ready to enter CQL queries.
By the way, cqlsh is a Python script:
501 33694 25689 0 2:48PM ttys005 0:01.24 python /Users/user/cassandra/apache-cassandra-4.0-beta2/bin/cqlsh.py
Leave this window open. Open another terminal; let's call it terminal 2.
Integrate with Spark SQL
Integrating Cassandra with Spark can be accomplished through a Cassandra Spark connector.
Tutorial agenda
1. Integrate Cassandra with Spark SQL in Scala through spark-cassandra-connector by Datastax
2. Integrate Cassandra with Spark SQL in Python through pyspark-cassandra-connector, ported from the original spark-cassandra-connector by Datastax.
Integrate Cassandra with Spark SQL in Scala through spark-cassandra-connector by Datastax
Create and deploy spark-cassandra-connector
In terminal 2:
git clone the spark-cassandra-connector from GitHub:
git clone https://github.com/datastax/spark-cassandra-connector.git
Build spark-cassandra-connector jar file
cd into the spark-cassandra-connector folder created by git clone and run the commands below:
cd spark-cassandra-connector
sbt/sbt clean
sbt/sbt assembly
Running "sbt/sbt assembly" creates an assembly jar file (a fat jar):
target/scala-2.12/spark-cassandra-connector-assembly-3.0.0-beta-11-g19cc8c06.jar
Deploy the spark-cassandra-connector jar file to Spark's classpath
cp target/scala-2.12/spark-cassandra-connector-assembly-3.0.0-beta-11-g19cc8c06.jar $SPARK_HOME/jars/
Write and build Scala code to integrate Spark and Cassandra
To build the Scala code, you need to use sbt. If you do not have sbt installed, you can download it from
https://www.scala-sbt.org/download.html
Note: how to build with sbt is not in the scope of this writing; there are plenty of sbt tutorials on the internet.
To start with sbt, create the project directory tree below.
In this writing, I create a folder called cassandra_project:
mkdir cassandra_project
cd cassandra_project
Then create the src/main/scala subfolder under cassandra_project, where the Scala code resides:
mkdir -p src/main/scala
Additionally, you need to create a sub-folder lib under the cassandra_project root:
cd ~/cassandra_project
mkdir lib
Copy spark-cassandra-connector-assembly-3.0.0-beta-11-g19cc8c06.jar, the spark-cassandra-connector jar file, into the lib folder:
cd ~/cassandra_project/lib
cp $SPARK_HOME/jars/spark-cassandra-connector-assembly-3.0.0-beta-11-g19cc8c06.jar .
Go back to the root of the project folder, cassandra_project:
cd ~/cassandra_project
Create a text file called build.sbt with the content below.
vi build.sbt, add the lines below, then save and exit:
name := "CassandraSparkScalaExample"
version := "1.0"
val sparkVersion = "3.0.0-preview2"
val appDependencies = Seq(
  "com.datastax.spark" %% "spark-cassandra-connector" % "3.0.0-beta"
)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion
)
libraryDependencies += "log4j" % "log4j" % "1.2.14"
// https://mvnrepository.com/artifact/com.github.jnr/jnr-posix
libraryDependencies += "com.github.jnr" % "jnr-posix" % "3.0.61"
// https://mvnrepository.com/artifact/joda-time/joda-time
libraryDependencies += "joda-time" % "joda-time" % "2.10.6"
Now go to src/main/scala and create the following Scala code:
cd src/main/scala
vi cassandra_connect_test.scala
Add the lines below, save, and exit vi:
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark._
import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object cassandraConnect {
  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("com").setLevel(Level.ERROR)
    val conf = new SparkConf()
    conf.set("spark.master", "local[*]")
    conf.set("spark.app.name", "exampleApp")
    val sc = new SparkContext(conf)
    val spark = SparkSession
      .builder
      .appName("SQL example")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "")
      .config("spark.sql.catalog.history", "com.datastax.spark.connector.datasource.CassandraCatalog")
      .config("spark.cassandra.connection.host", "localhost")
      .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
      .getOrCreate()

    // Create keyspace sales if it does not already exist
    spark.sql("CREATE DATABASE IF NOT EXISTS history.sales WITH DBPROPERTIES (class='SimpleStrategy',replication_factor='1')")
    // Create table salesfact if it does not already exist
    spark.sql("CREATE TABLE IF NOT EXISTS history.sales.salesfact (key Int, sale_date TIMESTAMP, product STRING, value DOUBLE) USING cassandra PARTITIONED BY (key)")
    // List all keyspaces
    spark.sql("SHOW NAMESPACES FROM history").show(false)
    // List tables under keyspace sales
    spark.sql("SHOW TABLES FROM history.sales").show(false)
    // Create some sales records and write them into Cassandra table sales.salesfact
    spark.createDataFrame(Seq(
        (0, "2020-09-06 10:00:00", "TV", "200.00"),
        (1, "2020-09-06 11:00:00", "Laptop", "500.00")))
      .toDF("key", "sale_date", "product", "value")
      .rdd
      .saveToCassandra("sales", "salesfact", SomeColumns("key", "sale_date", "product", "value"))
    // Query data from Cassandra with Spark SQL, using a window function that is not available in CQL
    spark.sql("SELECT product, sum(value) over (partition by product) total_sales FROM history.sales.salesfact").show(false)
    sc.stop()
  }
}
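As an aside, besides the catalog-based SQL used above, the connector also exposes a DataFrame source. Below is a minimal sketch that could be dropped into the main method above, reading the same sales.salesfact table; the temporary view name is an arbitrary choice of mine.

// Alternative read path through the connector's DataFrame source,
// assuming the same SparkSession and the sales.salesfact table above.
val salesDF = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "sales", "table" -> "salesfact"))
  .load()
salesDF.createOrReplaceTempView("salesfact_view") // arbitrary view name
spark.sql("SELECT product, value FROM salesfact_view").show(false)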
Directory sub-tree of cassandra_project, the sbt project folder:
tree -a
.
|-- build.sbt
|-- lib
| `-- spark-cassandra-connector-assembly-3.0.0-beta-11-g19cc8c06.jar
`-- src
`-- main
`-- scala
`-- cassandra_connect_test.scala
Build it:
cd ~/cassandra_project
#remove any prior builds
sbt clean
#Compile into Java byte code
sbt compile
#Run it
sbt run
[info] welcome to sbt 1.3.12 (Oracle Corporation Java 1.8.0_202)
[info] loading project definition from /Users/user/git/scala_project/cassandra_project/project
[info] loading settings for project cassandra_project from build.sbt ...
[info] set current project to CassandraSparkScalaExample (in build file:/Users/user/git/scala_project/cassandra_project/)
[warn] There may be incompatibilities among your library dependencies; run 'evicted' to see detailed eviction warnings.
[info] Compiling 1 Scala source to /Users/user/git/scala_project/cassandra_project/target/scala-2.12/classes ...
[info] running cassandraConnect
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
+---------------------+
|namespace |
+---------------------+
|system |
|sales |
|system_schema |
|system_distributed |
|system_auth |
|test |
|system_virtual_schema|
|system_views |
|ks |
|system_traces |
|testks |
+---------------------+

+---------+---------+
|namespace|tableName|
+---------+---------+
|sales |salesfact|
+---------+---------+

+-------+-----------+
|product|total_sales|
+-------+-----------+
|TV |200.0 |
|Laptop |500.0 |
+-------+-----------+
[success] Total time: 42 s, completed Sep 6, 2020 4:13:28 PM
To create a jar file that can be run by spark-submit, run
sbt package
Running "sbt package" creates a jar file:
target/scala-2.12/cassandrasparkscalaexample_2.12-1.0.jar
This jar file can be run by spark-submit. Note that in the above Scala code, the object name is cassandraConnect, which is needed as the --class option of spark-submit:
spark-submit --class cassandraConnect target/scala-2.12/cassandrasparkscalaexample_2.12-1.0.jar
+---------------------+
|namespace |
+---------------------+
|system_auth |
|test |
|system_views |
|system_virtual_schema|
|testks |
|system_distributed |
|system |
|ks |
|system_schema |
|system_traces |
|sales |
+---------------------+

+---------+---------+
|namespace|tableName|
+---------+---------+
|sales |salesfact|
+---------+---------+

+-------+-----------+
|product|total_sales|
+-------+-----------+
|TV |200.0 |
|Laptop |500.0 |
+-------+-----------+
Integrate Cassandra with Spark SQL in Python through pyspark-cassandra-connector, ported from the original spark-cassandra-connector by Datastax
Create and deploy pyspark-cassandra-connector
Open a 3rd terminal, called terminal 3:
Clone pyspark-cassandra, which is a port of Datastax's Spark-Cassandra-Connector:
git clone https://github.com/anguenot/pyspark-cassandra.git
cd to the folder pyspark-cassandra created by git clone.
There are 2 tasks:
1. Build the pyspark-cassandra jar file by running sbt
2. Install Python library pyspark_cassandra
Build the pyspark-cassandra jar file by running sbt
cd pyspark-cassandra
sbt clean
sbt compile
sbt spPublishLocal
Running "sbt spPublishLocal" creates a jar file:
target/scala-2.11/pyspark-cassandra_2.11-2.4.0.jar
Simply copy target/scala-2.11/pyspark-cassandra_2.11-2.4.0.jar to $SPARK_HOME/jars/
cp target/scala-2.11/pyspark-cassandra_2.11-2.4.0.jar $SPARK_HOME/jars/
Install Python library pyspark_cassandra
cd to the sub-folder python and run setup.py:
cd python
python setup.py install
Running setup.py installs the pyspark_cassandra library, which can then be imported in Python. Note that at the time of this writing, there is no other method to install pyspark_cassandra; for example, there is no pip install.
Test it; it should work:
python -c "import pyspark_cassandra"
Below is the Python code cassandra_spark.py that runs Spark SQL queries against a table in Cassandra. Note that you will need the findspark module if you have not already installed it. You can simply install it with:
sudo pip install findspark
Write Python code to run Spark SQL query against table on Cassandra
vi cassandra_spark.py
Enter the lines below, save, and exit:
import findspark
findspark.init()
import pyspark_cassandra
import pyspark
from pyspark import *
from pyspark.sql import *
from pyspark.sql.functions import *

conf = SparkConf()
conf.set("spark.master", "local[*]")
conf.set("spark.app.name", "exampleApp")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")

spark = SparkSession.builder.appName("SQL Example").master("local[*]")\
    .config("spark.sql.catalog.history", "com.datastax.spark.connector.datasource.CassandraCatalog")\
    .config("spark.cassandra.connection.host", "localhost")\
    .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")\
    .getOrCreate()

#Create keyspace if not exists
spark.sql("CREATE DATABASE IF NOT EXISTS history.sales WITH DBPROPERTIES (class='SimpleStrategy',replication_factor='1')")
#Create table if not exists
spark.sql("CREATE TABLE IF NOT EXISTS history.sales.salesfact (key Int, sale_date TIMESTAMP, product STRING, value DOUBLE) USING cassandra PARTITIONED BY (key)")
#Show all keyspaces
spark.sql("SHOW NAMESPACES FROM history").show(truncate=False)
#Show tables under keyspace sales
spark.sql("SHOW TABLES FROM history.sales").show(truncate=False)
#Run window function in Spark SQL against Cassandra table sales.salesfact
spark.sql("SELECT product, sum(value) over (partition by product) total_sales_by_product FROM history.sales.salesfact").show(truncate=False)
sc.stop()
quit()
Run it.
python cassandra_spark.py
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
+---------------------+
|namespace |
+---------------------+
|sales |
|system_views |
|system |
|ks |
|testks |
|system_distributed |
|system_schema |
|system_traces |
|system_auth |
|test |
|system_virtual_schema|
+---------------------+

+---------+---------+
|namespace|tableName|
+---------+---------+
|sales |salesfact|
+---------+---------+

+-------+----------------------+
|product|total_sales_by_product|
+-------+----------------------+
|TV |200.0 |
|Laptop |500.0 |
+-------+----------------------+
Return to the first terminal, where the Cassandra client cqlsh was started, and query the table created by Spark SQL on Cassandra:
cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 4.0-beta2 | CQL spec 3.4.5 | Native protocol v4]
Use HELP for help.
cqlsh> select * from sales.salesfact;

 key | product | sale_date                       | value
-----+---------+---------------------------------+-------
1 | Laptop | 2020-09-06 18:00:00.000000+0000 | 500
0 | TV | 2020-09-06 17:00:00.000000+0000 | 200

(2 rows)

Note that the sale_date values differ from the inserted times (10:00 and 11:00) because they were written in the session's local timezone (UTC-7 here), while Cassandra stores timestamps in UTC and cqlsh displays them with a +0000 offset.