WIP: dump of the OpenAIRE graph - Changes #103

Closed
miriam.baglioni wants to merge 77 commits from miriam.baglioni/dnet-hadoop:dump into master
7 changed files with 106 additions and 17 deletions
Showing only changes of commit f7714645d2 - Show all commits

View File

@ -4,7 +4,7 @@ package eu.dnetlib.dhp.schema.dump.oaf;
import java.io.Serializable;
/**
* Used to refer to the Article Processing Charge information. Not dumped in this release. It contains two parameters: -
* Used to refer to the Article Processing Charge information. It contains two parameters: -
* currency of type String to store the currency of the APC - amount of type String to store the charged amount
*/
public class APC implements Serializable {

View File

@ -12,9 +12,11 @@ import java.util.List;
* type of type String to store the type of the instance as defined in the corresponding dnet vocabulary
* (dnet:publication_resource). It corresponds to the instancetype.classname of the instance to be mapped - url of type
* List<String> list of locations where the instance is accessible. It corresponds to url of the instance to be dumped -
* publicationdate of type String to store the publication date of the instance ;// dateofacceptance; - refereed of type
* String to store information abour tthe review status of the instance. Possible values are 'Unknown',
* publicationdate of type String to store the publication date of the instance ;// dateofacceptance;
* - refereed of type
* String to store information about the review status of the instance. Possible values are 'Unknown',
* 'nonPeerReviewed', 'peerReviewed'. It corresponds to refereed.classname of the instance to be dumped
* - articleprocessingcharge of type APC to store the article processing charges possibly associated to the instance
*/
public class Instance implements Serializable {
@ -28,6 +30,8 @@ public class Instance implements Serializable {
private String publicationdate;// dateofacceptance;
private APC articleprocessingcharge;
private String refereed; // peer-review status
public String getLicense() {
@ -78,4 +82,11 @@ public class Instance implements Serializable {
this.refereed = refereed;
}
/**
 * Returns the article processing charge currently held by this instance.
 *
 * @return the value of the {@code articleprocessingcharge} field
 */
public APC getArticleprocessingcharge() {
	return this.articleprocessingcharge;
}
/**
 * Stores the given article processing charge on this instance, replacing any previous value.
 *
 * @param articleprocessingcharge the APC to associate with this instance
 */
public void setArticleprocessingcharge(final APC articleprocessingcharge) {
	this.articleprocessingcharge = articleprocessingcharge;
}
}

View File

@ -24,8 +24,6 @@ public class Constants {
public static String RESEARCH_INFRASTRUCTURE = "Research Infrastructure/Initiative";
public static String ORCID = "orcid";
static {
accessRightsCoarMap.put("OPEN", "c_abf2");
accessRightsCoarMap.put("RESTRICTED", "c_16ec");

View File

@ -424,6 +424,14 @@ public class ResultMapper implements Serializable {
.ifPresent(value -> instance.setType(value.getClassname()));
Optional.ofNullable(i.getUrl()).ifPresent(value -> instance.setUrl(value));
Optional<Field<String>> oPca = Optional.ofNullable(i.getProcessingchargeamount());
Optional<Field<String>> oPcc = Optional.ofNullable(i.getProcessingchargecurrency());
if (oPca.isPresent() && oPcc.isPresent()) {
APC apc = new APC();
apc.setCurrency(oPcc.get().getValue());
apc.setAmount(oPca.get().getValue());
instance.setArticleprocessingcharge(apc);
}
}
private static List<Provenance> getUniqueProvenance(List<Provenance> provenance) {
@ -503,7 +511,7 @@ public class ResultMapper implements Serializable {
private static Pid getOrcid(List<StructuredProperty> p) {
for (StructuredProperty pid : p) {
if (pid.getQualifier().getClassid().equals(Constants.ORCID)) {
if (pid.getQualifier().getClassid().equals(ModelConstants.ORCID)) {
Optional<DataInfo> di = Optional.ofNullable(pid.getDataInfo());
if (di.isPresent()) {
return Pid

View File

@ -57,16 +57,12 @@ public class CommunitySplit implements Serializable {
Dataset<CommunityResult> community_products = result
.filter((FilterFunction<CommunityResult>) r -> containsCommunity(r, c));
try {
community_products.first();
community_products
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath + "/" + c);
} catch (Exception e) {
}
community_products
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath + "/" + c);
}
@ -75,9 +71,9 @@ public class CommunitySplit implements Serializable {
return r
.getContext()
.stream()
.filter(con -> con.getCode().equals(c))
.map(con -> con.getCode())
.collect(Collectors.toList())
.size() > 0;
.contains(c);
}
return false;
}

View File

@ -13,6 +13,7 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*;
import org.slf4j.Logger;
@ -408,4 +409,54 @@ public class DumpJobTest {
}
/**
 * Verifies that article processing charge information is present in the dumped results.
 *
 * Runs {@code DumpProducts} over the {@code publication_pca} fixture, reads the produced JSON back as
 * {@code GraphResult} records and checks that: 23 records are produced, all of type publication; exactly
 * two records have at least one instance with a non-null {@code articleprocessingcharge}; those two
 * records are the expected ones.
 */
@Test
public void testArticlePCA() {
	final String sourcePath = getClass()
		.getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/publication_pca")
		.getPath();

	final String communityMapPath = getClass()
		.getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json")
		.getPath();

	DumpProducts dump = new DumpProducts();
	dump
		.run(
			false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class,
			GraphResult.class, Constants.DUMPTYPE.COMPLETE.getType());

	final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

	// Read the dump output back, one JSON document per line.
	JavaRDD<GraphResult> tmp = sc
		.textFile(workingDir.toString() + "/result")
		.map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class));

	org.apache.spark.sql.Dataset<GraphResult> verificationDataset = spark
		.createDataset(tmp.rdd(), Encoders.bean(GraphResult.class));

	Assertions.assertEquals(23, verificationDataset.count());
	Assertions.assertEquals(23, verificationDataset.filter("type = 'publication'").count());

	verificationDataset.createOrReplaceTempView("check");

	// One row per (result, instance) pair whose instance carries an APC.
	org.apache.spark.sql.Dataset<Row> temp = spark
		.sql(
			"select id " +
				"from check " +
				"lateral view explode (instance) i as inst " +
				"where inst.articleprocessingcharge is not null");

	// assertEquals (rather than assertTrue on ==) reports the actual count when the check fails.
	Assertions.assertEquals(2, temp.count());
	Assertions
		.assertEquals(1, temp.filter("id = '50|datacite____::05c611fdfc93d7a2a703d1324e28104a'").count());
	Assertions
		.assertEquals(1, temp.filter("id = '50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8'").count());

	// TODO verify value and name of the fields for vocab related value (i.e. accessright, bestaccessright)
}
}

File diff suppressed because one or more lines are too long