Hierarchical data manipulation in Apache Spark

java apache-spark apache-spark-sql spark-graphx graphframes


I have a Dataset in Spark (v2.1.1) with 3 columns (shown below) containing hierarchical data.

  • My objective is to assign an incremental number to each row based on the parent-child hierarchy. Graphically, the hierarchical data is a collection of trees.
  • As the table below shows, I already have the rows grouped by 'Global_ID'. Now I would like to generate the 'Value' column in incremental order, following the hierarchy defined by the 'Parent' and 'Child' columns.

Tabular Representation (Value is the desired output):

    +-----------+--------+-------+         +-----------+--------+-------+-------+
    |      Current Dataset       |         |      Desired Dataset (Output)      |
    +-----------+--------+-------+         +-----------+--------+-------+-------+
    | Global_ID | Parent | Child |         | Global_ID | Parent | Child | Value |
    +-----------+--------+-------+         +-----------+--------+-------+-------+
    |       111 |    111 |   123 |         |       111 |    111 |   111 |     1 |
    |       111 |    135 |   246 |         |       111 |    111 |   123 |     2 |
    |       111 |    123 |   456 |         |       111 |    123 |   789 |     3 |
    |       111 |    123 |   789 |         |       111 |    123 |   456 |     4 |
    |       111 |    111 |   111 |         |       111 |    111 |   135 |     5 |
    |       111 |    135 |   468 |         |       111 |    135 |   246 |     6 |
    |       111 |    135 |   268 |         |       111 |    135 |   468 |     7 |
    |       111 |    268 |   321 |         |       111 |    135 |   268 |     8 |
    |       111 |    138 |   139 |         |       111 |    268 |   321 |     9 |
    |       111 |    111 |   135 |         |       111 |    111 |   138 |    10 |
    |       111 |    111 |   138 |         |       111 |    138 |   139 |    11 |
    |       222 |    222 |   654 |         |       222 |    222 |   222 |    12 |
    |       222 |    654 |   721 |         |       222 |    222 |   987 |    13 |
    |       222 |    222 |   222 |         |       222 |    222 |   654 |    14 |
    |       222 |    721 |   127 |         |       222 |    654 |   721 |    15 |
    |       222 |    222 |   987 |         |       222 |    721 |   127 |    16 |
    |       333 |    333 |   398 |         |       333 |    333 |   333 |    17 |
    |       333 |    333 |   498 |         |       333 |    333 |   398 |    18 |
    |       333 |    333 |   333 |         |       333 |    333 |   498 |    19 |
    |       333 |    333 |   598 |         |       333 |    333 |   598 |    20 |
    +-----------+--------+-------+         +-----------+--------+-------+-------+

Tree Representation (Desired value is represented next to each node):

                      +-----+                                           +-----+
                   1  | 111 |                                       17  | 333 |
                      +--+--+                                           +--+--+
                         |                                                 |
         +---------------+--------+-----------------+           +----------+----------+
         |                        |                 |           |          |          |
      +--v--+                  +--v--+           +--v--+     +--v--+    +--v--+    +--v--+
   2  | 123 |                5 | 135 |        10 | 138 |     | 398 |    | 498 |    | 598 |
      +--+--+                  +--+--+           +--+--+     +--+--+    +--+--+    +--+--+  
   +-----+-----+         +--------+--------+        |          18         19         20
   |           |         |        |        |        |  
+--v--+     +--v--+   +--v--+  +--v--+  +--v--+  +--v--+ 
| 789 |     | 456 |   | 246 |  | 468 |  | 268 |  | 139 |                 +-----+
+-----+     +-----+   +-----+  +-----+  +--+--+  +-----+             12  | 222 |
   3           4         6        7      8 |        11                   +--+--+
                                        +--v--+                             |
                                        | 321 |                      +------+-------+
                                        +--+--+                      |              |
                                           9                      +--v--+        +--v--+
                                                               13 | 987 |    14  | 654 |
                                                                  +--+--+        +--+--+
                                                                                    |
                                                                                 +--v--+
                                                                             15  | 721 |
                                                                                 +--+--+
                                                                                    |
                                                                                 +--v--+
                                                                             16  | 127 |
                                                                                 +--+--+

Code Snippet:

Dataset<Row> myDataset = spark
                .sql("select Global_ID, Parent, Child from RECORDS");

JavaPairRDD<Row, Long> finalDataset = myDataset.groupBy(new Column("Global_ID"))
    // Alias the aggregated columns so they can be selected by name below
    .agg(functions.sort_array(functions.collect_list(new Column("Parent"))).as("parent_col"),
        functions.sort_array(functions.collect_list(new Column("Child"))).as("child_col"))
    .orderBy(new Column("Global_ID"))
    .withColumn("vars", functions.explode(<Spark UDF>))
    .select(new Column("vars"), new Column("parent_col"), new Column("child_col"))
    .javaRDD().zipWithIndex();


// Sample UDF (TODO: Actual Implementation)   
spark.udf().register("computeValue",
                (<Column Names>) -> <functionality & implementation>,
                DataTypes.<xxx>);

After a lot of research and going through many suggestions in blogs, I tried the approaches listed below, but none of them produced the result I need for this scenario.

Tech Stack :

  • Apache Spark (v2.1.1)

  • Java 8

  • AWS EMR Cluster (Spark App Deployment)


Data Volume:

  • Approximately 20 million rows in the Dataset

Approaches Tried:

  1. Spark GraphX + GraphFrames:

  2. Spark GraphX Pregel API:


Any suggestions for alternatives to, or modifications of, the current approaches would be really helpful, as I am completely stuck on this use case.

Appreciate your help! Thank you!

Posted by Sridher on December 27, 2017

Answer 1



Note: The solution below is in Scala; it should translate to Java without much trouble.

I tried this with Spark SQL, so it should give you an idea. The basic idea is to collect the Parent and Child values per Global_ID, sort them while aggregating, order the groups by Global_ID, and then explode the collected values back into rows. That gives an ordered result table, to which you can then apply zipWithIndex to add the rank (Value).

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.functions._

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Sample rows: (Global_ID, Parent, Child)
    val t = Seq((111,111,123), (111,111,111), (111,123,789), (111,268,321), (222,222,654), (222,222,222), (222,721,127), (333,333,398), (333,333,333), (333,333,598))
    val ddd = sc.parallelize(t).toDF
    // UDF that pairs up the i-th elements of two arrays
    val zip = udf((xs: Seq[Int], ys: Seq[Int]) => xs zip ys)
    val dd1 = ddd
      .groupBy($"_1")
      // Each list is sorted on its own, so zip pairs the i-th smallest
      // parent with the i-th smallest child within a Global_ID
      .agg(sort_array(collect_list($"_2")).as("v"),
           sort_array(collect_list($"_3")).as("w"))
      .orderBy(asc("_1"))
      .withColumn("vars", explode(zip($"v", $"w")))
      .select($"_1", $"vars._1", $"vars._2")
      .rdd.zipWithIndex

    dd1.collect

Output

    res24: Array[(org.apache.spark.sql.Row, Long)] = Array(([111,111,111],0), ([111,111,123],1), ([111,123,321],2),
    ([111,268,789],3), ([222,222,127],4), ([222,222,222],5), ([222,721,654],6), ([333,333,333],7), ([333,333,398],8), ([333,333,598],9))
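
One caveat about the output above: because the Parent and Child lists are sorted independently before being zipped, the original (Parent, Child) pairs are not preserved. The input rows (111,123,789) and (111,268,321) come out as [111,123,321] and [111,268,789]. Since the question asks for numbering that follows the tree, here is a minimal plain-Java sketch of a depth-first, pre-order numbering. It is not taken from either post, and it rests on several assumptions: the class name HierarchyNumbering and the method number are hypothetical, the root of each tree is the node whose id equals Global_ID, the data contains no cycles other than the root's self-reference, siblings are visited in input order (the desired table in the question does not pin down a sibling order, so sibling numbers may differ from it), and nodes not reachable from the root are dropped.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Spark or of the original posts.
final class HierarchyNumbering {

    /**
     * Input rows are {globalId, parent, child}.
     * Output rows are {globalId, parent, child, value}, where value is a
     * running pre-order number across all trees, in ascending Global_ID order.
     */
    static List<long[]> number(List<long[]> rows) {
        // globalId -> (parent -> children in input order); TreeMap keeps Global_IDs ascending
        Map<Long, Map<Long, List<Long>>> trees = new TreeMap<>();
        for (long[] r : rows) {
            long globalId = r[0], parent = r[1], child = r[2];
            Map<Long, List<Long>> children =
                trees.computeIfAbsent(globalId, k -> new LinkedHashMap<>());
            if (parent != child) { // the root's self-reference is not an edge
                children.computeIfAbsent(parent, k -> new ArrayList<>()).add(child);
            }
        }

        List<long[]> out = new ArrayList<>();
        long value = 0;
        for (Map.Entry<Long, Map<Long, List<Long>>> tree : trees.entrySet()) {
            long root = tree.getKey(); // assumption: root id == Global_ID
            Map<Long, List<Long>> children = tree.getValue();
            // Explicit stack of {parent, node} avoids deep recursion on tall trees
            Deque<long[]> stack = new ArrayDeque<>();
            stack.push(new long[] {root, root});
            while (!stack.isEmpty()) {
                long[] edge = stack.pop();
                out.add(new long[] {root, edge[0], edge[1], ++value});
                List<Long> kids = children.getOrDefault(edge[1], Collections.emptyList());
                // Push in reverse so the first child in input order is visited first
                for (int i = kids.size() - 1; i >= 0; i--) {
                    stack.push(new long[] {edge[1], kids.get(i)});
                }
            }
        }
        return out;
    }
}
```

This runs in a single JVM. For the roughly 20 million rows mentioned in the question, one option might be to apply the same walk per Global_ID group inside Spark (for example with Dataset.groupByKey followed by flatMapGroups) and then add a per-group offset to obtain the global running number. That wiring is not shown here and would need testing against the real data.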
Posted by Sumeet Sharma on December 27, 2017