forked from D-Net/dnet-hadoop
Compare commits
26 Commits
master
...
dumpProjec
Author | SHA1 | Date |
---|---|---|
Miriam Baglioni | f7714645d2 | |
Miriam Baglioni | 4632795f25 | |
Miriam Baglioni | 870ee28dd6 | |
Miriam Baglioni | 08f8dd9454 | |
Miriam Baglioni | e5463fea01 | |
Miriam Baglioni | 16c1a27852 | |
Miriam Baglioni | d0c94462e4 | |
Miriam Baglioni | a896febc02 | |
Miriam Baglioni | 5dea729de3 | |
Miriam Baglioni | 200e7e9c46 | |
Miriam Baglioni | 931b2a2e15 | |
Miriam Baglioni | 330343937c | |
Miriam Baglioni | defbb71561 | |
Miriam Baglioni | 17049f8bde | |
Miriam Baglioni | cc11ee1cb9 | |
Miriam Baglioni | 871e5bea29 | |
Miriam Baglioni | 5d92df0627 | |
Miriam Baglioni | 9841086ef3 | |
Miriam Baglioni | d4ad740c98 | |
Miriam Baglioni | a684e1065e | |
Miriam Baglioni | f7c35e6311 | |
Miriam Baglioni | 9bdadd4ddb | |
Miriam Baglioni | 0d76e039cf | |
Miriam Baglioni | 7c86e66697 | |
Miriam Baglioni | bc09d37e8c | |
Miriam Baglioni | 815c7c11aa |
|
@ -4,7 +4,7 @@ package eu.dnetlib.dhp.schema.dump.oaf;
|
|||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* Used to refer to the Article Processing Charge information. Not dumped in this release. It contains two parameters: -
|
||||
* Used to refer to the Article Processing Charge information. It contains two parameters: -
|
||||
* currency of type String to store the currency of the APC - amount of type String to stores the charged amount
|
||||
*/
|
||||
public class APC implements Serializable {
|
||||
|
|
|
@ -12,9 +12,11 @@ import java.util.List;
|
|||
* type of type String to store the type of the instance as defined in the corresponding dnet vocabulary
|
||||
* (dnet:pubication_resource). It corresponds to the instancetype.classname of the instance to be mapped - url of type
|
||||
* List<String> list of locations where the instance is accessible. It corresponds to url of the instance to be dumped -
|
||||
* publicationdate of type String to store the publication date of the instance ;// dateofacceptance; - refereed of type
|
||||
* String to store information abour tthe review status of the instance. Possible values are 'Unknown',
|
||||
* publicationdate of type String to store the publication date of the instance ;// dateofacceptance;
|
||||
* - refereed of type
|
||||
* String to store information abour the review status of the instance. Possible values are 'Unknown',
|
||||
* 'nonPeerReviewed', 'peerReviewed'. It corresponds to refereed.classname of the instance to be dumped
|
||||
* - articleprocessingcharge of type APC to store the article processing charges possibly associated to the instance
|
||||
*/
|
||||
public class Instance implements Serializable {
|
||||
|
||||
|
@ -28,6 +30,8 @@ public class Instance implements Serializable {
|
|||
|
||||
private String publicationdate;// dateofacceptance;
|
||||
|
||||
private APC articleprocessingcharge;
|
||||
|
||||
private String refereed; // peer-review status
|
||||
|
||||
public String getLicense() {
|
||||
|
@ -78,4 +82,11 @@ public class Instance implements Serializable {
|
|||
this.refereed = refereed;
|
||||
}
|
||||
|
||||
public APC getArticleprocessingcharge() {
|
||||
return articleprocessingcharge;
|
||||
}
|
||||
|
||||
public void setArticleprocessingcharge(APC articleprocessingcharge) {
|
||||
this.articleprocessingcharge = articleprocessingcharge;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,8 +24,6 @@ public class Constants {
|
|||
|
||||
public static String RESEARCH_INFRASTRUCTURE = "Research Infrastructure/Initiative";
|
||||
|
||||
public static String ORCID = "orcid";
|
||||
|
||||
static {
|
||||
accessRightsCoarMap.put("OPEN", "c_abf2");
|
||||
accessRightsCoarMap.put("RESTRICTED", "c_16ec");
|
||||
|
|
|
@ -424,6 +424,14 @@ public class ResultMapper implements Serializable {
|
|||
.ifPresent(value -> instance.setType(value.getClassname()));
|
||||
Optional.ofNullable(i.getUrl()).ifPresent(value -> instance.setUrl(value));
|
||||
|
||||
Optional<Field<String>> oPca = Optional.ofNullable(i.getProcessingchargeamount());
|
||||
Optional<Field<String>> oPcc = Optional.ofNullable(i.getProcessingchargecurrency());
|
||||
if (oPca.isPresent() && oPcc.isPresent()) {
|
||||
APC apc = new APC();
|
||||
apc.setCurrency(oPcc.get().getValue());
|
||||
apc.setAmount(oPca.get().getValue());
|
||||
instance.setArticleprocessingcharge(apc);
|
||||
}
|
||||
}
|
||||
|
||||
private static List<Provenance> getUniqueProvenance(List<Provenance> provenance) {
|
||||
|
@ -503,7 +511,7 @@ public class ResultMapper implements Serializable {
|
|||
|
||||
private static Pid getOrcid(List<StructuredProperty> p) {
|
||||
for (StructuredProperty pid : p) {
|
||||
if (pid.getQualifier().getClassid().equals(Constants.ORCID)) {
|
||||
if (pid.getQualifier().getClassid().equals(ModelConstants.ORCID)) {
|
||||
Optional<DataInfo> di = Optional.ofNullable(pid.getDataInfo());
|
||||
if (di.isPresent()) {
|
||||
return Pid
|
||||
|
|
|
@ -57,16 +57,12 @@ public class CommunitySplit implements Serializable {
|
|||
Dataset<CommunityResult> community_products = result
|
||||
.filter((FilterFunction<CommunityResult>) r -> containsCommunity(r, c));
|
||||
|
||||
try {
|
||||
community_products.first();
|
||||
community_products
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(outputPath + "/" + c);
|
||||
} catch (Exception e) {
|
||||
|
||||
}
|
||||
community_products
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(outputPath + "/" + c);
|
||||
|
||||
}
|
||||
|
||||
|
@ -75,9 +71,9 @@ public class CommunitySplit implements Serializable {
|
|||
return r
|
||||
.getContext()
|
||||
.stream()
|
||||
.filter(con -> con.getCode().equals(c))
|
||||
.map(con -> con.getCode())
|
||||
.collect(Collectors.toList())
|
||||
.size() > 0;
|
||||
.contains(c);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -102,19 +102,26 @@ public class SparkDumpFunderResults implements Serializable {
|
|||
} else {
|
||||
funderdump = fundernsp.substring(0, fundernsp.indexOf("_")).toUpperCase();
|
||||
}
|
||||
writeFunderResult(funder, result, outputPath + "/" + funderdump);
|
||||
writeFunderResult(funder, result, outputPath, funderdump);
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
private static void writeFunderResult(String funder, Dataset<CommunityResult> results, String outputPath) {
|
||||
private static void dumpResults(String nsp, Dataset<CommunityResult> results, String outputPath,
|
||||
String funderName) {
|
||||
|
||||
results.map((MapFunction<CommunityResult, CommunityResult>) r -> {
|
||||
if (!Optional.ofNullable(r.getProjects()).isPresent()) {
|
||||
return null;
|
||||
}
|
||||
for (Project p : r.getProjects()) {
|
||||
if (p.getId().startsWith(funder)) {
|
||||
if (p.getId().startsWith(nsp)) {
|
||||
if (nsp.startsWith("40|irb")) {
|
||||
if (p.getFunder().getShortName().equals(funderName))
|
||||
return r;
|
||||
else
|
||||
return null;
|
||||
}
|
||||
return r;
|
||||
}
|
||||
}
|
||||
|
@ -124,7 +131,18 @@ public class SparkDumpFunderResults implements Serializable {
|
|||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(outputPath);
|
||||
.json(outputPath + "/" + funderName);
|
||||
}
|
||||
|
||||
private static void writeFunderResult(String funder, Dataset<CommunityResult> results, String outputPath,
|
||||
String funderDump) {
|
||||
|
||||
if (funder.startsWith("40|irb")) {
|
||||
dumpResults(funder, results, outputPath, "HRZZ");
|
||||
dumpResults(funder, results, outputPath, "MZOS");
|
||||
} else
|
||||
dumpResults(funder, results, outputPath, funderDump);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,9 +1,8 @@
|
|||
<workflow-app name="dump_funder_results" xmlns="uri:oozie:workflow:0.5">
|
||||
|
||||
<parameters>
|
||||
<property>
|
||||
<name>upload</name>
|
||||
<value>false</value>
|
||||
<value>true</value>
|
||||
<description>true to upload the dump for the funders in Zenodo</description>
|
||||
</property>
|
||||
<property>
|
||||
|
|
|
@ -13,6 +13,7 @@ import org.apache.spark.api.java.JavaRDD;
|
|||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.ForeachFunction;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -408,4 +409,54 @@ public class DumpJobTest {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArticlePCA() {
|
||||
final String sourcePath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/publication_pca")
|
||||
.getPath();
|
||||
|
||||
final String communityMapPath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json")
|
||||
.getPath();
|
||||
|
||||
DumpProducts dump = new DumpProducts();
|
||||
dump
|
||||
.run(
|
||||
// false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class,
|
||||
false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class,
|
||||
GraphResult.class, Constants.DUMPTYPE.COMPLETE.getType());
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<GraphResult> tmp = sc
|
||||
.textFile(workingDir.toString() + "/result")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class));
|
||||
|
||||
org.apache.spark.sql.Dataset<GraphResult> verificationDataset = spark
|
||||
.createDataset(tmp.rdd(), Encoders.bean(GraphResult.class));
|
||||
|
||||
Assertions.assertEquals(23, verificationDataset.count());
|
||||
// verificationDataset.show(false);
|
||||
|
||||
Assertions.assertEquals(23, verificationDataset.filter("type = 'publication'").count());
|
||||
|
||||
verificationDataset.createOrReplaceTempView("check");
|
||||
|
||||
org.apache.spark.sql.Dataset<Row> temp = spark
|
||||
.sql(
|
||||
"select id " +
|
||||
"from check " +
|
||||
"lateral view explode (instance) i as inst " +
|
||||
"where inst.articleprocessingcharge is not null");
|
||||
|
||||
Assertions.assertTrue(temp.count() == 2);
|
||||
|
||||
Assertions.assertTrue(temp.filter("id = '50|datacite____::05c611fdfc93d7a2a703d1324e28104a'").count() == 1);
|
||||
|
||||
Assertions.assertTrue(temp.filter("id = '50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8'").count() == 1);
|
||||
|
||||
|
||||
//TODO verify value and name of the fields for vocab related value (i.e. accessright, bestaccessright)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -137,5 +137,10 @@ public class SplitPerFunderTest {
|
|||
.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
|
||||
Assertions.assertEquals(3, tmp.count());
|
||||
|
||||
// H2020 3
|
||||
tmp = sc
|
||||
.textFile(workingDir.toString() + "/split/MZOS")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
|
||||
Assertions.assertEquals(1, tmp.count());
|
||||
}
|
||||
}
|
||||
|
|
File diff suppressed because one or more lines are too long
|
@ -5,4 +5,5 @@
|
|||
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1603715146539,"properties":[],"relClass":"isProducedBy","relType":"datasourceOrganization","source":"10|doajarticles::8b75543067b50076e70764917e188178","subRelType":"provision","target":"40|snsf________::50cb15ff7a6a3f8531f063770179e346"}
|
||||
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1603715146539,"properties":[],"relClass":"isProducedBy","relType":"datasourceOrganization","source":"10|doajarticles::9f3ff882f023209d9ffb4dc32b77d376","subRelType":"provision","target":"40|corda_______::ffc1811633b3222e4764c7b0517f83e8"}
|
||||
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1603715146539,"properties":[],"relClass":"isProducedBy","relType":"datasourceOrganization","source":"10|doajarticles::b566fa319c3923454e1e8eb886ab62d2","subRelType":"provision","target":"40|nhmrc_______::4e6c928fef9851b37ec73f4f6daca35b"}
|
||||
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1603715146539,"properties":[],"relClass":"isProducedBy","relType":"datasourceOrganization","source":"10|doajarticles::e0554fb004a155bc23cfb43ee9fc8eae","subRelType":"provision","target":"40|corda__h2020::846b777af165fef7c904a81712a83b66"}
|
||||
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1603715146539,"properties":[],"relClass":"isProducedBy","relType":"datasourceOrganization","source":"10|doajarticles::e0554fb004a155bc23cfb43ee9fc8eae","subRelType":"provision","target":"40|corda__h2020::846b777af165fef7c904a81712a83b66"}
|
||||
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1603715146539,"properties":[],"relClass":"isProducedBy","relType":"datasourceOrganization","source":"10|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","subRelType":"provision","target":"40|irb_hr______::1e5e62235d094afd01cd56e65112fc63"}
|
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue