
  1. 一个疑惑:在接收的数据真正交给 blockmanager 之前是在 blockgenerate 中存储的,这里面并没有任何持久化保证,也就是说如果这个时候宕机是不是当前在 blockgenerate 中还没有交个 blockManager 的数据会全部丢失掉?
  2. 默认情况下 spark streaming 使用 BlockManagerBasedBlockHandler 保存块,并且默认持久化策略是 MEMORY_AND_DISK_SER_2
  3. WriteAheadLogBasedBlockHandler 默认使用 FileBasedWriteAheadLog 写日志,这种方式每个一段时间( spark.streaming.receiver.writeAheadLog.rollingIntervalSecs ) 滚动一次,生成一个新的日志文件,使得单个日志文件不会太大。
  4. Kafka 的 DirectStream 另外一种数据可靠性的方法,这种方法跳出了正在 spark streaming 的模式,不会再单独启动 Receiver 接收数据
  5. DirectStream 相当于是把 Kafka 看做类似 HDFS 一样的底层文件系统,由 Spark Streaming 来负责整个 offset 的侦测、batch 分配、实际读取数据,些分 batch 的信息都是 checkpoint 到了可靠存储中。

reciver 预写日志

  1. 如果我们在 spark stream 的配置文件中指定了 spark.streaming.receiver.writeAheadLog.enable 为 true ,则 ReceiverSupervisorImpl 实例化时得到的 receivedBlockHandler 实例是一个 WriteAheadLogBasedBlockHandler,还有一个需要注意的地方是不管创建的是 WriteAheadLogBasedBlockHandler 还是 BlockManagerBasedBlockHandler 都会传入一个 storageLevel ,这个参数配置的是底层 blockManager 的持久化级别,默认是 MEMORY_AND_DISK_SER_2 ,如果已经开启乐预写日志可以把持久化级别后面的 2 给去掉。

    private val receivedBlockHandler: ReceivedBlockHandler = {
      if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
        if (checkpointDirOption.isEmpty) {
          throw new SparkException(
            "Cannot enable receiver write-ahead log without checkpoint directory set. " +
              "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
              "See documentation for more details.")
        new WriteAheadLogBasedBlockHandler(env.blockManager, env.serializerManager, receiver.streamId,
          receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
      } else {
        new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  2. WriteAheadLogBasedBlockHandler 的构造函数中会实例化真正负责存储的 writeAheadLog, 具体实例化方法如下:

    private def createLog(
        isDriver: Boolean,
        sparkConf: SparkConf,
        fileWalLogDirectory: String,
        fileWalHadoopConf: Configuration
      ): WriteAheadLog = {
      // 配置的预写日志类
      val classNameOption = if (isDriver) {
      } else {
      val wal = classNameOption.map { className =>
        try {
          // 实例化用户配置的预写日志类
            Utils.classForName(className).asInstanceOf[Class[_ <: WriteAheadLog]], sparkConf)
        } catch {
          case NonFatal(e) =>
            throw new SparkException(s"Could not create a write ahead log of class $className", e)
      }.getOrElse {
        // 默认的预写日志类
        new FileBasedWriteAheadLog(sparkConf, fileWalLogDirectory, fileWalHadoopConf,
          getRollingIntervalSecs(sparkConf, isDriver), getMaxFailures(sparkConf, isDriver),
          shouldCloseFileAfterWrite(sparkConf, isDriver))
      // 受参数 spark.streaming.driver.writeAheadLog.allowBatching 控制
      // 这个类另外开启了一个线程负责存储
      // 调用 write 方法时仅仅是把消息存储到一个队列里面
      // 真正存储时会对队列中所以的消息按时间排序,并聚合成一条消息
      // 然后使用前面创建的 wal 实例写入
      if (isBatchingEnabled(sparkConf, isDriver)) {
        new BatchedWriteAheadLog(wal, sparkConf)
      } else {
  3. 下面看 FileBasedWriteAheadLog 类, 看这个类的 write 具体实现:

    def write(byteBuffer: ByteBuffer, time: Long): FileBasedWriteAheadLogSegment = synchronized {
      var fileSegment: FileBasedWriteAheadLogSegment = null
      var failures = 0
      var lastException: Exception = null
      var succeeded = false
      while (!succeeded && failures < maxFailures) {
        try {
          // 关键是这里,根据时间得到 LogWriter
          // 每个 LogWriter 对应一个日志文件,滚动日志
          fileSegment = getLogWriter(time).write(byteBuffer)
          if (closeFileAfterWrite) {
          succeeded = true
        } catch {
          case ex: Exception =>
            lastException = ex
            logWarning("Failed to write to write ahead log")
            failures += 1
      if (fileSegment == null) {
        logError(s"Failed to write to write ahead log after $failures failures")
        throw lastException

    getLogWriter 方法, 多长时间滚动一次可以通过如下参数控制 spark.streaming.receiver.writeAheadLog.rollingIntervalSecs

    private def getLogWriter(currentTime: Long): FileBasedWriteAheadLogWriter = synchronized {
      if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
        currentLogPath.foreach {
          pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _)
        currentLogWriterStartTime = currentTime
        currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
        val newLogPath = new Path(logDirectory,
          timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
        currentLogPath = Some(newLogPath.toString)
        currentLogWriter = new FileBasedWriteAheadLogWriter(currentLogPath.get, hadoopConf)

    然后 write 方法调用 hadoop API 向 hadoop 写数据

  4. WriteAheadLogBasedBlockHandler#storeBlock 方法中同时向 blockmanager 还有 writeAheadLog 写日志,实现了 reciver 上的可靠性

    def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
      var numRecords = Option.empty[Long]
      // Serialize the block so that it can be inserted into both
      val serializedBlock = block match {
        case ArrayBufferBlock(arrayBuffer) =>
          numRecords = Some(arrayBuffer.size.toLong)
          serializerManager.dataSerialize(blockId, arrayBuffer.iterator)
        case IteratorBlock(iterator) =>
          val countIterator = new CountingIterator(iterator)
          val serializedBlock = serializerManager.dataSerialize(blockId, countIterator)
          numRecords = countIterator.count
        case ByteBufferBlock(byteBuffer) =>
          new ChunkedByteBuffer(byteBuffer.duplicate())
        case _ =>
          throw new Exception(s"Could not push $blockId to block manager, unexpected block type")
      // Store the block in block manager
      val storeInBlockManagerFuture = Future {
        val putSucceeded = blockManager.putBytes(
          tellMaster = true)
        if (!putSucceeded) {
          throw new SparkException(
            s"Could not store $blockId to block manager with storage level $storageLevel")
      // Store the block in write ahead log
      val storeInWriteAheadLogFuture = Future {
        writeAheadLog.write(serializedBlock.toByteBuffer, clock.getTimeMillis())
      // Combine the futures, wait for both to complete, and return the write ahead log record handle
      val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
      val walRecordHandle = ThreadUtils.awaitResult(combinedFuture, blockStoreTimeout)
      WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)

Kafka Direct API

val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest"
val topics = Set("topicA", "topicB")
val stream = KafkaUtils.createDirectStream(ssc,kafkaParams,topics)
  1. createDirectStream 创建的 DirectKafkaInputDStream 对象继承自 InputDStream 而非是 ReceiverInputDStream,这一点很重要因为这种 Direct 方式并不会使用 receiver 接收数据,也就没有必要使用 receiverTracker 去启动 receiver 了。而代码中判断是不是需要启动 Receiver 的依据正是是否继承自 ReceiverInputDStream

    def getReceiverInputStreams(): Array[ReceiverInputDStream[_]] = this.synchronized {
  2. 在使用 DStream 生成 RDD 时最终会调用到 DirectKafkaInputDStream 的 compute 方法,而在这个方法中创建的是一个 KafkaRDD,然后就到了关键的地方了, 考虑 RDD 的调用链,每个 RDD 都需要前一个 RDD 返回的 Iterator. 看下面的方法:

    override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
      val part = thePart.asInstanceOf[KafkaRDDPartition]
      assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
      if (part.fromOffset == part.untilOffset) {
        log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
          s"skipping ${part.topic} ${part.partition}")
      } else {
        new KafkaRDDIterator(part, context)

    在 KafkaRDDIterator 会创建针对 topic 的 consumer,然后就可以直接从 Kafka 中取数据了,这里相当于把 Kafka 当成类似于 HDFS 一样的数据源。


