Add changes from code review
This commit is contained in:
parent
be320ba3c1
commit
ebfba38ab6
|
@ -10,15 +10,15 @@ import java.util.stream.Collectors;
|
|||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.compress.GzipCodec;
|
||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -82,45 +82,32 @@ public class PrepareAffiliationRelations implements Serializable {
|
|||
private static <I extends Result> void prepareAffiliationRelations(SparkSession spark, String inputPath,
|
||||
String outputPath) {
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
// load and parse affiliation relations from HDFS
|
||||
JavaRDD<AffiliationRelationDeserializer> affiliationRelationsDeserializeRDD = sc
|
||||
.textFile(inputPath)
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, AffiliationRelationDeserializer.class));
|
||||
Dataset<Row> df = spark
|
||||
.read()
|
||||
.schema("`DOI` STRING, `Matchings` ARRAY<STRUCT<`RORid`:ARRAY<STRING>,`Confidence`:DOUBLE>>")
|
||||
.json(inputPath);
|
||||
|
||||
// convert affiliation to an internal representation
|
||||
Dataset<AffiliationRelationModel> affiliationRelations = spark
|
||||
.createDataset(
|
||||
affiliationRelationsDeserializeRDD
|
||||
.flatMap(
|
||||
entry -> entry
|
||||
.getMatchings()
|
||||
.stream()
|
||||
.flatMap(
|
||||
matching -> matching
|
||||
.getRorId()
|
||||
.stream()
|
||||
.map(
|
||||
rorId -> new AffiliationRelationModel(
|
||||
entry.getDoi(),
|
||||
rorId,
|
||||
matching.getConfidence())))
|
||||
.collect(Collectors.toList())
|
||||
.iterator())
|
||||
.rdd(),
|
||||
Encoders.bean(AffiliationRelationModel.class));
|
||||
// unroll nested arrays
|
||||
df = df
|
||||
.withColumn("matching", functions.explode(new Column("Matchings")))
|
||||
.withColumn("rorid", functions.explode(new Column("matching.RORid")))
|
||||
.select(
|
||||
new Column("DOI").as("doi"),
|
||||
new Column("rorid"),
|
||||
new Column("matching.Confidence").as("confidence"));
|
||||
|
||||
// prepare action sets for affiliation relations
|
||||
affiliationRelations
|
||||
.flatMap((FlatMapFunction<AffiliationRelationModel, Relation>) affRel -> {
|
||||
df
|
||||
.toJavaRDD()
|
||||
.flatMap((FlatMapFunction<Row, Relation>) row -> {
|
||||
|
||||
// DOI to OpenAIRE id
|
||||
final String paperId = ID_PREFIX
|
||||
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", affRel.getDoi()));
|
||||
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", row.getAs("doi")));
|
||||
|
||||
// ROR id to OpenAIRE id
|
||||
final String affId = GenerateRorActionSetJob.calculateOpenaireId(affRel.getRorId());
|
||||
final String affId = GenerateRorActionSetJob.calculateOpenaireId(row.getAs("rorid"));
|
||||
|
||||
Qualifier qualifier = OafMapperUtils
|
||||
.qualifier(
|
||||
|
@ -137,18 +124,17 @@ public class PrepareAffiliationRelations implements Serializable {
|
|||
true,
|
||||
false,
|
||||
qualifier,
|
||||
Double.toString(affRel.getConfidence()));
|
||||
Double.toString(row.getAs("confidence")));
|
||||
|
||||
// return bi-directional relations
|
||||
return getAffiliationRelationPair(paperId, affId, dataInfo).iterator();
|
||||
|
||||
}, Encoders.bean(Relation.class))
|
||||
.toJavaRDD()
|
||||
})
|
||||
.map(p -> new AtomicAction(Relation.class, p))
|
||||
.mapToPair(
|
||||
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
||||
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
|
||||
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
|
||||
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
||||
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue