15°

DStream转为DF的两种方式(突破map时元组22的限制)

在进行Spark Streaming的开发时,我们常常需要将DStream转为DataFrame来进行进一步的处理,
共有两种方式,方式一:

val spark = SparkSession.builder()
  .appName("Test")
  .getOrCreate()
import spark.implicits._
dStream.foreachRDD{ rdd =>
  val df = rdd.map(_.split(" "))
    .map(t => (t(1),t(2),t(3)))
    .toDF("col1","col2","col3")
  // 业务逻辑
}

利用map算子和tuple来完成,一般的场景下采用这种方式即可。

但是有的时候我们会遇到列数大于22的情况,这个时候会受到scala的tuple数不能超过22的影响。这时可以采用方式二:

val spark = SparkSession.builder()
  .appName("Test")
  .getOrCreate()
dStream.foreachRDD{ rdd =>
  val res:RDD[Row] = rdd.map{ row =>
    val buffer = ArrayBuffer.empty[Any]
    val fields: Array[String] = row.split("\\|~\\|")
    buffer.append(fields(0))
    buffer.append(fields(1))
    buffer.append(fields(2))
    // 省略
    buffer.append(fields(25))
    Row.fromSeq(buffer)
  } 
  val schema = StructType(Seq(
    StructField("col1", StringType, false),
    StructField("col2", StringType, false),
    StructField("col3", StringType, false),
    // 省略
    StructField("col26", StringType, false)
  ))
  val df: DataFrame = spark.createDataFrame(result, schema)
  // 业务逻辑
}

原文链接:https://www.cnblogs.com/icecola/p/11176600.html

全部评论: 0

    我有话说: