Spark Streaming and Kafka Integration

https://data-flair.training/blogs/kafka-spark-streaming-integration/
Hello, colleagues! As a reminder, not long ago we published a book on Spark, and right now a book on Kafka is going through its final proofreading.


We hope these books will prove successful enough for us to continue the topic, for example, by translating and publishing literature on Spark Streaming. Today we would like to offer a translation of an article about integrating this technology with Kafka.

1. Objective

Apache Kafka + Spark Streaming is one of the best combinations for building real-time applications. In this article we will look at the details of this integration, walk through a Spark Streaming - Kafka example, and then discuss both the receiver-based approach and the direct approach to integrating Kafka and Spark Streaming. So, let's get started.



2. Integration of Kafka and Spark Streaming

When integrating Apache Kafka and Spark Streaming, there are two possible ways to configure Spark Streaming to receive data from Kafka, that is, two approaches to the integration. The first uses receivers and Kafka's high-level consumer API. The second (newer) approach works without receivers. The two approaches have different programming models that differ, for example, in performance and in their semantic guarantees.



Let's consider these approaches in more detail.

a. Receiver-Based Approach

In this case, the data is received by a receiver, which is implemented using the high-level consumer API provided by Kafka. The received data is stored in Spark executors, and Spark Streaming then launches jobs that process it.

However, with the default configuration this approach can still lose data in the event of a failure. To prevent data loss, you additionally have to enable the write-ahead log in Spark Streaming. All data received from Kafka is then synchronously saved to the write-ahead log in a distributed file system, so all of it can be recovered even after a failure, as in the sketch below.
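Here is a minimal sketch of what enabling the write-ahead log can look like in a Scala receiver-based application; the ZooKeeper address, consumer group, topic name and checkpoint path are placeholder assumptions. The key points are the spark.streaming.receiver.writeAheadLog.enable setting, a checkpoint directory, and a serialized storage level without extra replication, since the log already provides fault tolerance.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("KafkaReceiverWithWAL")
  // persist every received block to the write-ahead log before acknowledging it
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10))
// a checkpoint directory is required for the write-ahead log (placeholder path)
ssc.checkpoint("hdfs:///checkpoints/kafka-wal")

// with the log enabled, a serialized storage level without in-memory replication is enough
val kafkaStream = KafkaUtils.createStream(
  ssc, "zk-host:2181", "my-consumer-group", Map("my-topic" -> 1),
  StorageLevel.MEMORY_AND_DISK_SER)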

Next, let's look at how to use the receiver-based approach in a Kafka - Spark Streaming application.

i. Linking

For Scala/Java applications, link your streaming application against the following artifact in your SBT/Maven project definition.

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.0

For Python applications, however, you will have to add this library and its dependencies when deploying the application (see Deployment below).

ii. Programming

Next, import KafkaUtils in the streaming application code and create an input DStream:

import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

In addition, using variants of createStream, you can specify the key and value classes, as well as the corresponding classes for decoding them, as in the sketch below.
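As an illustration, here is a hedged sketch of one such variant, assuming String keys and values decoded by kafka.serializer.StringDecoder; the ZooKeeper address, consumer group and topic map are placeholders, and streamingContext is the context created earlier.

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map(
  "zookeeper.connect" -> "zk-host:2181",   // placeholder ZooKeeper quorum
  "group.id" -> "my-consumer-group")

// explicit key/value classes and their decoders; yields a DStream[(String, String)]
val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  streamingContext, kafkaParams, Map("my-topic" -> 1),
  StorageLevel.MEMORY_AND_DISK_SER_2)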

iii. Deployment

As with any Spark application, spark-submit is used to launch it. However, the details differ slightly between Scala/Java applications and Python applications.

In particular, with --packages you can add spark-streaming-kafka-0-8_2.11 and its dependencies directly to spark-submit; this is useful for Python applications, where the project cannot be managed with SBT/Maven.

./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 ...

You can also download the JAR of the spark-streaming-kafka-0-8-assembly artifact from the Maven repository and add it to spark-submit with --jars.

b. Direct Approach (Without Receivers)

After the receiver-based approach, a new, "direct" approach was developed that provides stronger end-to-end guarantees. Instead of delivering data through receivers, it periodically queries Kafka for the latest offsets in each topic/partition and, based on them, defines the offset ranges to be processed in each batch. When the processing jobs are launched, Kafka's simple consumer API is used to read the given offset ranges from Kafka, much like reading files from a file system.

Note: this feature appeared in Spark 1.3 for the Scala and Java APIs, and in Spark 1.4 for the Python API.

Now let's discuss how to apply this approach in our streaming application.
The Consumer API is described in more detail at the following link:

Apache Kafka Consumer | Examples of Kafka Consumer

i. Linking

Note that this approach is supported only in Scala/Java applications. Link your SBT/Maven project against the following artifact.

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.0

ii. Programming

Next, import KafkaUtils and create an input DStream in the streaming application code:

import org.apache.spark.streaming.kafka._
val directKafkaStream = KafkaUtils.createDirectStream[
  [key class], [value class], [key decoder class], [value decoder class] ](
  streamingContext, [map of Kafka parameters], [set of topics to consume])

In the Kafka parameters you need to specify either metadata.broker.list or bootstrap.servers. By default, consumption will start from the latest offset in each Kafka partition. If you want reading to start from the smallest offset instead, set the configuration option auto.offset.reset in the Kafka parameters, as in the sketch below.
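A hedged sketch of such Kafka parameters follows; the broker addresses and topic name are placeholder assumptions, and String keys/values with kafka.serializer.StringDecoder are assumed as in the earlier examples.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// broker list instead of ZooKeeper; "smallest" makes reading start from the earliest offset
val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092,broker2:9092",
  "auto.offset.reset" -> "smallest")

val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext, kafkaParams, Set("my-topic"))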

Moreover, using the variants of KafkaUtils.createDirectStream, you can start reading from arbitrary offsets. To access the Kafka offsets consumed in each batch, you can do the following.

// Keep a reference to the current offset ranges so that downstream processing can use it
var offsetRanges = Array.empty[OffsetRange]
directKafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map {
  ...
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
  ...
}

If we want Zookeeper-based Kafka monitoring tools to show the progress of the streaming application, we can use these offsets to update Zookeeper ourselves.

iii. Deployment

The deployment process in this case is similar to that of the receiver-based approach.

3. Advantages of the Direct Approach

The second approach to integrating Spark Streaming with Kafka beats the first for the following reasons:

a. Simplified Parallelism

In this case, you do not need to create multiple Kafka input streams and union them. Instead, Spark Streaming creates as many RDD partitions as there are Kafka partitions to consume, and all of them read data from Kafka in parallel. In other words, there is a one-to-one correspondence between Kafka partitions and RDD partitions, which makes this model clearer and easier to tune. A hedged illustration of this mapping follows below.
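As an illustration of the one-to-one mapping (assuming the directKafkaStream created in the programming section above), each RDD partition can be matched with its Kafka partition through the offset ranges:

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.HasOffsetRanges

directKafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { _ =>
    // RDD partition i holds exactly the records of Kafka partition offsetRanges(i)
    val o = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic}-${o.partition}: offsets ${o.fromOffset} to ${o.untilOffset}")
  }
}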

b. Efficiency

To completely rule out data loss in the first approach, the data had to be stored in a write-ahead log, which then replicated it. This is actually inefficient, because the data is replicated twice: once by Kafka itself and a second time by the write-ahead log. The second approach eliminates this problem: there is no receiver, and therefore no need for a write-ahead log. As long as Kafka retains the data long enough, messages can be recovered directly from Kafka.

c. Exactly-Once Semantics

In the first approach, the high-level Kafka API is used to store the consumed offsets in Zookeeper; this is the traditional way to consume data from Kafka. Even though, combined with write-ahead logs, it reliably prevents data loss, there is a small chance that under certain failures individual records are consumed twice. The reason is the inconsistency between the data reliably received by Spark Streaming and the offsets tracked in Zookeeper. In the second approach we therefore use the simple Kafka API, which does not rely on Zookeeper at all: the consumed offsets are tracked by Spark Streaming itself within its checkpoints, which eliminates the inconsistency between Spark Streaming and Zookeeper/Kafka. A sketch of enabling such checkpointing follows below.
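The following is a minimal sketch of checkpointing with the direct stream, not a complete application; the checkpoint path, broker address and topic name are placeholder assumptions. On restart, StreamingContext.getOrCreate recovers the context, including the tracked offsets, from the checkpoint.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val checkpointDir = "hdfs:///checkpoints/direct-kafka"   // placeholder path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("DirectKafkaWithCheckpoints")
  val ssc = new StreamingContext(conf, Seconds(10))
  // the offsets consumed by the direct stream are saved as part of this checkpoint
  ssc.checkpoint(checkpointDir)
  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("my-topic"))
  stream.map(_._2).print()
  ssc
}

// recovers the previous context (and offsets) if a checkpoint exists, otherwise creates a new one
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()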

As a result, even in the event of a failure, Spark Streaming effectively receives each record exactly once. To extend this guarantee to the output, we must make the operation that saves data to external storage either idempotent or an atomic transaction that stores both the results and the offsets. That is how exactly-once semantics is achieved when writing out our results.
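As a rough sketch of the transactional variant (again assuming the directKafkaStream from above): the helper saveResultsAndOffsetsAtomically is hypothetical, standing in for whatever transactional store is actually used; the point is only that the results and the offsets are committed together.

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// hypothetical helper: commits the batch results and the offsets in one atomic transaction,
// so that a replayed batch is either rolled back or repeated as a whole
def saveResultsAndOffsetsAtomically(results: Seq[(String, Long)],
                                    offsets: Array[OffsetRange]): Unit = ???

directKafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // simple value counts stand in for the real processing of the batch
  val results = rdd.map(_._2).countByValue().toSeq
  saveResultsAndOffsetsAtomically(results, offsetRanges)
}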

There is one drawback, though: the offsets in Zookeeper are not updated, so Zookeeper-based Kafka monitoring tools cannot show the progress of the application.
However, we can still access the offsets produced by this approach in each batch and update Zookeeper ourselves.

That is all we wanted to cover about integrating Apache Kafka and Spark Streaming. We hope you enjoyed it.
