Spark Scala reduceByKey: how to reference keys specified in a configuration file?


I have a dataset with the following schema:

    dataset.printSchema()
     |-- id: string (nullable = true)
     |-- feature1: double (nullable = true)
     |-- feature2: double (nullable = true)
     |-- feature3: double (nullable = true)
     |-- feature4: double (nullable = true)

In application.conf I have defined the subset of keys that should be transformed using reduceByKey:

    keyinfo {
      keystobetransformed = "feature1,feature2"
    }

I can load these keys in my main object:

    import com.typesafe.config.{Config, ConfigFactory}

    val config: Config = ConfigFactory.load()
    val keys: Array[String] = config.getString("keyinfo.keystobetransformed").split(",")
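Note that `split(",")` keeps any whitespace around the names, so a value such as `"feature1, feature2"` produces `" feature2"`, which matches no field or column. A minimal sketch of a more tolerant parser, assuming the configured value is a comma-separated string; `parseKeys` is a hypothetical helper, and the literal in `main` stands in for the value read from application.conf. (Declaring the setting as a HOCON list and reading it with `config.getStringList` is another option.)

```scala
object KeyParsing {
  // Hypothetical helper: turns a comma-separated config value such as
  // "feature1, feature2" into clean key names.
  def parseKeys(raw: String): Array[String] =
    raw.split(",")         // split on commas
      .map(_.trim)         // drop whitespace around each name
      .filter(_.nonEmpty)  // ignore empty entries, e.g. from ",," or a leading comma

  def main(args: Array[String]): Unit = {
    // In the application this string would come from
    // config.getString("keyinfo.keystobetransformed").
    val keys = parseKeys("feature1, feature2")
    println(keys.mkString("[", ", ", "]"))  // [feature1, feature2]
  }
}
```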

For each of these keys, I need to compute the mean for each id in the dataset and collect the result into an array. Currently, I use the following approach:

    // requires the SparkSession implicits in scope, e.g. import spark.implicits._
    val meanFeature1: Array[Double] = dataset.map(x => (x.id, x.feature1)).rdd
      .mapValues(z => (z, 1))                              // pair each value with a count of 1
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))  // sum values and counts per id
      .map { case (id, (total, count)) => (id, total / count) }
      .collect()
      .sortBy(_._1)
      .map(_._2)

    val meanFeature2: Array[Double] = dataset.map(x => (x.id, x.feature2)).rdd
      .mapValues(z => (z, 1))
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .map { case (id, (total, count)) => (id, total / count) }
      .collect()
      .sortBy(_._1)
      .map(_._2)
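One way to keep the typed code but drive it from configuration is to resolve each key name to the field it reads and run the same (sum, count) reduction once per key. The sketch below assumes this approach and uses plain Scala collections (`groupBy` stands in for `reduceByKey`) so it runs without Spark; `Record`, `extractor`, `meanPerId`, and `meansForKeys` are hypothetical names. In the Spark version, the same `extractor(key)` could be applied inside `dataset.map(x => (x.id, extractor(key)(x)))`.

```scala
object MeanPerId {
  // Hypothetical local stand-in for the Dataset's element type (mirrors the schema).
  final case class Record(id: String, feature1: Double, feature2: Double,
                          feature3: Double, feature4: Double)

  // Resolves a key name from the configuration to the field it reads.
  def extractor(key: String): Record => Double = key match {
    case "feature1" => _.feature1
    case "feature2" => _.feature2
    case "feature3" => _.feature3
    case "feature4" => _.feature4
    case other      => throw new IllegalArgumentException(s"Unknown key: $other")
  }

  // Same (sum, count) reduction as the RDD code; groupBy stands in for reduceByKey.
  def meanPerId(data: Seq[Record], key: String): Array[Double] = {
    val extract = extractor(key)
    data
      .map(r => (r.id, (extract(r), 1)))  // like mapValues(z => (z, 1))
      .groupBy(_._1)                      // collect the pairs of each id
      .map { case (id, pairs) =>
        val (total, count) =
          pairs.map(_._2).reduce((x, y) => (x._1 + y._1, x._2 + y._2))
        (id, total / count)
      }
      .toArray
      .sortBy(_._1)                       // order by id, as in the original
      .map(_._2)
  }

  // One array of per-id means for every configured key.
  def meansForKeys(data: Seq[Record], keys: Seq[String]): Map[String, Array[Double]] =
    keys.map(k => k -> meanPerId(data, k)).toMap

  def main(args: Array[String]): Unit = {
    val data = Seq(
      Record("a", 1.0, 10.0, 0.0, 0.0),
      Record("b", 4.0, 40.0, 0.0, 0.0),
      Record("a", 3.0, 30.0, 0.0, 0.0)
    )
    val means = meansForKeys(data, Seq("feature1", "feature2"))
    println(means("feature1").mkString(", "))  // 2.0, 4.0
  }
}
```

An unknown key fails fast with an `IllegalArgumentException` instead of silently producing no result.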

The problem with the above approach is that it does not reference the keys specified in application.conf: the computation does not change dynamically when the keys are re-specified in application.conf.

How can I achieve this?

I think the DataFrame API is more suitable in this case, since it better supports accessing columns dynamically by name. Also, converting a Dataset into a DataFrame is trivial:

    val averagesPerId: Array[Array[Double]] = dataset
      .groupBy("id")  // groupBy + avg yield an untyped DataFrame
      .avg(keys: _*)  // creates an "avg(featureX)" column for each featureX key
      .sort("id")
      // maps each row into an Array[Double], one row per id
      // (needs the SparkSession implicits for the Array[Double] encoder)
      .map(r => keys.map(col => r.getAs[Double](s"avg($col)")))
      .collect()

    // Transposing the result creates an array in which each row relates to a single key;
    // zipping with keys then maps each row to its key:
    val averagesPerKey: Map[String, Array[Double]] = keys.zip(averagesPerId.transpose).toMap

    // For example, if `feature1` is in `keys`:
    val meanFeature1 = averagesPerKey("feature1")
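To make the transpose-and-zip step concrete, here is a sketch on plain Scala arrays, assuming `collect()` returned one `Array[Double]` per id with values ordered as in `keys`. `byKey` is a hypothetical helper; it also covers an empty result, where `transpose` yields no rows, `zip` yields an empty map, and a lookup such as `averagesPerKey("feature1")` would throw `NoSuchElementException`.

```scala
object PerKeyAverages {
  // Turns one row per id (columns ordered as `keys`) into one array per key,
  // mirroring the transpose-and-zip step of the DataFrame answer.
  def byKey(keys: Array[String], averagesPerId: Array[Array[Double]]): Map[String, Array[Double]] =
    if (averagesPerId.isEmpty)
      keys.map(k => k -> Array.empty[Double]).toMap  // no ids: keep every key, with no values
    else
      keys.zip(averagesPerId.transpose).toMap

  def main(args: Array[String]): Unit = {
    val keys = Array("feature1", "feature2")
    // Hypothetical collected result for ids "a" and "b", already sorted by id.
    val averagesPerId = Array(Array(2.0, 20.0), Array(4.0, 40.0))
    val averagesPerKey = byKey(keys, averagesPerId)
    println(averagesPerKey("feature1").mkString(", "))  // 2.0, 4.0
  }
}
```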
