minor changes and refactoring

Miriam Baglioni 2021-07-13 17:10:02 +02:00
parent 59615da65e
commit 618d2de2da
8 changed files with 176 additions and 175 deletions


@@ -428,7 +428,7 @@ public class ResultMapper implements Serializable {
		if (oPca.isPresent() && oPcc.isPresent()) {
			Field<String> pca = oPca.get();
			Field<String> pcc = oPcc.get();
-			if(!pca.getValue().trim().equals("") && !pcc.getValue().trim().equals("")){
+			if (!pca.getValue().trim().equals("") && !pcc.getValue().trim().equals("")) {
				APC apc = new APC();
				apc.setCurrency(oPcc.get().getValue());
				apc.setAmount(oPca.get().getValue());


@@ -70,10 +70,10 @@ public class CreateContextRelation implements Serializable {
		cce.execute(Process::getRelation, CONTEX_RELATION_DATASOURCE, ModelSupport.getIdPrefix(Datasource.class));
		log.info("Creating relations for projects... ");
-//		cce
-//			.execute(
-//				Process::getRelation, CONTEX_RELATION_PROJECT,
-//				ModelSupport.getIdPrefix(eu.dnetlib.dhp.schema.oaf.Project.class));
+		cce
+			.execute(
+				Process::getRelation, CONTEX_RELATION_PROJECT,
+				ModelSupport.getIdPrefix(eu.dnetlib.dhp.schema.oaf.Project.class));
		cce.close();


@@ -453,7 +453,7 @@ public class DumpGraphEntities implements Serializable {
			.map(
				(MapFunction<E, Organization>) o -> mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) o),
				Encoders.bean(Organization.class))
			.filter(Objects::nonNull)
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
@@ -461,7 +461,7 @@ public class DumpGraphEntities implements Serializable {
	}
	private static Organization mapOrganization(eu.dnetlib.dhp.schema.oaf.Organization org) {
-		if(org.getDataInfo().getDeletedbyinference())
+		if (org.getDataInfo().getDeletedbyinference())
			return null;
		Organization organization = new Organization();


@@ -129,7 +129,7 @@ public class Extractor implements Serializable {
				return relationList.iterator();
			}, Encoders.bean(Relation.class))
			.write()
			.option("compression", "gzip")
			.mode(SaveMode.Overwrite)
			.json(outputPath);
@@ -147,7 +147,7 @@ public class Extractor implements Serializable {
			.map(
				paction -> Provenance
					.newInstance(
-						paction.getClassid(),
+						paction.getClassname(),
						dinfo.getTrust()))
			.orElse(
				Provenance


@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.oa.graph.dump.complete;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
@@ -25,113 +26,112 @@ import eu.dnetlib.dhp.schema.oaf.*;
 * with this view for both the source and the target
 */
public class SparkSelectValidRelationsJob implements Serializable {
	private static final Logger log = LoggerFactory.getLogger(SparkSelectValidRelationsJob.class);
	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				SparkSelectValidRelationsJob.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/oa/graph/dump/complete/input_relationdump_parameters.json"));
		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);
		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
		final String inputPath = parser.get("sourcePath");
		log.info("inputPath: {}", inputPath);
		final String outputPath = parser.get("outputPath");
		log.info("outputPath: {}", outputPath);
		SparkConf conf = new SparkConf();
		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> {
				Utils.removeOutputDir(spark, outputPath);
				selectValidRelation(spark, inputPath, outputPath);
			});
	}
	private static void selectValidRelation(SparkSession spark, String inputPath, String outputPath) {
		Dataset<Relation> relation = Utils.readPath(spark, inputPath + "/relation", Relation.class);
		Dataset<Publication> publication = Utils.readPath(spark, inputPath + "/publication", Publication.class);
		Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> dataset = Utils
			.readPath(spark, inputPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
		Dataset<Software> software = Utils.readPath(spark, inputPath + "/software", Software.class);
		Dataset<OtherResearchProduct> other = Utils
			.readPath(spark, inputPath + "/otherresearchproduct", OtherResearchProduct.class);
		Dataset<Organization> organization = Utils.readPath(spark, inputPath + "/organization", Organization.class);
		Dataset<Project> project = Utils.readPath(spark, inputPath + "/project", Project.class);
		Dataset<Datasource> datasource = Utils.readPath(spark, inputPath + "/datasource", Datasource.class);
		relation.createOrReplaceTempView("relation");
		publication.createOrReplaceTempView("publication");
		dataset.createOrReplaceTempView("dataset");
		other.createOrReplaceTempView("other");
		software.createOrReplaceTempView("software");
		organization.createOrReplaceTempView("organization");
		project.createOrReplaceTempView("project");
		datasource.createOrReplaceTempView("datasource");
		spark
			.sql(
				"SELECT id " +
					"FROM publication " +
					"WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " +
					"UNION ALL " +
					"SELECT id " +
					"FROM dataset " +
					"WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " +
					"UNION ALL " +
					"SELECT id " +
					"FROM other " +
					"WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " +
					"UNION ALL " +
					"SELECT id " +
					"FROM software " +
					"WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " +
					"UNION ALL " +
					"SELECT id " +
					"FROM organization " +
					"WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " +
					"UNION ALL " +
					"SELECT id " +
					"FROM project " +
					"WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " +
					"UNION ALL " +
					"SELECT id " +
					"FROM datasource " +
-					"WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " )
+					"WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false ")
			.createOrReplaceTempView("identifiers");
		spark
			.sql(
				"SELECT relation.* " +
					"FROM relation " +
					"JOIN identifiers i1 " +
					"ON source = i1.id " +
					"JOIN identifiers i2 " +
					"ON target = i2.id " +
					"WHERE datainfo.deletedbyinference = false")
			.as(Encoders.bean(Relation.class))
			.write()
			.option("compression", "gzip")
			.mode(SaveMode.Overwrite)
			.json(outputPath);
		;
	}
}


@@ -412,40 +412,42 @@ public class DumpJobTest {
	@Test
	public void testArticlePCA() {
		final String sourcePath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/publication_pca")
			.getPath();
		final String communityMapPath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json")
			.getPath();
		DumpProducts dump = new DumpProducts();
		dump
			.run(
				// false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class,
				false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class,
				GraphResult.class, Constants.DUMPTYPE.COMPLETE.getType());
		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
		JavaRDD<GraphResult> tmp = sc
			.textFile(workingDir.toString() + "/result")
			.map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class));
		org.apache.spark.sql.Dataset<GraphResult> verificationDataset = spark
			.createDataset(tmp.rdd(), Encoders.bean(GraphResult.class));
		Assertions.assertEquals(23, verificationDataset.count());
-		//verificationDataset.show(false);
+		// verificationDataset.show(false);
		Assertions.assertEquals(23, verificationDataset.filter("type = 'publication'").count());
		verificationDataset.createOrReplaceTempView("check");
-		org.apache.spark.sql.Dataset<Row> temp = spark.sql("select id " +
-			"from check " +
-			"lateral view explode (instance) i as inst " +
-			"where inst.articleprocessingcharge is not null");
+		org.apache.spark.sql.Dataset<Row> temp = spark
+			.sql(
+				"select id " +
+					"from check " +
+					"lateral view explode (instance) i as inst " +
+					"where inst.articleprocessingcharge is not null");
		Assertions.assertTrue(temp.count() == 2);
@@ -453,8 +455,6 @@ public class DumpJobTest {
		Assertions.assertTrue(temp.filter("id = '50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8'").count() == 1);
		// verificationDataset.filter("bestAccessright.code = 'c_abf2'").count() == verificationDataset
		// .filter("bestAccessright.code = 'c_abf2' and bestAccessright.label = 'OPEN'")
		// .count()


@@ -97,7 +97,7 @@ public class CreateEntityTest {
		Assertions.assertEquals(12, riList.size());
		riList.stream().forEach(c -> {
-			switch (c.getOriginalId()) {
+			switch (c.getAcronym()) {
				case "mes":
					Assertions
						.assertTrue(c.getType().equals(eu.dnetlib.dhp.oa.graph.dump.Constants.RESEARCH_COMMUNITY));
@@ -115,9 +115,9 @@ public class CreateEntityTest {
							String
								.format(
									"%s|%s::%s", Constants.CONTEXT_ID, Constants.CONTEXT_NS_PREFIX,
-									DHPUtils.md5(c.getOriginalId()))));
+									DHPUtils.md5(c.getAcronym()))));
					Assertions.assertTrue(c.getZenodo_community().equals("https://zenodo.org/communities/oac_mes"));
-					Assertions.assertTrue("mes".equals(c.getOriginalId()));
+					Assertions.assertTrue("mes".equals(c.getAcronym()));
					break;
				case "clarin":
					Assertions
@@ -130,9 +130,9 @@ public class CreateEntityTest {
							String
								.format(
									"%s|%s::%s", Constants.CONTEXT_ID, Constants.CONTEXT_NS_PREFIX,
-									DHPUtils.md5(c.getOriginalId()))));
+									DHPUtils.md5(c.getAcronym()))));
					Assertions.assertTrue(c.getZenodo_community().equals("https://zenodo.org/communities/oac_clarin"));
-					Assertions.assertTrue("clarin".equals(c.getOriginalId()));
+					Assertions.assertTrue("clarin".equals(c.getAcronym()));
					break;
			}
			// TODO add check for all the others Entities


@@ -1,3 +1,4 @@
package eu.dnetlib.dhp.oa.graph.dump.complete;
import java.io.IOException;
@@ -28,72 +29,72 @@ import net.sf.saxon.expr.instruct.ForEach;
public class SelectRelationTest {
	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
	private static SparkSession spark;
	private static Path workingDir;
	private static final Logger log = LoggerFactory
		.getLogger(SelectRelationTest.class);
	private static HashMap<String, String> map = new HashMap<>();
	@BeforeAll
	public static void beforeAll() throws IOException {
		workingDir = Files
			.createTempDirectory(SelectRelationTest.class.getSimpleName());
		log.info("using work dir {}", workingDir);
		SparkConf conf = new SparkConf();
		conf.setAppName(SelectRelationTest.class.getSimpleName());
		conf.setMaster("local[*]");
		conf.set("spark.driver.host", "localhost");
		conf.set("hive.metastore.local", "true");
		conf.set("spark.ui.enabled", "false");
		conf.set("spark.sql.warehouse.dir", workingDir.toString());
		conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
		spark = SparkSession
			.builder()
			.appName(SelectRelationTest.class.getSimpleName())
			.config(conf)
			.getOrCreate();
	}
	@AfterAll
	public static void afterAll() throws IOException {
		FileUtils.deleteDirectory(workingDir.toFile());
		spark.stop();
	}
	@Test
	public void test1() throws Exception {
		final String sourcePath = getClass()
			.getResource("/eu/dnetlib/dhp/oa/graph/dump/selectrelations")
			.getPath();
		SparkSelectValidRelationsJob.main(new String[] {
			"-isSparkSessionManaged", Boolean.FALSE.toString(),
			"-outputPath", workingDir.toString() + "/relation",
			"-sourcePath", sourcePath
		});
		// dumpCommunityProducts.exec(MOCK_IS_LOOK_UP_URL,Boolean.FALSE, workingDir.toString()+"/dataset",sourcePath,"eu.dnetlib.dhp.schema.oaf.Dataset","eu.dnetlib.dhp.schema.dump.oaf.Dataset");
		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
		JavaRDD<eu.dnetlib.dhp.schema.oaf.Relation> tmp = sc
			.textFile(workingDir.toString() + "/relation")
			.map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.oaf.Relation.class));
		org.apache.spark.sql.Dataset<Relation> verificationDataset = spark
			.createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.oaf.Relation.class));
		Assertions.assertTrue(verificationDataset.count() == 7);
	}
}