[AffRo] refactoring

This commit is contained in:
Miriam Baglioni 2024-09-25 17:12:51 +02:00
parent a54d021c37
commit 7cd8171268
3 changed files with 25 additions and 18 deletions

View File

@ -143,11 +143,11 @@ public class PrepareAffiliationRelations implements Serializable {
Dataset<Row> df = spark Dataset<Row> df = spark
.read() .read()
.schema("`DOI` STRING, `Organizations` ARRAY<STRUCT<`PID`:STRING, `Value`:STRING,`Confidence`:DOUBLE, `Status`:STRING>>") .schema(
"`DOI` STRING, `Organizations` ARRAY<STRUCT<`PID`:STRING, `Value`:STRING,`Confidence`:DOUBLE, `Status`:STRING>>")
.json(inputPath) .json(inputPath)
.where("DOI is not null"); .where("DOI is not null");
return getTextTextJavaPairRDD(collectedfrom, df.selectExpr("DOI", "Organizations as Matchings")); return getTextTextJavaPairRDD(collectedfrom, df.selectExpr("DOI", "Organizations as Matchings"));
} }
@ -158,11 +158,11 @@ public class PrepareAffiliationRelations implements Serializable {
// load and parse affiliation relations from HDFS // load and parse affiliation relations from HDFS
Dataset<Row> df = spark Dataset<Row> df = spark
.read() .read()
.schema("`DOI` STRING, `Matchings` ARRAY<STRUCT<`PID`:STRING, `Value`:STRING,`Confidence`:DOUBLE, `Status`:STRING>>") .schema(
"`DOI` STRING, `Matchings` ARRAY<STRUCT<`PID`:STRING, `Value`:STRING,`Confidence`:DOUBLE, `Status`:STRING>>")
.json(inputPath) .json(inputPath)
.where("DOI is not null"); .where("DOI is not null");
return getTextTextJavaPairRDD(collectedfrom, df); return getTextTextJavaPairRDD(collectedfrom, df);
} }
@ -178,7 +178,6 @@ public class PrepareAffiliationRelations implements Serializable {
new Column("matching.Status").as("status")) new Column("matching.Status").as("status"))
.where("status = 'active'"); .where("status = 'active'");
// prepare action sets for affiliation relations // prepare action sets for affiliation relations
return df return df
.toJavaRDD() .toJavaRDD()
@ -188,14 +187,13 @@ public class PrepareAffiliationRelations implements Serializable {
final String paperId = ID_PREFIX final String paperId = ID_PREFIX
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", row.getAs("doi"))); + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", row.getAs("doi")));
// Organization to OpenAIRE identifier // Organization to OpenAIRE identifier
String affId = null; String affId = null;
if(row.getAs("pidtype").equals("ROR")) if (row.getAs("pidtype").equals("ROR"))
//ROR id to OpenIARE id // ROR id to OpenIARE id
affId = GenerateRorActionSetJob.calculateOpenaireId(row.getAs("pidvalue")); affId = GenerateRorActionSetJob.calculateOpenaireId(row.getAs("pidvalue"));
else else
//getting the OpenOrgs identifier for the organization // getting the OpenOrgs identifier for the organization
affId = row.getAs("pidvalue"); affId = row.getAs("pidvalue");
Qualifier qualifier = OafMapperUtils Qualifier qualifier = OafMapperUtils

View File

@ -517,8 +517,10 @@ case object Crossref2Oaf {
) )
} }
if(doi.startsWith("10.3410") || doi.startsWith("10.12703")) if (doi.startsWith("10.3410") || doi.startsWith("10.12703"))
instance.setHostedby(OafMapperUtils.keyValue(OafMapperUtils.createOpenaireId(10, "openaire____::H1Connect", true),"H1Connect")) instance.setHostedby(
OafMapperUtils.keyValue(OafMapperUtils.createOpenaireId(10, "openaire____::H1Connect", true), "H1Connect")
)
instance.setAccessright( instance.setAccessright(
decideAccessRight(instance.getLicense, result.getDateofacceptance.getValue) decideAccessRight(instance.getLicense, result.getDateofacceptance.getValue)

View File

@ -110,12 +110,11 @@ public class PrepareAffiliationRelationsTest {
// ); // );
// } // }
// count the number of relations // count the number of relations
assertEquals(168, tmp.count());//150 + assertEquals(168, tmp.count());// 150 +
Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class)); Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
dataset.createOrReplaceTempView("result"); dataset.createOrReplaceTempView("result");
Dataset<Row> execVerification = spark Dataset<Row> execVerification = spark
.sql("select r.relType, r.relClass, r.source, r.target, r.dataInfo.trust from result r"); .sql("select r.relType, r.relClass, r.source, r.target, r.dataInfo.trust from result r");
@ -159,7 +158,15 @@ public class PrepareAffiliationRelationsTest {
.assertEquals( .assertEquals(
5, execVerification.filter("source = '" + publisherid + "' and target = '" + rorId + "'").count()); 5, execVerification.filter("source = '" + publisherid + "' and target = '" + rorId + "'").count());
Assertions.assertEquals(1,execVerification.filter("source = '" + ID_PREFIX Assertions
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s00217-010-1268-9")) + "' and target = '" + "20|ror_________::" + IdentifierFactory.md5("https://ror.org/03265fv13") + "'").count()); .assertEquals(
1, execVerification
.filter(
"source = '" + ID_PREFIX
+ IdentifierFactory
.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s00217-010-1268-9"))
+ "' and target = '" + "20|ror_________::"
+ IdentifierFactory.md5("https://ror.org/03265fv13") + "'")
.count());
} }
} }