forked from D-Net/dnet-hadoop
Merge pull request '9117_pubmed_affiliations_prod' (#357) from 9117_pubmed_affiliations_prod into master
Reviewed-on: D-Net/dnet-hadoop#357
This commit is contained in:
commit
97454e9594
|
@ -12,6 +12,7 @@ import org.apache.hadoop.io.Text;
|
|||
import org.apache.hadoop.io.compress.GzipCodec;
|
||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
|
@ -57,11 +58,14 @@ public class PrepareAffiliationRelations implements Serializable {
|
|||
Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String inputPath = parser.get("inputPath");
|
||||
log.info("inputPath {}: ", inputPath);
|
||||
final String crossrefInputPath = parser.get("crossrefInputPath");
|
||||
log.info("crossrefInputPath: {}", crossrefInputPath);
|
||||
|
||||
final String pubmedInputPath = parser.get("pubmedInputPath");
|
||||
log.info("pubmedInputPath: {}", pubmedInputPath);
|
||||
|
||||
final String outputPath = parser.get("outputPath");
|
||||
log.info("outputPath {}: ", outputPath);
|
||||
log.info("outputPath: {}", outputPath);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
|
@ -70,12 +74,28 @@ public class PrepareAffiliationRelations implements Serializable {
|
|||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
Constants.removeOutputDir(spark, outputPath);
|
||||
prepareAffiliationRelations(spark, inputPath, outputPath);
|
||||
|
||||
List<KeyValue> collectedFromCrossref = OafMapperUtils
|
||||
.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref");
|
||||
JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelations(
|
||||
spark, crossrefInputPath, collectedFromCrossref);
|
||||
|
||||
List<KeyValue> collectedFromPubmed = OafMapperUtils
|
||||
.listKeyValues(ModelConstants.PUBMED_CENTRAL_ID, "Pubmed");
|
||||
JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations(
|
||||
spark, pubmedInputPath, collectedFromPubmed);
|
||||
|
||||
crossrefRelations
|
||||
.union(pubmedRelations)
|
||||
.saveAsHadoopFile(
|
||||
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
private static <I extends Result> void prepareAffiliationRelations(SparkSession spark, String inputPath,
|
||||
String outputPath) {
|
||||
private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelations(SparkSession spark,
|
||||
String inputPath,
|
||||
List<KeyValue> collectedfrom) {
|
||||
|
||||
// load and parse affiliation relations from HDFS
|
||||
Dataset<Row> df = spark
|
||||
|
@ -92,7 +112,7 @@ public class PrepareAffiliationRelations implements Serializable {
|
|||
new Column("matching.Confidence").as("confidence"));
|
||||
|
||||
// prepare action sets for affiliation relations
|
||||
df
|
||||
return df
|
||||
.toJavaRDD()
|
||||
.flatMap((FlatMapFunction<Row, Relation>) row -> {
|
||||
|
||||
|
@ -120,8 +140,6 @@ public class PrepareAffiliationRelations implements Serializable {
|
|||
qualifier,
|
||||
Double.toString(row.getAs("confidence")));
|
||||
|
||||
List<KeyValue> collectedfrom = OafMapperUtils.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref");
|
||||
|
||||
// return bi-directional relations
|
||||
return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator();
|
||||
|
||||
|
@ -129,9 +147,7 @@ public class PrepareAffiliationRelations implements Serializable {
|
|||
.map(p -> new AtomicAction(Relation.class, p))
|
||||
.mapToPair(
|
||||
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
||||
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
|
||||
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
||||
|
||||
new Text(OBJECT_MAPPER.writeValueAsString(aa))));
|
||||
}
|
||||
|
||||
private static List<Relation> getAffiliationRelationPair(String paperId, String affId, List<KeyValue> collectedfrom,
|
||||
|
|
|
@ -6,9 +6,15 @@
|
|||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "ip",
|
||||
"paramLongName": "inputPath",
|
||||
"paramDescription": "the URL from where to get the programme file",
|
||||
"paramName": "cip",
|
||||
"paramLongName": "crossrefInputPath",
|
||||
"paramDescription": "the path to get the input data from Crossref",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "pip",
|
||||
"paramLongName": "pubmedInputPath",
|
||||
"paramDescription": "the path to get the input data from Pubmed",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
|
|
|
@ -31,5 +31,6 @@ spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListen
|
|||
# The following is needed as a property of a workflow
|
||||
oozie.wf.application.path=${oozieTopWfApplicationPath}
|
||||
|
||||
inputPath=/data/bip-affiliations/data.json
|
||||
crossrefInputPath=/data/bip-affiliations/data.json
|
||||
pubmedInputPath=/data/bip-affiliations/pubmed-data.json
|
||||
outputPath=/tmp/crossref-affiliations-output-v5
|
||||
|
|
|
@ -2,8 +2,12 @@
|
|||
<parameters>
|
||||
|
||||
<property>
|
||||
<name>inputPath</name>
|
||||
<description>the path where to find the inferred affiliation relations</description>
|
||||
<name>crossrefInputPath</name>
|
||||
<description>the path where to find the inferred affiliation relations from Crossref</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>pubmedInputPath</name>
|
||||
<description>the path where to find the inferred affiliation relations from Pubmed</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>outputPath</name>
|
||||
|
@ -83,7 +87,7 @@
|
|||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Produces the atomic action with the inferred by BIP! affiliation relations from Crossref</name>
|
||||
<name>Produces the atomic action with the inferred by BIP! affiliation relations (from Crossref and Pubmed)</name>
|
||||
<class>eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
|
@ -96,7 +100,8 @@
|
|||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${inputPath}</arg>
|
||||
<arg>--crossrefInputPath</arg><arg>${crossrefInputPath}</arg>
|
||||
<arg>--pubmedInputPath</arg><arg>${pubmedInputPath}</arg>
|
||||
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
|
|
|
@ -74,17 +74,22 @@ public class PrepareAffiliationRelationsTest {
|
|||
@Test
|
||||
void testMatch() throws Exception {
|
||||
|
||||
String affiliationRelationsPath = getClass()
|
||||
String crossrefAffiliationRelationPath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json")
|
||||
.getPath();
|
||||
|
||||
String pubmedAffiliationRelationsPath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json")
|
||||
.getPath();
|
||||
|
||||
String outputPath = workingDir.toString() + "/actionSet";
|
||||
|
||||
PrepareAffiliationRelations
|
||||
.main(
|
||||
new String[] {
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-inputPath", affiliationRelationsPath,
|
||||
"-crossrefInputPath", crossrefAffiliationRelationPath,
|
||||
"-pubmedInputPath", pubmedAffiliationRelationsPath,
|
||||
"-outputPath", outputPath
|
||||
});
|
||||
|
||||
|
@ -101,7 +106,7 @@ public class PrepareAffiliationRelationsTest {
|
|||
// );
|
||||
// }
|
||||
// count the number of relations
|
||||
assertEquals(20, tmp.count());
|
||||
assertEquals(40, tmp.count());
|
||||
|
||||
Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
|
||||
dataset.createOrReplaceTempView("result");
|
||||
|
@ -112,7 +117,7 @@ public class PrepareAffiliationRelationsTest {
|
|||
// verify that we have equal number of bi-directional relations
|
||||
Assertions
|
||||
.assertEquals(
|
||||
10, execVerification
|
||||
20, execVerification
|
||||
.filter(
|
||||
"relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'")
|
||||
.collectAsList()
|
||||
|
@ -120,7 +125,7 @@ public class PrepareAffiliationRelationsTest {
|
|||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
10, execVerification
|
||||
20, execVerification
|
||||
.filter(
|
||||
"relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'")
|
||||
.collectAsList()
|
||||
|
|
Loading…
Reference in New Issue