package eu.dnetlib.dhp.provision;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.provision.scholix.*;
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary;
import eu.dnetlib.dhp.schema.oaf.Relation;

import scala.Tuple2;

/**
 * Spark job that generates Scholix records: it joins the previously computed
 * ScholixSummary dataset with the graph relations to build source-side records,
 * resolves the target side from the summaries, and writes the final records as
 * gzipped JSON under workingDirPath/scholix_json.
 */
public class SparkGenerateScholix {

	public static void main(String[] args) throws Exception {
		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
			IOUtils
				.toString(
					SparkGenerateScholix.class
						.getResourceAsStream(
							"/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json")));
		parser.parseArgument(args);

		SparkConf conf = new SparkConf();
		conf.set("spark.sql.shuffle.partitions", "4000");
		conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
		final SparkSession spark = SparkSession
			.builder()
			.config(conf)
			.appName(SparkExtractRelationCount.class.getSimpleName())
			.master(parser.get("master"))
			.getOrCreate();

		conf
			.registerKryoClasses(
				new Class[] {
					Scholix.class,
					ScholixCollectedFrom.class,
					ScholixEntityId.class,
					ScholixIdentifier.class,
					ScholixRelationship.class,
					ScholixResource.class
				});

		final String graphPath = parser.get("graphPath");
		final String workingDirPath = parser.get("workingDirPath");

		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

		// Load the summaries produced by the previous step and the relations from the graph
		final Dataset<ScholixSummary> scholixSummary = spark
			.read()
			.load(workingDirPath + "/summary")
			.as(Encoders.bean(ScholixSummary.class));
		final Dataset<Relation> rels = spark
			.read()
			.load(graphPath + "/relation")
			.as(Encoders.bean(Relation.class));

		// First join: attach the source summary to each relation, producing partial Scholix records
		Dataset<Scholix> firstJoin = scholixSummary
			.joinWith(rels, scholixSummary.col("id").equalTo(rels.col("source")))
			.map(
				(MapFunction<Tuple2<ScholixSummary, Relation>, Scholix>) f -> Scholix
					.generateScholixWithSource(f._1(), f._2()),
				Encoders.bean(Scholix.class));

		firstJoin.write().mode(SaveMode.Overwrite).save(workingDirPath + "/scholix_1");

		Dataset<Scholix> scholix_final = spark
			.read()
			.load(workingDirPath + "/scholix_1")
			.as(Encoders.bean(Scholix.class));

		// Materialize every summary as a ScholixResource, to be used as the target side of the join
		scholixSummary
			.map(
				(MapFunction<ScholixSummary, ScholixResource>) ScholixResource::fromSummary,
				Encoders.bean(ScholixResource.class))
			.repartition(1000)
			.write()
			.mode(SaveMode.Overwrite)
			.save(workingDirPath + "/scholix_target");

		Dataset<ScholixResource> target = spark
			.read()
			.load(workingDirPath + "/scholix_target")
			.as(Encoders.bean(ScholixResource.class));

		// Second join: resolve the target resource, finalize identifier and link publishers,
		// then serialize each Scholix record as gzipped JSON
		scholix_final
			.joinWith(
				target,
				scholix_final.col("identifier").equalTo(target.col("dnetIdentifier")),
				"inner")
			.map(
				(MapFunction<Tuple2<Scholix, ScholixResource>, Scholix>) f -> {
					final Scholix scholix = f._1();
					final ScholixResource scholixTarget = f._2();
					scholix.setTarget(scholixTarget);
					scholix.generateIdentifier();
					scholix.generatelinkPublisher();
					return scholix;
				},
				Encoders.kryo(Scholix.class))
			.javaRDD()
			.map(
				s -> {
					ObjectMapper mapper = new ObjectMapper();
					return mapper.writeValueAsString(s);
				})
			.saveAsTextFile(workingDirPath + "/scholix_json", GzipCodec.class);
	}
}