void mergeGraphTable( + SparkSession spark, + String priority, + String betaInputPath, + String prodInputPath, + Class
p_clazz,
+ Class b_clazz,
+ String outputPath) {
+
+ Dataset p = Optional.ofNullable(value._1()).map(Tuple2::_2);
+ Optional b = Optional.ofNullable(value._2()).map(Tuple2::_2);
+
+ if (p.orElse((P) b.orElse((B) DATASOURCE)) instanceof Datasource) {
+ return mergeDatasource(p, b);
+ }
+ switch (priority) {
+ default:
+ case "BETA":
+ return mergeWithPriorityToBETA(p, b);
+ case "PROD":
+ return mergeWithPriorityToPROD(p, b);
+ }
+ }, Encoders.bean(p_clazz))
+ .filter((FilterFunction ) Objects::nonNull)
+ .write()
+ .mode(SaveMode.Overwrite)
+ .option("compression", "gzip")
+ .json(outputPath);
+ }
+
+ /**
+ * Datasources involved in the merge operation doesn't obey to the infra precedence policy, but relies on a custom
+ * behaviour that, given two datasources from beta and prod returns the one from prod with the highest
+ * compatibility among the two.
+ *
+ * @param p datasource from PROD
+ * @param b datasource from BETA
+ * @param Datasource class type from PROD
+ * @param Datasource class type from BETA
+ * @return the datasource from PROD with the highest compatibility level.
+ */
+ protected static P mergeDatasource(Optional p, Optional b) {
+ if (p.isPresent() & !b.isPresent()) {
+ return p.get();
+ }
+ if (b.isPresent() & !p.isPresent()) {
+ return (P) b.get();
+ }
+ if (!b.isPresent() & !p.isPresent()) {
+ return null; // unlikely, at least one should be produced by the join operation
+ }
+
+ Datasource dp = (Datasource) p.get();
+ Datasource db = (Datasource) b.get();
+
+ List P mergeWithPriorityToPROD(Optional p, Optional b) {
+ if (b.isPresent() & !p.isPresent()) {
+ return (P) b.get();
+ }
+ if (p.isPresent()) {
+ return p.get();
+ }
+ return null;
+ }
+
+ private static P mergeWithPriorityToBETA(Optional p, Optional b) {
+ if (p.isPresent() & !b.isPresent()) {
+ return p.get();
+ }
+ if (b.isPresent()) {
+ return (P) b.get();
+ }
+ return null;
+ }
+
+ private static