added the isTest checkpoint

This commit is contained in:
Miriam Baglioni 2020-04-16 10:53:48 +02:00
parent c28333d43f
commit 53f418098b
1 changed files with 9 additions and 11 deletions

View File

@ -8,6 +8,7 @@ import eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob2;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
@ -58,9 +59,6 @@ public class SparkResultToOrganizationFromIstRepoJob2 {
final String alreadylinked = parser.get("alreadyLinkedPath"); final String alreadylinked = parser.get("alreadyLinkedPath");
log.info("alreadyLinkedPath: {}", alreadylinked); log.info("alreadyLinkedPath: {}", alreadylinked);
final String resultorganizationsetpath = parser.get("resultOrganizationsetPath");
log.info("resultOrganizationsetPath: {}", resultorganizationsetpath);
final String resultClassName = parser.get("resultTableName"); final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName); log.info("resultTableName: {}", resultClassName);
@ -93,9 +91,6 @@ public class SparkResultToOrganizationFromIstRepoJob2 {
writeUpdates, saveGraph); writeUpdates, saveGraph);
}); });
} }
private static void execPropagation(SparkSession spark, String datasourceorganization, String alreadylinked, String inputPath, private static void execPropagation(SparkSession spark, String datasourceorganization, String alreadylinked, String inputPath,
@ -122,17 +117,20 @@ public class SparkResultToOrganizationFromIstRepoJob2 {
.read() .read()
.textFile(alreadylinked) .textFile(alreadylinked)
.map(value -> OBJECT_MAPPER.readValue(value, ResultOrganizationSet.class), .map(value -> OBJECT_MAPPER.readValue(value, ResultOrganizationSet.class),
Encoders.bean(ResultOrganizationSet.class)), potentialUpdates); Encoders.bean(ResultOrganizationSet.class)), potentialUpdates)
.toJavaRDD()
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(outputPath , GzipCodec.class);
} }
} }
private static Dataset<Relation> getNewRelations(Dataset<ResultOrganizationSet> alreadyLinked, Dataset<ResultOrganizationSet> potentialUpdates) { private static Dataset<Relation> getNewRelations(Dataset<ResultOrganizationSet> alreadyLinked, Dataset<ResultOrganizationSet> potentialUpdates) {
return potentialUpdates return potentialUpdates
.joinWith(alreadyLinked, potentialUpdates.col("resultId") .joinWith(alreadyLinked, potentialUpdates.col("resultId")
.equalTo(alreadyLinked.col("resultId")), "left_outer") .equalTo(alreadyLinked.col("resultId")), "left_outer").flatMap((FlatMapFunction<Tuple2<ResultOrganizationSet, ResultOrganizationSet>, Relation>) value -> {
.flatMap((FlatMapFunction<Tuple2<ResultOrganizationSet, ResultOrganizationSet>, Relation>) value -> {
List<Relation> new_relations = new ArrayList<>(); List<Relation> new_relations = new ArrayList<>();
ResultOrganizationSet potential_update = value._1(); ResultOrganizationSet potential_update = value._1();
Optional<ResultOrganizationSet> already_linked = Optional.ofNullable(value._2()); Optional<ResultOrganizationSet> already_linked = Optional.ofNullable(value._2());