Spark Scala reduceByKey: how to reference keys specified in a configuration file?
I have a dataset with the following schema:
    dataset.printSchema()
     |-- id: string (nullable = true)
     |-- feature1: double (nullable = true)
     |-- feature2: double (nullable = true)
     |-- feature3: double (nullable = true)
     |-- feature4: double (nullable = true)

In application.conf I have defined the subset of keys that should be transformed using reduceByKey:
    keyinfo {
      keystobetransformed = "feature1,feature2"
    }

I can load these keys in my main object:
    import com.typesafe.config.{Config, ConfigFactory}

    val config: Config = ConfigFactory.load()
    val keys: Array[String] = config.getString("keyinfo.keystobetransformed").split(",")

For each of these keys, I need to compute the mean per id in the dataset and collect the result into an array. Currently, I use the following approach:
    import spark.implicits._  // Encoder for the (String, Double) tuples produced by dataset.map

    val meanFeature1: Array[Double] = dataset.map(x => (x.id, x.feature1)).rdd
      .mapValues(z => (z, 1))                                    // pair each value with a count of 1
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))         // sum values and counts per id
      .map { case (id, (total, count)) => (id, total / count) }  // mean per id
      .collect()
      .sortBy(_._1)                                              // order results by id
      .map(_._2)

    val meanFeature2: Array[Double] = dataset.map(x => (x.id, x.feature2)).rdd
      .mapValues(z => (z, 1))
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .map { case (id, (total, count)) => (id, total / count) }
      .collect()
      .sortBy(_._1)
      .map(_._2)

The problem with this approach is that it does not use the keys specified in application.conf: the fields are hard-coded, so the computation does not change when the keys are re-specified in application.conf.
How can I achieve this?
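One way to see what "dynamic" means here is to loop the question's (sum, count) accumulation over the configured keys instead of writing one pipeline per feature. The sketch below does this with plain Scala collections so it runs without a cluster; `Record`, `MeanPerKey`, `parseKeys` and `meansPerKey` are hypothetical names, and storing features in a `Map[String, Double]` is an assumption standing in for looking a column up by name.

```scala
// Hypothetical in-memory stand-in for a dataset row: an id plus a map from
// feature name to value (not the question's actual case class).
final case class Record(id: String, features: Map[String, Double])

object MeanPerKey {
  // Mirrors config.getString(...).split(","), additionally trimming whitespace
  // and dropping empty entries such as those left by a trailing comma.
  def parseKeys(raw: String): List[String] =
    raw.split(",").map(_.trim).filter(_.nonEmpty).toList

  // For every configured key, compute the mean per id using the same
  // (sum, count) accumulation as the question's reduceByKey, then sort by id
  // and keep only the means.
  def meansPerKey(records: Seq[Record], keys: Seq[String]): Map[String, Vector[Double]] =
    keys.map { key =>
      val sumsAndCounts: Map[String, (Double, Int)] =
        records
          .map(r => (r.id, (r.features(key), 1)))      // like mapValues(z => (z, 1))
          .groupBy(_._1)                                // group the pairs by id
          .map { case (id, pairs) =>
            id -> pairs.map(_._2).reduce((x, y) => (x._1 + y._1, x._2 + y._2))
          }
      val means = sumsAndCounts.toVector
        .sortBy(_._1)                                   // order by id, like sortBy(_._1)
        .map { case (_, (total, count)) => total / count }
      key -> means
    }.toMap
}
```

With rows `a -> (1.0, 3.0)` and `b -> (4.0, 6.0)` for `feature1`, this yields `Vector(2.0, 5.0)`. In Spark, the loop body would presumably select the column by name rather than a hard-coded field; this is only a sketch of the control flow.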
I think the DataFrame API is more suitable in this case, since it better supports accessing columns dynamically by name. Also, converting a Dataset to a DataFrame is trivial:
    import spark.implicits._  // Encoder for the Array[Double] produced by .map

    val averagesPerId: Array[Array[Double]] = dataset
      .groupBy("id")   // groupBy + avg turns the Dataset into a DataFrame
      .avg(keys: _*)   // creates an "avg(featureX)" column for each featureX in keys
      .sort("id")
      .map(r => keys.map(key => r.getAs[Double](s"avg($key)")))  // one Array[Double] per id
      .collect()

    // Transposing the result gives one array per key,
    // which is then paired with its key:
    val averagesPerKey: Map[String, Array[Double]] = keys.zip(averagesPerId.transpose(identity)).toMap

    // For example, if `feature1` is in `keys`:
    val meanFeature1 = averagesPerKey("feature1")
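The transpose-and-zip step at the end of the answer can be checked without Spark. This is a plain-Scala sketch; `ReshapeAverages` and `byKey` are hypothetical names, and the input array stands in for the collected `averagesPerId` (one inner array per id, already sorted by id, with averages in the same order as `keys`).

```scala
object ReshapeAverages {
  // averagesPerId: one row per id; each row holds one average per key.
  // transpose turns this into one row per key, which is then paired with its key.
  // Rows are converted to Vector so results compare by value rather than by reference.
  def byKey(keys: Array[String], averagesPerId: Array[Array[Double]]): Map[String, Vector[Double]] =
    keys.zip(averagesPerId.transpose.map(_.toVector)).toMap
}
```

For example, with keys `feature1, feature2` and per-id rows `(2.0, 4.0)` and `(5.0, 15.0)`, `byKey` maps `feature1` to `Vector(2.0, 5.0)` and `feature2` to `Vector(4.0, 15.0)`. Note that `transpose` assumes every row has the same length, which holds here because each row is built from the same `keys`.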