George Jen
5 min read · Feb 2, 2020

Enrich JSON Records in Scala with and without Apache Spark

JSON (JavaScript Object Notation) is a lightweight data-interchange format. It is easy for humans to read and write. It is easy for machines to parse and generate.

One common JSON representation is a collection of name/value pairs. In various languages, this is realized as an object, record, struct, dictionary, hash table, keyed list, or associative array.

JSON is one of the major data representations for both humans and machines. Social media companies will stream data to you if you request it through their API; for example, Twitter can stream tweets to you in JSON.

However, JSON is just a text string that has to follow a given syntax for a machine to parse it. We humans can read a JSON string even if it is syntactically incorrect.

The use case I would like to demonstrate is adding information to a given JSON string to produce a new JSON string that contains the additional information. This is called enriching a JSON record.

Following is JSON record 1:

{
  "visitorId": "v1",
  "products": [{
    "id": "i1",
    "interest": 0.68
  }, {
    "id": "i2",
    "interest": 0.42
  }]
}

Following is JSON record 2:

{
  "visitorId": "v2",
  "products": [{
    "id": "i1",
    "interest": 0.78
  }, {
    "id": "i3",
    "interest": 0.11
  }]
}

Following is the dimension definition that says what the ids are:

"i1" is "Nike Shoes"

"i2" is "Umbrella"

"i3" is "Jeans"

Now define the enrichment task: using the dimension defined earlier, add a "name" attribute to each product in record 1, turning:

{
  "visitorId": "v1",
  "products": [{
    "id": "i1",
    "interest": 0.68
  }, {
    "id": "i2",
    "interest": 0.42
  }]
}

Into:

{
  "visitorId": "v1",
  "products": [{
    "id": "i1",
    "name": "Nike Shoes",
    "interest": 0.68
  }, {
    "id": "i2",
    "name": "Umbrella",
    "interest": 0.42
  }]
}

Also, enrich record 2 the same way:

{
  "visitorId": "v2",
  "products": [{
    "id": "i1",
    "interest": 0.78
  }, {
    "id": "i3",
    "interest": 0.11
  }]
}

Into:

{
  "visitorId": "v2",
  "products": [{
    "id": "i1",
    "name": "Nike Shoes",
    "interest": 0.78
  }, {
    "id": "i3",
    "name": "Jeans",
    "interest": 0.11
  }]
}
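Stripped of JSON parsing, the enrichment task described above is a lookup-and-copy over each record's product list. Here is a minimal sketch of that logic on plain Scala values. The names EnrichSketch, Product, Visit, EnrichedProduct, EnrichedVisit and enrich are hypothetical, chosen only for illustration, and the "Invalid Product" placeholder for unknown ids is an assumption about how a missing id should be handled.

```scala
// A sketch of the enrichment logic on plain Scala values (no JSON parsing).
// All type and method names here are hypothetical, for illustration only.
object EnrichSketch {
  final case class Product(id: String, interest: Double)
  final case class Visit(visitorId: String, products: List[Product])
  final case class EnrichedProduct(id: String, name: String, interest: Double)
  final case class EnrichedVisit(visitorId: String, products: List[EnrichedProduct])

  // The dimension: product id -> product name
  val productNames: Map[String, String] =
    Map("i1" -> "Nike Shoes", "i2" -> "Umbrella", "i3" -> "Jeans")

  // Copy every product and attach its name.
  // Unknown ids get a placeholder name (an assumption, not part of the task definition).
  def enrich(visit: Visit, names: Map[String, String]): EnrichedVisit =
    EnrichedVisit(
      visit.visitorId,
      visit.products.map { p =>
        EnrichedProduct(p.id, names.getOrElse(p.id, "Invalid Product"), p.interest)
      }
    )

  def main(args: Array[String]): Unit = {
    val record1 = Visit("v1", List(Product("i1", 0.68), Product("i2", 0.42)))
    println(enrich(record1, productNames))
  }
}
```

Whatever JSON library is used, the remaining work is converting the JSON string to values like these and back again.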

I am going to use Scala for the JSON enrichment task. Why Scala? Two reasons:

1. Scala has a wealth of JSON libraries that can parse JSON strings and extract values from them easily.

2. Apache Spark has its own library and methods to read and parse JSON through Spark SQL.

I will use both approaches to accomplish the JSON enrichment task defined earlier.

1. An approach that does not use Apache Spark

2. An approach that uses Apache Spark SQL

Here is the non-Spark approach, which imports and uses the json4s library, running on the Jupyter-Scala kernel in a Jupyter notebook:

import $ivy.`org.json4s::json4s-jackson:3.7.0-M2`

import org.json4s._

import org.json4s.jackson.JsonMethods._

import org.json4s.DefaultFormats

import org.json4s.jackson.Serialization

import org.json4s.jackson.Serialization.write

implicit val formats = org.json4s.DefaultFormats

// Define original record 1 and record 2
val rec1: String = """{
  "visitorId": "v1",
  "products": [{
    "id": "i1",
    "interest": 0.68
  }, {
    "id": "i2",
    "interest": 0.42
  }]
}"""

val rec2: String = """{
  "visitorId": "v2",
  "products": [{
    "id": "i1",
    "interest": 0.78
  }, {
    "id": "i3",
    "interest": 0.11
  }]
}"""

val visitsData: Seq[String] = Seq(rec1, rec2)

for (i <- 0 until visitsData.size) {
  println(visitsData(i))
  println(" ")
}

// Define the dimension as a key/value map
val productIdToNameMap = Map("i1" -> "Nike Shoes", "i2" -> "Umbrella", "i3" -> "Jeans")

// Define case classes that match the structure of the original JSON record.
// They are needed later to extract the JSON attributes.
case class v_rec(
  id: String,
  interest: Double
)

case class p_rec(
  visitorId: String, products: Array[v_rec]
)

// Define new case classes that match the enriched JSON record, which has an added name field.
// They are needed to serialize the new, enriched JSON string.
case class v_rec_new(
  id: String,
  name: String,
  interest: Double
)

case class p_rec_new(
  visitorId: String, products: Array[v_rec_new]
)

// Parse and extract each original JSON record, then generate the new enriched JSON string
val enrichedJson: Seq[String] = visitsData.map { js =>
  val rec = parse(js).extract[p_rec]
  val enrichedProducts = rec.products.map { p =>
    // If there is no such product in the map, show Invalid Product
    val prodName = productIdToNameMap.getOrElse(p.id, "Invalid Product")
    v_rec_new(p.id, prodName, p.interest)
  }
  // Serialize the enriched record back to a JSON string
  write(p_rec_new(visitorId = rec.visitorId, products = enrichedProducts))
}

// Finally, print the new enriched JSON strings out
enrichedJson.foreach(println)

Running the Scala code above produces the following output:

{"visitorId":"v1","products":[{"id":"i1","name":"Nike Shoes","interest":0.68},{"id":"i2","name":"Umbrella","interest":0.42}]}

{"visitorId":"v2","products":[{"id":"i1","name":"Nike Shoes","interest":0.78},{"id":"i3","name":"Jeans","interest":0.11}]}

Next, I will demonstrate how to do the same enrichment on the same JSON records with Apache Spark SQL, using the spylon kernel in a Jupyter notebook.

import org.apache.spark._

import org.apache.spark.SparkContext._

import org.apache.spark.rdd._

import org.apache.spark.util.LongAccumulator

import org.apache.log4j._

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql._

import org.apache.spark.sql.functions._

// Original JSON record 1
val rec1: String = """{
  "visitorId": "v1",
  "products": [{
    "id": "i1",
    "interest": 0.68
  }, {
    "id": "i2",
    "interest": 0.42
  }]
}"""

// Original JSON record 2
val rec2: String = """{
  "visitorId": "v2",
  "products": [{
    "id": "i1",
    "interest": 0.78
  }, {
    "id": "i3",
    "interest": 0.11
  }]
}"""

val visitsData: Seq[String] = Seq(rec1, rec2)

// Define the product id to name key/value map
val productIdToNameMap = Map("i1" -> "Nike Shoes", "i2" -> "Umbrella", "i3" -> "Jeans")

/*
Now the solution starts
*/
import spark.implicits._
import spark.sql

productIdToNameMap.toSeq.toDF("id", "name").createOrReplaceTempView("prodRec")

for (i <- visitsData) {
  println("Original Json String is: \n")
  println(i)
  println("\n")

  var rec = spark.read.json(Seq(i).toDS)
  rec.createOrReplaceTempView("dfVisitsTable")

  val productsArr = sql("SELECT products FROM dfVisitsTable").withColumn("products", explode($"products")).select("products.*")
  productsArr.createOrReplaceTempView("productsArr")

  // Need to do an outer join in case the product id in the record is not valid; if the product id is not found in the map,
  // return invalid product
  val enrichedProducts = sql("select a.id, if (b.name is not null, b.name, 'invalid product') name, a.interest from productsArr a full outer join prodRec b on a.id=b.id")
  val enrichedRecord = rec.select("VisitorId").join(enrichedProducts)
  enrichedRecord.createOrReplaceTempView("enrichedRec")
  val enrichedJson = sql("select visitorId, collect_list(struct(id, name, interest)) products from enrichedRec group by visitorId").toJSON.collect.mkString("", ",", "")

  println("Enriched Json String is:\n")
  println(enrichedJson)
  println(" ")
  println(" ")
}

Executing the Spark Scala code above produces the following output:

Original Json String is:

{
  "visitorId": "v1",
  "products": [{
    "id": "i1",
    "interest": 0.68
  }, {
    "id": "i2",
    "interest": 0.42
  }]
}

Enriched Json String is:

{"visitorId":"v1","products":[{"name":"Jeans"},{"id":"i1","name":"Nike Shoes","interest":0.68},{"id":"i2","name":"Umbrella","interest":0.42}]}

Original Json String is:

{
  "visitorId": "v2",
  "products": [{
    "id": "i1",
    "interest": 0.78
  }, {
    "id": "i3",
    "interest": 0.11
  }]
}

Enriched Json String is:

{"visitorId":"v2","products":[{"id":"i3","name":"Jeans","interest":0.11},{"id":"i1","name":"Nike Shoes","interest":0.78},{"name":"Umbrella"}]}

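Both approaches, Scala only without Spark and Scala with Spark, enrich each product with its name. The outputs are not quite identical, though. Because the Spark version uses a full outer join, every product in the dimension that does not appear in the record shows up as an extra entry carrying only a name (for example {"name":"Jeans"} for visitor v1), and the order of the products is not guaranteed. Replacing the full outer join with a left outer join keeps only the products present in the record, which should bring the Spark output in line with the non-Spark output. Which method would I recommend? In a production environment with a large number of JSON records to process, think millions or billions, Apache Spark is the way to go.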
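The name-only entries in the Spark output above ({"name":"Jeans"} for v1 and {"name":"Umbrella"} for v2) come from the join type. As a rough illustration, the sketch below imitates a left outer join and a full outer join on in-memory Scala collections. It is not Spark code, and JoinSketch, Row, leftOuter and fullOuter are hypothetical names used only here.

```scala
// Simulates the two join types on in-memory collections (not Spark code).
// JoinSketch, Row, leftOuter and fullOuter are hypothetical names for illustration.
object JoinSketch {
  // One output row; id and interest are None when the row comes only from the dimension
  final case class Row(id: Option[String], name: String, interest: Option[Double])

  val dimension: Map[String, String] =
    Map("i1" -> "Nike Shoes", "i2" -> "Umbrella", "i3" -> "Jeans")

  // Left outer join: exactly one row per product in the record, matched or not
  def leftOuter(products: Seq[(String, Double)]): Seq[Row] =
    products.map { case (id, interest) =>
      Row(Some(id), dimension.getOrElse(id, "invalid product"), Some(interest))
    }

  // Full outer join: the left outer rows plus one row per unmatched dimension entry
  def fullOuter(products: Seq[(String, Double)]): Seq[Row] = {
    val seen = products.map(_._1).toSet
    val unmatched = dimension.toSeq.collect {
      case (id, name) if !seen(id) => Row(None, name, None)
    }
    leftOuter(products) ++ unmatched
  }
}
```

For record 1, leftOuter yields two rows while fullOuter yields three, the third being a row for "Jeans" with no id and no interest. When such a row is serialized with its empty fields omitted, it looks like the name-only entry in the Spark output.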

At runtime, a Scala program is much like a Java program: both are compiled into Java bytecode and run on the JVM. A plain Scala program runs on a single machine and has no parallelism unless you code it that way. Writing a Scala program with Apache Spark takes advantage of Spark's distributed computing framework and the rich Spark SQL library, so that a task such as JSON enrichment becomes a few SQL queries that Apache Spark executes in parallel across its worker nodes.
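To illustrate what coding the parallelism yourself can look like in plain Scala, here is a small sketch using Futures from the standard library. ParallelSketch, namesFor and namesForAll are hypothetical names, and the name lookup stands in for the full enrichment of a record. This only spreads work across threads in one JVM; it does not distribute work across machines the way Spark does.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// ParallelSketch, namesFor and namesForAll are hypothetical names for illustration.
object ParallelSketch {
  val productNames: Map[String, String] =
    Map("i1" -> "Nike Shoes", "i2" -> "Umbrella", "i3" -> "Jeans")

  // Stand-in for enriching one record: look up the name for each product id
  def namesFor(productIds: Seq[String]): Seq[String] =
    productIds.map(id => productNames.getOrElse(id, "Invalid Product"))

  // Run the lookup for every record as its own Future on the global thread pool,
  // then block until all of them finish (the 10 second timeout is arbitrary)
  def namesForAll(records: Seq[Seq[String]]): Seq[Seq[String]] = {
    val all: Future[Seq[Seq[String]]] =
      Future.traverse(records)(ids => Future(namesFor(ids)))
    Await.result(all, 10.seconds)
  }
}
```

Future.traverse returns the results in the same order as the input records, so the caller does not need to re-sort them.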

The code used in this article is available on my GitHub site:

https://github.com/geyungjen/jentekllc

Written by George Jen

I am founder of Jen Tek LLC, a startup company in East Bay California developing AI powered, cloud based documentation/publishing software as a service.
