Cassandra Sink for Spark Structured Streaming

A couple of months ago, I started exploring Spark, and at some point I was faced with the problem of saving Structured Streaming calculations in the Cassandra database.

In this post, I give a simple example of creating and using a Cassandra sink for Spark Structured Streaming. I hope the post will be useful to those who have recently started working with Spark Structured Streaming and are wondering how to save the results of their computations to a database.

The idea of the application is very simple: receive and parse messages from Kafka, perform simple transformations in Spark, and save the results to Cassandra.

Advantages of Structured Streaming


Structured Streaming is described in detail in the documentation. In short, Structured Streaming is a highly scalable stream processing engine built on the Spark SQL engine. It lets you use Datasets/DataFrames to aggregate data, compute window functions, perform joins, etc. In other words, Structured Streaming lets you use good old SQL to work with data streams.
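
For example, a windowed aggregation over a stream looks exactly like one over a static DataFrame. Below is a minimal sketch of my own (it uses the built-in rate source instead of the project's Kafka stream, and the names are made up):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

val spark = SparkSession.builder().appName("WindowedCountExample").getOrCreate()
import spark.implicits._

// The built-in "rate" source generates rows with `timestamp` and `value` columns.
val events = spark.readStream.format("rate").option("rowsPerSecond", "10").load()

// Count events per one-minute window, exactly as one would on a static DataFrame.
val counts = events
  .groupBy(window($"timestamp", "1 minute"))
  .count()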

What is the problem?


The first stable release of Spark Structured Streaming came out in 2017. That is, it is a fairly new API in which the basic functionality is implemented, but some things you have to build yourself. For example, Structured Streaming has built-in sinks for writing output to a file, Kafka, the console, or memory, but to save data to a database you have to use the foreach sink and implement the ForeachWriter interface. As of Spark 2.3.1, this can only be done in Scala and Java.
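
For comparison, the built-in sinks require no custom code at all. For example, sending a streaming DataFrame to the console is a one-liner (a sketch with a hypothetical streaming DataFrame df):

// Writing a hypothetical streaming DataFrame `df` to the console with the built-in sink.
// Append mode suits non-aggregated streams; aggregations would use "update" or "complete".
val consoleQuery = df.writeStream
  .outputMode("append")
  .format("console")
  .start()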

I assume that the reader already knows how Structured Streaming works in general terms and how to implement the necessary transformations, and is now ready to save the results to a database. If any of the steps above are unclear, the official documentation is a good starting point for learning Structured Streaming. In this article, I focus on the last step, when you need to save the results to a database.

Below, I describe an example implementation of a Cassandra sink for Structured Streaming and explain how to run it in a cluster. The full code is available here.

When I first ran into the problem above, this project proved to be very helpful. However, it may seem a bit tricky if the reader has just started working with Structured Streaming and is looking for a simple example of how to write data to Cassandra. In addition, that project is written for local mode and requires some changes to run in a cluster.

I also want to give examples of how to save data to MongoDB and to any other database accessed via JDBC (a rough JDBC sketch appears further below).

A simple solution


To write data to an external system, you have to use the foreach sink (you can read more about it here). In short, you need to implement the ForeachWriter interface: define how to open a connection, how to process each chunk of data, and how to close the connection when processing is finished. The source code is as follows:

import org.apache.spark.sql.ForeachWriter

class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] {
  // This class implements the ForeachWriter interface, whose methods get called
  // whenever there is a sequence of rows generated as output
  val cassandraDriver = new CassandraDriver();
  def open(partitionId: Long, version: Long): Boolean = {
    // open connection
    println(s"Open connection")
    true
  }
  def process(record: org.apache.spark.sql.Row) = {
    println(s"Process new $record")
    cassandraDriver.connector.withSessionDo(session =>
      session.execute(s"""
       insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} (fx_marker, timestamp_ms, timestamp_dt)
       values('${record(0)}', '${record(1)}', '${record(2)}')""")
    )
  }
  def close(errorOrNull: Throwable): Unit = {
    // close the connection
    println(s"Close connection")
  }
}

I will describe the definition of CassandraDriver and the structure of the output table later, but for now let's take a closer look at how the above code works. To connect to Cassandra from Spark, I create a CassandraDriver object that provides access to CassandraConnector, a connector developed by DataStax. CassandraConnector is responsible for opening and closing the connection to the database, so I just print debug messages in the open and close methods of the CassandraSinkForeach class.
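
By the way, the same open/process/close structure carries over almost unchanged to a database accessed via JDBC, except that there you manage the connection yourself. Here is a rough sketch of my own (not part of the project; the table name, column handling, and connection parameters are assumptions):

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}

class JdbcSinkForeach(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
  // One connection and prepared statement per partition.
  var connection: Connection = _
  var statement: PreparedStatement = _
  def open(partitionId: Long, version: Long): Boolean = {
    connection = DriverManager.getConnection(url, user, pwd)
    // The fx_sink table is hypothetical; here it stores all three fields as text.
    statement = connection.prepareStatement(
      "insert into fx_sink (fx_marker, timestamp_ms, timestamp_dt) values (?, ?, ?)")
    true
  }
  def process(record: Row): Unit = {
    statement.setString(1, record(0).toString)
    statement.setString(2, record(1).toString)
    statement.setString(3, record(2).toString)
    statement.executeUpdate()
  }
  def close(errorOrNull: Throwable): Unit = {
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}

Such a writer would be plugged in the same way, via .foreach(new JdbcSinkForeach(...)), and the corresponding JDBC driver would have to be available on the classpath.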

The CassandraSinkForeach class shown earlier is called from the main application as follows:

val sink = parsed
    .writeStream
    .queryName("KafkaToCassandraForeach")
    .outputMode("update")
    .foreach(new CassandraSinkForeach())
    .start()

CassandraSinkForeach runs on each worker node, so each worker inserts its own portion of the rows into the database. That is, each worker node executes val cassandraDriver = new CassandraDriver(). This is what CassandraDriver looks like:

import com.datastax.spark.connector.cql.CassandraConnector

class CassandraDriver extends SparkSessionBuilder {
  // This object will be used in CassandraSinkForeach to connect to Cassandra DB from an executor.
  // It extends SparkSessionBuilder so to use the same SparkSession on each node.
  val spark = buildSparkSession
  import spark.implicits._
  val connector = CassandraConnector(spark.sparkContext.getConf)
  // Define Cassandra's table which will be used as a sink
  /* For this app I used the following table:
       CREATE TABLE fx.spark_struct_stream_sink (
       fx_marker text,
       timestamp_ms timestamp,
       timestamp_dt date,
       primary key (fx_marker));
  */
  val namespace = "fx"
  val foreachTableSink = "spark_struct_stream_sink"
}
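
For completeness, the keyspace and table have to exist in Cassandra before the application starts. A minimal cqlsh sketch (the replication settings here are my assumption, adjust them to your cluster):

CREATE KEYSPACE IF NOT EXISTS fx
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE IF NOT EXISTS fx.spark_struct_stream_sink (
  fx_marker text,
  timestamp_ms timestamp,
  timestamp_dt date,
  primary key (fx_marker));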

Let's take a closer look at the spark object. The code for SparkSessionBuilder is as follows:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

class SparkSessionBuilder extends Serializable {
  // Build a spark session. Class is made serializable so to get access to SparkSession
  // in a driver and executors. Note here the usage of @transient lazy val
  def buildSparkSession: SparkSession = {
    @transient lazy val conf: SparkConf = new SparkConf()
    .setAppName("Structured Streaming from Kafka to Cassandra")
    .set("spark.cassandra.connection.host", "ec2-52-23-103-178.compute-1.amazonaws.com")
    .set("spark.sql.streaming.checkpointLocation", "checkpoint")
    @transient lazy val spark = SparkSession
    .builder()
    .config(conf)
    .getOrCreate()
    spark
  }
}

On each worker node, SparkSessionBuilder provides access to the SparkSession that was created on the driver. To make this possible, SparkSessionBuilder is made serializable and uses @transient lazy val, which lets the serialization mechanism skip the conf and spark objects until the moment they are first accessed. Thus, when the application is launched, buildSparkSession is serialized and sent to each worker node, but the conf and spark objects are resolved only when a worker node accesses them.
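
Here is a stripped-down illustration of the pattern (the class and field names are mine, not from the project):

import org.apache.spark.SparkConf

class ConfHolder extends Serializable {
  // Excluded from serialization; rebuilt lazily on whichever JVM first touches it.
  @transient lazy val conf: SparkConf =
    new SparkConf().setAppName("transient-lazy-val-illustration")
}

When a ConfHolder instance is shipped to an executor, the conf field is not sent over the wire; it is constructed on the executor the first time the code there reads it.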

Now let's look at the main application code:

import org.apache.spark.sql.functions.{from_json, from_unixtime, to_date}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object KafkaToCassandra extends SparkSessionBuilder {
  // Main body of the app. It also extends SparkSessionBuilder.
  def main(args: Array[String]) {
    val spark = buildSparkSession
    import spark.implicits._
    // Define location of Kafka brokers:
    val broker = "ec2-18-209-75-68.compute-1.amazonaws.com:9092,ec2-18-205-142-57.compute-1.amazonaws.com:9092,ec2-50-17-32-144.compute-1.amazonaws.com:9092"
    /* Here is an example message which I get from a Kafka stream. It contains multiple jsons separated by \n
    {"timestamp_ms": "1530305100936", "fx_marker": "EUR/GBP"}
    {"timestamp_ms": "1530305100815", "fx_marker": "USD/CHF"}
    {"timestamp_ms": "1530305100969", "fx_marker": "EUR/CHF"}
    {"timestamp_ms": "1530305100011", "fx_marker": "USD/CAD"}
    */
    // Read incoming stream
    val dfraw = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker)
    .option("subscribe", "currency_exchange")
    .load()
    val schema = StructType(
      Seq(
        StructField("fx_marker", StringType, false),
        StructField("timestamp_ms", StringType, false)
      )
    )
    val df = dfraw
    .selectExpr("CAST(value AS STRING)").as[String]
    .flatMap(_.split("\n"))
    val jsons = df.select(from_json($"value", schema) as "data").select("data.*")
    // Process data. Create a new date column
    val parsed = jsons
      .withColumn("timestamp_dt", to_date(from_unixtime($"timestamp_ms"/1000.0, "yyyy-MM-dd HH:mm:ss.SSS")))
      .filter("fx_marker != ''")
    // Output results into a database
    val sink = parsed
    .writeStream
    .queryName("KafkaToCassandraForeach")
    .outputMode("update")
    .foreach(new CassandraSinkForeach())
    .start()
    sink.awaitTermination()
  }
}

When the application is submitted for execution, buildSparkSession is serialized and sent to the worker nodes, but the conf and spark objects remain unresolved. The driver then creates a spark object inside KafkaToCassandra and distributes the work among the worker nodes. Each worker node reads data from Kafka and performs simple transformations on its portion of the records, and when a worker node is ready to write the results to the database, it resolves the conf and spark objects, thereby gaining access to the SparkSession created on the driver.

How to build and run the application?


When I switched from PySpark to Scala, it took me some time to figure out how to build the application, so I included a Maven pom.xml in the project. The reader can build the application with Maven by running mvn package. After that, the application can be submitted for execution with

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 --class com.insight.app.CassandraSink.KafkaToCassandra --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 target/cassandra-sink-0.0.1-SNAPSHOT.jar

To build and run the application, you need to replace the names of my AWS machines with your own (i.e., replace everything that looks like ec2-xx-xxx-xx-xx.compute-1.amazonaws.com).

Spark, and Structured Streaming in particular, is a new topic for me, so I will be very grateful to readers for comments, discussion, and corrections.
