forked from antonis.lempesis/dnet-hadoop
added APC in the dump and test method
This commit is contained in:
parent
65a242646d
commit
6410ab71d8
|
@ -423,6 +423,14 @@ public class ResultMapper implements Serializable {
|
|||
.ofNullable(i.getInstancetype())
|
||||
.ifPresent(value -> instance.setType(value.getClassname()));
|
||||
Optional.ofNullable(i.getUrl()).ifPresent(value -> instance.setUrl(value));
|
||||
Optional<Field<String>> oPca = Optional.ofNullable(i.getProcessingchargeamount());
|
||||
Optional<Field<String>> oPcc = Optional.ofNullable(i.getProcessingchargecurrency());
|
||||
if (oPca.isPresent() && oPcc.isPresent()) {
|
||||
APC apc = new APC();
|
||||
apc.setCurrency(oPcc.get().getValue());
|
||||
apc.setAmount(oPca.get().getValue());
|
||||
instance.setArticleprocessingcharge(apc);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@ import org.apache.spark.api.java.JavaRDD;
|
|||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.ForeachFunction;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -408,4 +409,57 @@ public class DumpJobTest {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArticlePCA() {
|
||||
final String sourcePath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/publication_pca")
|
||||
.getPath();
|
||||
|
||||
final String communityMapPath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json")
|
||||
.getPath();
|
||||
|
||||
DumpProducts dump = new DumpProducts();
|
||||
dump
|
||||
.run(
|
||||
// false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class,
|
||||
false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class,
|
||||
GraphResult.class, Constants.DUMPTYPE.COMPLETE.getType());
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<GraphResult> tmp = sc
|
||||
.textFile(workingDir.toString() + "/result")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class));
|
||||
|
||||
org.apache.spark.sql.Dataset<GraphResult> verificationDataset = spark
|
||||
.createDataset(tmp.rdd(), Encoders.bean(GraphResult.class));
|
||||
|
||||
Assertions.assertEquals(23, verificationDataset.count());
|
||||
//verificationDataset.show(false);
|
||||
|
||||
Assertions.assertEquals(23, verificationDataset.filter("type = 'publication'").count());
|
||||
|
||||
verificationDataset.createOrReplaceTempView("check");
|
||||
|
||||
org.apache.spark.sql.Dataset<Row> temp = spark.sql("select id " +
|
||||
"from check " +
|
||||
"lateral view explode (instance) i as inst " +
|
||||
"where inst.articleprocessingcharge is not null");
|
||||
|
||||
Assertions.assertTrue(temp.count() == 2);
|
||||
|
||||
Assertions.assertTrue(temp.filter("id = '50|datacite____::05c611fdfc93d7a2a703d1324e28104a'").count() == 1);
|
||||
|
||||
Assertions.assertTrue(temp.filter("id = '50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8'").count() == 1);
|
||||
|
||||
|
||||
|
||||
// verificationDataset.filter("bestAccessright.code = 'c_abf2'").count() == verificationDataset
|
||||
// .filter("bestAccessright.code = 'c_abf2' and bestAccessright.label = 'OPEN'")
|
||||
// .count()
|
||||
|
||||
//TODO verify value and name of the fields for vocab related value (i.e. accessright, bestaccessright)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue