It's been a while since my previous article in the Spark/Scala series, where we ran Spark locally using Docker. And even before that, we tested Spark locally with Scalatest.
Most people probably associate Spark with processing big batches of data: massive daily jobs churning through terabytes. Very often, though, data actually enters our systems continuously - it naturally streams in. To process it in batches, you have to aggregate it somewhere, delay processing by at least that aggregation window, and only then make use of it. Sounds quite inefficient? Well, Spark Streaming comes to the rescue.
While not true "streaming" (like Flink, for instance), Spark Streaming breaks continuous real-time data streams into small batches, which can then be processed in parallel by Spark's processing engine. As an additional benefit, Spark Streaming can read from and write to many different streaming data sources - and one of the most popular is Kafka.
In this article I won't dedicate too much time to Spark Streaming per se - instead I'll concentrate on testing an existing streaming application locally, without the need to connect to an existing Kafka cluster or spin one up in Docker. We'll use the EmbeddedKafka library for that purpose, which allows us to create quick and lightweight unit tests.
Setting up dependencies
What we need first of all is, of course, Spark and Spark Streaming. Let's add them to our build.sbt like this:
libraryDependencies += "org.apache.spark" % "spark-core_2.12" % "3.3.0"
libraryDependencies += "org.apache.spark" % "spark-sql_2.12" % "3.3.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "3.3.0"
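Note that the test below reads from Kafka through Structured Streaming's kafka format, which lives in a separate connector artifact; if it isn't already pulled in elsewhere in your build, you'll most likely also need something along these lines:

// Kafka connector for Structured Streaming (needed for .format("kafka"))
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.0"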
And to test our code, as I've already mentioned, we'll be using Scalatest and EmbeddedKafka:
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.15" % Test
libraryDependencies += "io.github.embeddedkafka" %% "embedded-kafka" % "3.4.0" % Test
(By the way, if scopes like "Test" or the difference between % and %% confuse you, feel free to leave a comment - I might dedicate a separate post to that.)
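In a nutshell, %% simply appends the Scala binary version to the artifact name for you, so for a Scala 2.12 build the following two declarations resolve to the same artifact:

// %% appends the Scala binary version suffix ("_2.12") automatically
libraryDependencies += "org.apache.spark" %  "spark-core_2.12" % "3.3.0"
libraryDependencies += "org.apache.spark" %% "spark-core"      % "3.3.0"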
Utility class for tests
Let's not jump into the tests just yet - first we'll prepare some foundation for them. We'll create a trait that sets up (and tears down) all the required infrastructure for the tests themselves, so we don't have to repeat it every time.
import java.io.File
import java.nio.file.Files

import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterEach, Suite}

trait SparkStreamingTester
    extends Suite
    with BeforeAndAfterEach {

  // local Spark session shared by the tests mixing in this trait
  val sparkSession: SparkSession =
    SparkSession
      .builder()
      .master("local")
      .appName("test spark app")
      .config("spark.ui.enabled", "false")
      .config("spark.sql.shuffle.partitions", "1")
      .getOrCreate()

  // checkpoint directory for the streaming state, cleaned up on JVM exit
  val checkpointDir: File = Files.createTempDirectory("stream_checkpoint").toFile
  FileUtils.forceDeleteOnExit(checkpointDir)
  sparkSession.sparkContext.setCheckpointDir(checkpointDir.toString)

  // implicit config picked up by EmbeddedKafka helpers in the tests
  implicit val kafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
    kafkaPort = 9092,
    zooKeeperPort = 2181,
    customBrokerProperties = Map("auto.create.topics.enable" -> "true")
  )

  // start a fresh embedded Kafka broker before each test, stop it afterwards
  override def beforeEach(): Unit = {
    EmbeddedKafka.start()(kafkaConfig)
  }

  override def afterEach(): Unit = {
    EmbeddedKafka.stop()
  }
}
Now, there's a lot going on here - let's unpack it piece by piece. Our trait extends two traits from ScalaTest, which give us beforeEach and afterEach - a couple of hooks we use to start EmbeddedKafka before each test and stop it afterwards. We also need a kafkaConfig for this instance to run, and we make it implicit - so that tests extending SparkStreamingTester get access to it and don't have to pass it explicitly to Kafka-related methods.
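To illustrate with the publishing helper we'll use later in the test: because the config is in implicit scope, the first call below compiles without mentioning it, and is conceptually equivalent to passing it by hand:

// the implicit kafkaConfig is resolved automatically...
EmbeddedKafka.publishStringMessageToKafka("some_topic", "hello")
// ...instead of having to spell it out every time:
EmbeddedKafka.publishStringMessageToKafka("some_topic", "hello")(kafkaConfig)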
Setting up the Spark session should look quite familiar as well - we're just creating the Spark application that we'll actually be testing.
A slightly more unusual bit is checkpointDir. This directory is only needed for streaming applications. A Spark application can be stopped, redeployed and restarted at any point of its life, while the stream continues to exist and still has messages in it. The Spark Kafka source doesn't rely on Kafka to track its consumer state - Spark has to do that itself. Hence we need checkpointing: in simple terms, it's just data about which topic we've read from and what we've already read, so that when the application reconnects to Kafka, it can pick up where it left off.
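To make this concrete, in a Structured Streaming job the checkpoint location is typically set per query via the checkpointLocation option - a minimal sketch, where the sink format and paths are made up purely for illustration:

// hypothetical production sink: Spark persists processed offsets under
// checkpointLocation, so a restarted job resumes from where it left off
df.writeStream
  .format("parquet")
  .option("path", "/data/output/events")
  .option("checkpointLocation", "/data/checkpoints/events_stream")
  .start()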
That's all the preparation we need - now we can move on to creating an actual test.
First structured streaming test case - send a String
Let's create a test that is as simple as possible - send one string to Kafka, fetch it with a Spark Dataset, collect it from Spark, and make sure we received what we actually expected.
Let's look at the code first, and then go through it line by line:
import java.nio.charset.StandardCharsets

import io.github.embeddedkafka.EmbeddedKafka
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class SimpleStreamingTest
    extends AnyFlatSpec
    with Matchers
    with SparkStreamingTester {

  "spark" should "receive message from Kafka" in {
    val topic = "test_topic"
    val tableName = "test_table"
    val testMessage = "test_message"

    // source: subscribe to the test topic on the embedded Kafka broker
    val df = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", s"localhost:${kafkaConfig.kafkaPort}")
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()

    // sink: write everything we receive into an in-memory table
    val query = df.writeStream
      .format("memory")
      .queryName(tableName)
      .outputMode(OutputMode.Append())
      .trigger(Trigger.Once())
      .start()

    EmbeddedKafka.publishStringMessageToKafka(topic, testMessage)
    query.processAllAvailable()

    val results = sparkSession.sql(s"SELECT value FROM $tableName").collect()
    results.length should be(1)

    // Kafka delivers the value as raw bytes - decode it back into a String
    val messageBytes = results.head.getAs[Array[Byte]]("value")
    new String(messageBytes, StandardCharsets.UTF_8) should be(testMessage)
  }
}
Now that you've seen the code, let's discuss what is actually going on there, and why we need all of it.
Our test class extends the "utility trait" we created in the previous section, so when the actual test begins, we already have a Spark session ready and Kafka started.
We start by creating (reading) a stream that comes from Kafka. We specify the URL of the Kafka server we want to connect to and the topic we'll be reading from. Note the startingOffsets parameter. As already mentioned when talking about checkpointing, this parameter specifies where we start reading the data. earliest means we'll try reading from the very beginning of the stream. So if you're connecting a production job to an existing topic on a long-running Kafka server, you probably don't want to set that (latest may work better for you). For a test it's absolutely fine, though - it means we definitely won't miss the message. We've now created our source.
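For comparison, a hypothetical production source might subscribe with latest offsets, so that only new messages are processed after the job (re)connects - the broker addresses and topic name below are purely illustrative:

// illustrative production-style source: only read messages that arrive
// after the query (re)starts, relying on checkpoints for continuity
val prodDf = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker-1:9092,broker-2:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "latest")
  .load()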
The df you get as a result is a perfectly ordinary Spark DataFrame, the same as if you had read the data in batch from Parquet files - you can apply (mostly) the same transformations and functions, and the fact that the data is being streamed is abstracted away from you.
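For instance, you could decode and filter the value column with exactly the same API you'd use on a batch DataFrame - a small sketch, with nothing streaming-specific in it:

import org.apache.spark.sql.functions.col

// same DataFrame API as in batch: cast the raw Kafka bytes to a string
// column and drop empty messages
val decoded = df
  .selectExpr("CAST(value AS STRING) AS value_str")
  .filter(col("value_str").isNotNull)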
As a next step we need to create a sink - a destination for our stream. writeStream does exactly that. We'll simply store the data in an in-memory table - definitely not a production solution, but a good way to test our stream processing. outputMode means we'll be appending new data (as it comes) to the table we specified. trigger determines when data is actually read and processed - in a real job you'd more likely use an interval-based trigger (or the experimental continuous mode), but in a test we only need to process the data once. And that's kinda it! Almost.
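Here's roughly what the long-running variant could look like, with a micro-batch interval instead of Trigger.Once() - the console sink and the 10-second interval are arbitrary choices for illustration:

// illustrative long-running query: process a micro-batch every 10 seconds
// instead of a single pass
val longRunningQuery = df.writeStream
  .format("console")
  .outputMode(OutputMode.Append())
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()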
Now we finally publish the message using EmbeddedKafka.publishStringMessageToKafka, and wait until all the data has been processed by calling query.processAllAvailable(). As the docstring says, this method can block for long periods of time and exists only for testing purposes.
At this point our stream has been processed, and we can collect the result from the table and check what we received from Kafka. The resulting table doesn't contain just the value we're interested in, but also the key associated with it, the topic it was read from, and other metadata, which we'll ignore in our case. Also note that value is an Array[Byte] - so we can serialize any kind of object there (and we'll actually do that, but slightly later).
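If you're curious about that metadata, a query along these lines would pull the standard columns exposed by the Kafka source alongside the value:

// the Kafka source exposes the message key, topic, partition, offset and
// timestamp next to the value itself
val fullRows = sparkSession
  .sql(s"SELECT key, value, topic, partition, offset, timestamp FROM $tableName")
  .collect()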
For now, let's just decode our String (the Kafka client uses the UTF-8 charset by default) and compare it to what we sent - voila! It's exactly what we expected: our test_message has made the full round trip.
As usual, you can find all the code in my repository.
That's it for the first part - in the next article we'll look at how to send a slightly more complicated message, and see how processing a data stream can use the same API as regular batch processing.