Add Pubmed affiliations (inferred by BIP) as actionsets #353
|
@ -75,21 +75,27 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
spark -> {
|
spark -> {
|
||||||
Constants.removeOutputDir(spark, outputPath);
|
Constants.removeOutputDir(spark, outputPath);
|
||||||
|
|
||||||
List<KeyValue> collectedFromCrossref = OafMapperUtils.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref");
|
List<KeyValue> collectedFromCrossref = OafMapperUtils
|
||||||
JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelations(spark, inputPath, collectedFromCrossref);
|
.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref");
|
||||||
|
JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelations(
|
||||||
|
spark, inputPath, collectedFromCrossref);
|
||||||
|
|
||||||
List<KeyValue> collectedFromPubmed = OafMapperUtils.listKeyValues(ModelConstants.PUBMED_CENTRAL_ID, "Pubmed");
|
List<KeyValue> collectedFromPubmed = OafMapperUtils
|
||||||
JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations(spark, inputPath, collectedFromPubmed);
|
.listKeyValues(ModelConstants.PUBMED_CENTRAL_ID, "Pubmed");
|
||||||
|
JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations(
|
||||||
|
spark, inputPath, collectedFromPubmed);
|
||||||
|
|
||||||
crossrefRelations
|
crossrefRelations
|
||||||
.union(pubmedRelations)
|
.union(pubmedRelations)
|
||||||
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
.saveAsHadoopFile(
|
||||||
|
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
||||||
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelations(SparkSession spark, String inputPath,
|
private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelations(SparkSession spark,
|
||||||
List<KeyValue> collectedfrom) {
|
String inputPath,
|
||||||
|
List<KeyValue> collectedfrom) {
|
||||||
|
|
||||||
// load and parse affiliation relations from HDFS
|
// load and parse affiliation relations from HDFS
|
||||||
Dataset<Row> df = spark
|
Dataset<Row> df = spark
|
||||||
|
|
|
@ -87,7 +87,7 @@
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>Produces the atomic action with the inferred by BIP! affiliation relations from Crossref</name>
|
<name>Produces the atomic action with the inferred by BIP! affiliation relations (from Crossref and Pubmed)</name>
|
||||||
<class>eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations</class>
|
<class>eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations</class>
|
||||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
|
|
Loading…
Reference in New Issue