1
0
Fork 0

Change the description of the workflow

This commit is contained in:
Serafeim Chatzopoulos 2023-10-20 12:48:21 +03:00
parent 6ce9b600c1
commit 24c3f92d87
2 changed files with 15 additions and 9 deletions

View File

@ -75,21 +75,27 @@ public class PrepareAffiliationRelations implements Serializable {
spark -> { spark -> {
Constants.removeOutputDir(spark, outputPath); Constants.removeOutputDir(spark, outputPath);
List<KeyValue> collectedFromCrossref = OafMapperUtils.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref"); List<KeyValue> collectedFromCrossref = OafMapperUtils
JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelations(spark, inputPath, collectedFromCrossref); .listKeyValues(ModelConstants.CROSSREF_ID, "Crossref");
JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelations(
spark, inputPath, collectedFromCrossref);
List<KeyValue> collectedFromPubmed = OafMapperUtils.listKeyValues(ModelConstants.PUBMED_CENTRAL_ID, "Pubmed"); List<KeyValue> collectedFromPubmed = OafMapperUtils
JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations(spark, inputPath, collectedFromPubmed); .listKeyValues(ModelConstants.PUBMED_CENTRAL_ID, "Pubmed");
JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations(
spark, inputPath, collectedFromPubmed);
crossrefRelations crossrefRelations
.union(pubmedRelations) .union(pubmedRelations)
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); .saveAsHadoopFile(
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
}); });
} }
private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelations(SparkSession spark, String inputPath, private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelations(SparkSession spark,
List<KeyValue> collectedfrom) { String inputPath,
List<KeyValue> collectedfrom) {
// load and parse affiliation relations from HDFS // load and parse affiliation relations from HDFS
Dataset<Row> df = spark Dataset<Row> df = spark

View File

@ -87,7 +87,7 @@
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Produces the atomic action with the inferred by BIP! affiliation relations from Crossref</name> <name>Produces the atomic action with the inferred by BIP! affiliation relations (from Crossref and Pubmed)</name>
<class>eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations</class> <class>eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar> <jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>