package eu.dnetlib.dhp.oa.graph.raw;

import static eu.dnetlib.dhp.common.GraphSupport.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;

import java.util.Objects;
import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.GraphFormat;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import scala.Tuple2;

/**
 * Merges the claims graph into the raw graph: records from both inputs are joined by
 * identifier and, when the same id appears on both sides, the raw record takes precedence
 * over the claimed one.
 */
public class MergeClaimsApplication {

    private static final Logger log = LoggerFactory.getLogger(MergeClaimsApplication.class);

    @SuppressWarnings("unchecked")
    public static void main(final String[] args) throws Exception {
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
            IOUtils
                .toString(
                    MergeClaimsApplication.class
                        .getResourceAsStream(
                            "/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json")));
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String rawGraphPath = parser.get("rawGraphPath");
        log.info("rawGraphPath: {}", rawGraphPath);

        final String claimsGraphPath = parser.get("claimsGraphPath");
        log.info("claimsGraphPath: {}", claimsGraphPath);

        final String outputGraph = parser.get("outputGraph");
        log.info("outputGraph: {}", outputGraph);

        final GraphFormat format = Optional
            .ofNullable(parser.get("format"))
            .map(GraphFormat::valueOf)
            .orElse(GraphFormat.DEFAULT);
        log.info("graphFormat: {}", format);

        String graphTableClassName = parser.get("graphTableClassName");
        log.info("graphTableClassName: {}", graphTableClassName);

        Class<? extends Oaf> clazz = (Class<? extends Oaf>) Class.forName(graphTableClassName);

        String hiveMetastoreUris = parser.get("hiveMetastoreUris");
        log.info("hiveMetastoreUris: {}", hiveMetastoreUris);

        SparkConf conf = new SparkConf();
        conf.set("hive.metastore.uris", hiveMetastoreUris);
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());

        runWithSparkHiveSession(
            conf,
            isSparkSessionManaged,
            spark -> mergeByType(spark, rawGraphPath, claimsGraphPath, outputGraph, format, clazz));
    }

    private static <T extends Oaf> void mergeByType(
        SparkSession spark,
        String rawPath,
        String claimPath,
        String outputGraph,
        GraphFormat format,
        Class<T> clazz) {

        // key each raw record by its identifier: (id, record)
        Dataset<Tuple2<String, T>> raw = readGraphJSON(spark, rawPath, clazz)
            .map(
                (MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(
                    ModelSupport.idFn().apply(value), value),
                Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));

        // key each claimed record by its identifier: (id, record)
        Dataset<Tuple2<String, T>> claim = readGraphJSON(spark, claimPath, clazz)
            .map(
                (MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(
                    ModelSupport.idFn().apply(value), value),
                Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));

        // full outer join on the id: prefer the raw record when both sides match,
        // otherwise fall back to the claimed record; nulls are filtered defensively
        Dataset<T> merged = raw
            .joinWith(claim, raw.col("_1").equalTo(claim.col("_1")), "full_outer")
            .map(
                (MapFunction<Tuple2<Tuple2<String, T>, Tuple2<String, T>>, T>) value -> {
                    Optional<Tuple2<String, T>> opRaw = Optional.ofNullable(value._1());
                    Optional<Tuple2<String, T>> opClaim = Optional.ofNullable(value._2());

                    return opRaw.isPresent()
                        ? opRaw.get()._2()
                        : opClaim.isPresent() ? opClaim.get()._2() : null;
                },
                Encoders.bean(clazz))
            .filter(Objects::nonNull);

        saveGraphTable(merged, clazz, outputGraph, format);
    }
}
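/*
 * Usage sketch (illustrative, not part of the original source): the long option names below
 * are the parameters read via parser.get(...) in main; the jar name, paths, metastore URI,
 * and table class are hypothetical placeholders, and the exact option syntax depends on the
 * parameter spec in merge_claims_parameters.json. The "format" argument is omitted because
 * it falls back to GraphFormat.DEFAULT when not supplied.
 *
 *   spark-submit \
 *     --class eu.dnetlib.dhp.oa.graph.raw.MergeClaimsApplication \
 *     dhp-graph-mapper.jar \
 *     --rawGraphPath /path/to/graph/raw \
 *     --claimsGraphPath /path/to/graph/claims \
 *     --outputGraph /path/to/graph/merged \
 *     --graphTableClassName eu.dnetlib.dhp.schema.oaf.Publication \
 *     --hiveMetastoreUris thrift://example-metastore:9083
 */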