Spark ML 管道在线推理实践

陈易生, ml-sysnotes
Back

前言

本文是「算法工程化实践选读」系列的第 5 篇,选读 Uber ML 平台团队在 2019 年 10 月发布的技术博客 Evolving Michelangelo Model Representation for Flexibility at Scale。Uber 尝试拓展 Spark ML 管道的能力,让管道可以更高效地进行模型的在线推理。作为比较,本文还详细介绍了 MLeap,它尝试从不同的角度去解决同一个问题。

离线容易

Spark ML 管道让模型的离线训练和离线推理变得十分简单。我们通过下面这个文档分类示例快速了解一下几个重要概念。

// Configure an ML pipeline, which consists of three stages:
// tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))

TokenizerHashingTF 是 Transformer。Transformer 的 transform 方法定义了从输入 DataFrame 到输出 DataFrame 的数据转换逻辑。

LogisticRegression 是 Estimator。Estimator 的 fit 方法定义了从输入 DataFrame 到输出 Model 的模型训练逻辑。

Pipeline 是 Transformer 和 Estimator 的组合。

// Prepare training documents from a list of (id, text, label) tuples.
val training = ...
val model = pipeline.fit(training)

调用 Pipeline 的 fit 方法,可以得到 PipelineModel。不妨把 PipelineModel 理解为,完成拟合后的数据预处理-模型推理全流程。它提供了简单的 I/O API,用于将模型序列化为原生的格式。

model.write.overwrite().save("/tmp/spark-logistic-regression-model")

将此前序列化的模型反序列化,就可以复原 PipelineModel。

val model = PipelineModel.load("/tmp/spark-logistic-regression-model")

PipelineModel 同时也是个 Transformer。调用它的 transform 方法对测试数据集 DataFrame 执行数据预处理和批量推理,可以得到推理结果 DataFrame。

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = ...
model.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}

在线难

但是,将离线训练得到模型用于在线推理,就没那么容易了。在线推理要求低延迟和高吞吐,但运行 Spark 的额外开销很大。

根据性能测试的结果,一次依赖 Spark Context 的推理过程的耗时在 100 毫秒左右,距离在线推理通常要求 5 毫秒有很大的距离。Uber Michelangelo 团队在一篇博客中定位了 PipelineModel 不适合做在线推理的两个原因:

下文着重比较将 Spark ML 管道用于在线推理的两个尝试。

MLeap

一个很直观的想法是,把管道所依赖的 Spark Transformer 在没有 Spark 依赖的环境下实现一遍,以避免 Spark Context 带来的额外开销。

开源的 MLeap 和 Databricks 闭源发布的 dbml-local 就是这么做的,在这里我们只讨论开源的 MLeap。一个标准的 MLeap 工作流是这样的:

  1. 离线训练。在离线训练环境中,像上一节介绍的那样,用 Spark 实现并训练一个管道,得到 PipelineModel。

  2. 序列化。将 PipelineModel 序列化为 MLeap Bundle

    import ml.combust.bundle.BundleFile
    import ml.combust.mleap.spark.SparkSupport._
    import org.apache.spark.ml.bundle.SparkBundleContext
    import resource._
    implicit val context = SparkBundleContext()
    (for (
    modelFile <- managed(BundleFile("jar:file:/tmp/spark-logistic-regression-model.zip"))
    ) yield {
    model.writeBundle.save(modelFile)(context)
    }).tried.get
  3. 在线推理。在线上服务环境中,依赖 MLeap 运行时,运行这个序列化后的管道,进行低延迟的在线推理。注意,下面的代码不包含 Spark 依赖!

    // NO SPARK DEPENDENCIES!
    import ml.combust.bundle.BundleFile
    import ml.combust.mleap.runtime.MleapSupport._
    import resource._
    val zipBundleM = (for (bundle <- managed(BundleFile("jar:file:/tmp/spark-logistic-regression-model.zip"))) yield {
    bundle.loadMleapBundle().get
    }).opt.get
    val mleapModel = zipBundleM.root
    // Prepare test documents, which are unlabeled (id, text) tuples.
    val test = ...
    mleapModel.transform(test)
    // ...

根据 MLeap 提供的 benchmark 结果,在 MLeap 运行时中运行 ML 管道,相比 Spark 运行时,可以获得 10000 倍以上的提速。

MLeap 之所以能做到这一点,是因为:

但 MLeap 同时也存在比较明显的缺点:

Spark OnlineTransformer

MLeap 这类解决方案引入的不一致性问题,给 Uber 造成过损失(见 SPIP 附件中的 PDF)。这促使 Uber ML 平台团队思考一种不依赖第三方(MLeap / PMML / PFA 等)模型格式转换的解决方案。

既然 Spark 原生的 PipelineModel 不适合在线的场景,能否直接修改 Spark 的代码,让它变得适合?这篇博客这场演讲介绍了 Uber ML 平台团队引入的 OnlineTransformer 类,解决了 Spark Transformer 在线推理效率低的缺点。

首先,Transformer 设计上没有考虑在线服务的场景,因此只有一个批推理 API def scoreInstances(instances: List[Map[String, Any]]): List[Map[String, Any]]。Uber 团队为 OnlineTransformer 加上了点推理 API def scoreInstance(instance: Map[String, Any]): Map[String, Any]。这是解决问题的第一步。

Uber 团队还发现,PipelineModel 对 Spark Context 的重度依赖,导致了在线服务中的模型加载过程缓慢。优化围绕 PipelineModel 的加载展开,思路是尽量用轻量的本地文件 I/O 代替依赖 Spark Context 的分布式 I/O,这些小优化累积起来,让模型加载速度提升了 4 到 15 倍。优化的地方包括:

这个方案相比 MLeap 等方案,还有一个天然的优点:它被包括在 Spark 项目中,因此会随着 Spark 代码库一同迭代。

然而,这个提案并没有被 Spark 社区所接受。从有限的讨论中,我推测 Spark 社区希望让 MLeap / dbml-local / PMML 去负责这个针对在线推理的优化,它不应该成为 Spark 的包袱。

很有意思。Uber 希望能把这个优化合并到 Spark 的主干,让社区后续能把针对在线推理的优化迭代下去;而 Spark 社区认为这个事情应该由第三方的库去管。这大概就是——「Uber 之蜜糖,Spark 之砒霜」罢 😂

总结

在 Spark ML 管道看来:

美中不足之处在于,这个管道依赖于 Spark 运行时,有很大的额外开销,对于在线推理的场景不适用。为了解决这个问题:

衷心希望能看到更多这样的「八仙过海各显神通」。


Feedback is a gift! Please send your feedback via email or Twitter.

© Yik San Chan. Built with Vercel and Nextra.