[Dump Subset] issue on the relations

This commit is contained in:
Miriam Baglioni 2022-12-22 09:38:09 +01:00
parent 45cc165e92
commit db36a9be2e
13 changed files with 330 additions and 49 deletions

View File

@ -323,7 +323,7 @@ public class ResultMapper implements Serializable {
}
}
} catch (ClassCastException cce) {
return out;
return null;
}
}
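
Note on the ResultMapper change above: returning null instead of the partially built out object lets callers drop records that could not be mapped, e.g. via the .filter(Objects::nonNull) step visible in SparkDumpResult further down. A minimal stand-alone sketch of that null-on-failure pattern (plain Java, hypothetical names, not the actual mapper):

import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class NullOnFailureSketch {

	// Hypothetical mapper mirroring the catch (ClassCastException) { return null; } change:
	// on a failed cast it returns null instead of a half-populated object.
	static String mapOrNull(Object in) {
		try {
			return (String) in;
		} catch (ClassCastException cce) {
			return null;
		}
	}

	public static void main(String[] args) {
		List<String> mapped = Stream.of((Object) "a", Integer.valueOf(1), "b")
			.map(NullOnFailureSketch::mapOrNull)
			.filter(Objects::nonNull) // same pattern as .filter(Objects::nonNull) in SparkDumpResult
			.collect(Collectors.toList());
		System.out.println(mapped); // prints [a, b]: the record that failed the cast is skipped
	}
}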

View File

@ -14,15 +14,18 @@ import org.dom4j.Element;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
import eu.dnetlib.dhp.oa.graph.dump.subset.SparkDumpResult;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
public class QueryInformationSystem {
private static final Logger log = LoggerFactory.getLogger(QueryInformationSystem.class);
private ISLookUpService isLookUp;
private List<String> contextRelationResult;
@ -51,6 +54,7 @@ public class QueryInformationSystem {
String[] cSplit = c.split("@@");
cinfo.setId(cSplit[0]);
cinfo.setName(cSplit[1]);
log.info("community name : {}", cSplit[1]);
cinfo.setDescription(cSplit[2]);
if (!cSplit[3].trim().equals("")) {
cinfo.setSubject(Arrays.asList(cSplit[3].split(",")));
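
The new log.info call above prints the community name parsed out of the @@-delimited profile record. A stand-alone sketch of that parsing, assuming the field order visible in the hunk (id @@ name @@ description @@ comma-separated subjects) and a hypothetical sample value:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class CommunitySplitSketch {
	public static void main(String[] args) {
		// Hypothetical record, following the field order used in QueryInformationSystem
		String c = "egi@@EGI Federation@@Results related to the EGI community@@science,computing";
		String[] cSplit = c.split("@@");
		String id = cSplit[0];
		String name = cSplit[1]; // the value the new log.info statement reports
		String description = cSplit[2];
		List<String> subjects = cSplit[3].trim().equals("")
			? Collections.emptyList()
			: Arrays.asList(cSplit[3].split(","));
		System.out.println(id + " | " + name + " | " + description + " | " + subjects);
	}
}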

View File

@ -34,15 +34,11 @@ import eu.dnetlib.dhp.oa.graph.dump.subset.criteria.VerbResolver;
import eu.dnetlib.dhp.oa.graph.dump.subset.criteria.VerbResolverFactory;
import eu.dnetlib.dhp.oa.graph.dump.subset.selectionconstraints.Param;
import eu.dnetlib.dhp.oa.graph.dump.subset.selectionconstraints.SelectionConstraints;
import eu.dnetlib.dhp.oa.model.Container;
import eu.dnetlib.dhp.oa.model.graph.*;
import eu.dnetlib.dhp.oa.model.graph.Datasource;
import eu.dnetlib.dhp.oa.model.graph.Organization;
import eu.dnetlib.dhp.oa.model.graph.Project;
import eu.dnetlib.dhp.schema.oaf.*;
/**
* Spark Job that fires the dump for the entites
* Spark Job that fires the dump for the entities
*/
public class SparkDumpResult implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkDumpResult.class);
@ -142,7 +138,7 @@ public class SparkDumpResult implements Serializable {
.readPath(spark, inputPath, inputClazz)
.map(
(MapFunction<I, I>) value -> filterResult(
value, pathMap, selectionConstraints, inputClazz, masterDuplicateList),
value, pathMap, selectionConstraints, inputClazz, masterDuplicateList, resultType),
Encoders.bean(inputClazz))
.filter(Objects::nonNull)
.write()
@ -167,7 +163,8 @@ public class SparkDumpResult implements Serializable {
}
private static <I extends eu.dnetlib.dhp.schema.oaf.Result> I filterResult(I value, Map<String, String> pathMap,
SelectionConstraints selectionConstraints, Class<I> inputClazz, List<MasterDuplicate> masterDuplicateList) {
SelectionConstraints selectionConstraints, Class<I> inputClazz, List<MasterDuplicate> masterDuplicateList,
String resultType) {
Optional<DataInfo> odInfo = Optional.ofNullable(value.getDataInfo());
if (Boolean.FALSE.equals(odInfo.isPresent())) {
@ -178,6 +175,10 @@ public class SparkDumpResult implements Serializable {
return null;
}
if (!isCompatible(value.getResulttype().getClassid(), resultType)) {
return null;
}
if (selectionConstraints != null) {
Param param = new Param();
String json = new Gson().toJson(value, inputClazz);
@ -205,6 +206,10 @@ public class SparkDumpResult implements Serializable {
return value;
}
private static boolean isCompatible(String classid, String resultType) {
return (classid.equals(resultType) || (classid.equals("other") && resultType.equals("otherresearchproduct")));
}
private static void update(KeyValue kv, List<MasterDuplicate> masterDuplicateList) {
for (MasterDuplicate md : masterDuplicateList) {
if (md.getDuplicate().equals(kv.getKey())) {
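
The filterResult change above adds the requested resultType as an extra filter: records whose resulttype classid does not match the table being dumped are discarded, with the classid "other" mapped onto the "otherresearchproduct" dump type. A small stand-alone usage sketch of that check (the isCompatible logic is copied from the hunk, the sample values are illustrative):

public class ResultTypeFilterSketch {

	// Same logic as the new isCompatible method in SparkDumpResult.
	static boolean isCompatible(String classid, String resultType) {
		return classid.equals(resultType)
			|| (classid.equals("other") && resultType.equals("otherresearchproduct"));
	}

	public static void main(String[] args) {
		System.out.println(isCompatible("publication", "publication"));    // true
		System.out.println(isCompatible("other", "otherresearchproduct")); // true: "other" records land in the ORP dump
		System.out.println(isCompatible("dataset", "publication"));        // false: filterResult returns null and the record is dropped
	}
}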

View File

@ -85,32 +85,32 @@ public class SparkSelectSubset implements Serializable {
&& !removeSet.contains(r.getRelClass()));
Dataset<String> resultIds = Utils
.readPath(spark, outputPath + "/dump/publication", GraphResult.class)
.readPath(spark, outputPath + "/original/publication", Publication.class)
.map((MapFunction<GraphResult, String>) p -> p.getId(), Encoders.STRING())
.map((MapFunction<Publication, String>) p -> p.getId(), Encoders.STRING())
.union(
Utils
.readPath(spark, outputPath + "/dump/dataset", GraphResult.class)
.readPath(spark, outputPath + "/original/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class)
.map((MapFunction<GraphResult, String>) d -> d.getId(), Encoders.STRING()))
.map((MapFunction<eu.dnetlib.dhp.schema.oaf.Dataset, String>) d -> d.getId(), Encoders.STRING()))
.union(
Utils
.readPath(spark, outputPath + "/dump/software", GraphResult.class)
.readPath(spark, outputPath + "/original/software", Software.class)
.map((MapFunction<GraphResult, String>) s -> s.getId(), Encoders.STRING()))
.map((MapFunction<Software, String>) s -> s.getId(), Encoders.STRING()))
.union(
Utils
.readPath(spark, outputPath + "/dump/otherresearchproduct", GraphResult.class)
.readPath(spark, outputPath + "/original/otherresearchproduct", OtherResearchProduct.class)
.map((MapFunction<GraphResult, String>) o -> o.getId(), Encoders.STRING()));
.map((MapFunction<OtherResearchProduct, String>) o -> o.getId(), Encoders.STRING()));
// select result -> result relations
Dataset<Relation> relJoinSource = relation
Dataset<Relation> relResultResult = relation
.joinWith(resultIds, relation.col("source").equalTo(resultIds.col("value")))
.map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class));
relJoinSource
.joinWith(resultIds, relJoinSource.col("target").equalTo(resultIds.col("value")))
relResultResult
.joinWith(resultIds, relResultResult.col("target").equalTo(resultIds.col("value")))
.map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class))
.write()
.option("compression", "gzip")
@ -133,40 +133,45 @@ public class SparkSelectSubset implements Serializable {
.filter((FilterFunction<Datasource>) e -> !e.getDataInfo().getDeletedbyinference())
.map((MapFunction<Datasource, String>) d -> d.getId(), Encoders.STRING()));
relJoinSource = relation
Dataset<Relation> relResultOther = relation
.joinWith(resultIds, relation.col("source").equalTo(resultIds.col("value")))
.map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class));
relJoinSource
.joinWith(otherIds, relJoinSource.col("target").equalTo(otherIds.col("value")))
relResultOther
.joinWith(otherIds, relResultOther.col("target").equalTo(otherIds.col("value")))
.map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Append)
.option("compression", "gzip")
.json(outputPath + "/original/relation");
relJoinSource = relation
Dataset<Relation> relOtherResult = relation
.joinWith(resultIds, relation.col("target").equalTo(resultIds.col("value")))
.map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class));
relJoinSource
.joinWith(otherIds, relJoinSource.col("source").equalTo(otherIds.col("value")))
relOtherResult
.joinWith(otherIds, relOtherResult.col("source").equalTo(otherIds.col("value")))
.map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Append)
.option("compression", "gzip")
.json(outputPath + "/original/relation");
relJoinSource = Utils.readPath(spark, outputPath + "/original/relation", Relation.class);
Dataset<String> relAll = Utils
.readPath(spark, outputPath + "/original/relation", Relation.class)
.flatMap(
(FlatMapFunction<Relation, String>) r -> Arrays.asList(r.getSource(), r.getTarget()).iterator(),
Encoders.STRING())
.distinct();
// Save the entities in relations with at least one result
// relations are bidirectional, so it is enough to have the match to one side
Dataset<Organization> organization = Utils
.readPath(spark, inputPath + "/organization", Organization.class)
.filter((FilterFunction<Organization>) o -> !o.getDataInfo().getDeletedbyinference());
organization
.joinWith(relJoinSource, organization.col("id").equalTo(relJoinSource.col("source")))
.joinWith(relAll, organization.col("id").equalTo(relAll.col("value")))
.map(
(MapFunction<Tuple2<Organization, Relation>, Organization>) t2 -> t2._1(),
(MapFunction<Tuple2<Organization, String>, Organization>) t2 -> t2._1(),
Encoders.bean(Organization.class))
.groupByKey((MapFunction<Organization, String>) v -> v.getId(), Encoders.STRING())
.mapGroups(
@ -181,8 +186,8 @@ public class SparkSelectSubset implements Serializable {
.readPath(spark, inputPath + "/datasource", Datasource.class)
.filter((FilterFunction<Datasource>) d -> !d.getDataInfo().getDeletedbyinference());
datasource
.joinWith(relJoinSource, datasource.col("id").equalTo(relJoinSource.col("source")))
.map((MapFunction<Tuple2<Datasource, Relation>, Datasource>) t2 -> t2._1(), Encoders.bean(Datasource.class))
.joinWith(relAll, datasource.col("id").equalTo(relAll.col("value")))
.map((MapFunction<Tuple2<Datasource, String>, Datasource>) t2 -> t2._1(), Encoders.bean(Datasource.class))
.groupByKey((MapFunction<Datasource, String>) v -> v.getId(), Encoders.STRING())
.mapGroups(
(MapGroupsFunction<String, Datasource, Datasource>) (k, it) -> it.next(),
@ -283,8 +288,8 @@ public class SparkSelectSubset implements Serializable {
.readPath(spark, inputPath + "/project", Project.class)
.filter((FilterFunction<Project>) d -> !d.getDataInfo().getDeletedbyinference());
project
.joinWith(relJoinSource, project.col("id").equalTo(relJoinSource.col("source")))
.map((MapFunction<Tuple2<Project, Relation>, Project>) t2 -> t2._1(), Encoders.bean(Project.class))
.joinWith(relAll, project.col("id").equalTo(relAll.col("value")))
.map((MapFunction<Tuple2<Project, String>, Project>) t2 -> t2._1(), Encoders.bean(Project.class))
.groupByKey((MapFunction<Project, String>) v -> v.getId(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Project, Project>) (k, it) -> it.next(), Encoders.bean(Project.class))
.write()
@ -306,12 +311,12 @@ public class SparkSelectSubset implements Serializable {
.readPath(spark, outputPath + "/original/datasource", Datasource.class)
.map((MapFunction<Datasource, String>) d -> d.getId(), Encoders.STRING()));
relJoinSource = relation
Dataset<Relation> relOtherOther = relation
.joinWith(selectedIDs, relation.col("source").equalTo(selectedIDs.col("value")))
.map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class));
relJoinSource
.joinWith(selectedIDs, relJoinSource.col("target").equalTo(selectedIDs.col("value")))
relOtherOther
.joinWith(selectedIDs, relOtherOther.col("target").equalTo(selectedIDs.col("value")))
.map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Append)
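
The rewrite above replaces the reused relJoinSource dataset with purpose-named datasets and, for the entity selection, builds relAll: the distinct set of both relation endpoints, read back from original/relation. Organizations, datasources and projects are kept if their id appears anywhere in that set (relations are materialised in both directions, so one match is enough, as the comment notes). A plain-Java sketch of that selection logic under hypothetical minimal types, not the Spark implementation:

import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RelationEndpointSelectionSketch {

	// Hypothetical minimal stand-ins for the Relation and Organization beans.
	static class Relation {
		final String source, target;
		Relation(String source, String target) { this.source = source; this.target = target; }
	}

	static class Organization {
		final String id;
		Organization(String id) { this.id = id; }
	}

	public static void main(String[] args) {
		List<Relation> relations = Arrays.asList(
			new Relation("50|result_1", "20|org_1"),
			new Relation("20|org_2", "50|result_2"));

		// Equivalent of the relAll dataset: both endpoints of every selected relation, deduplicated.
		Set<String> relAll = relations.stream()
			.flatMap(r -> Stream.of(r.source, r.target))
			.collect(Collectors.toSet());

		List<Organization> organizations = Arrays.asList(
			new Organization("20|org_1"),
			new Organization("20|org_2"),
			new Organization("20|org_3"));

		// Equivalent of the joinWith on organization.col("id").equalTo(relAll.col("value")).
		List<Organization> selected = organizations.stream()
			.filter(o -> relAll.contains(o.id))
			.collect(Collectors.toList());

		System.out.println(selected.size()); // 2: org_3 has no relation to a selected result and is left out
	}
}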

View File

@ -127,7 +127,7 @@ public class SparkSelectValidContext implements Serializable {
private static boolean extracted(String c, List<String> keySet) {
if (keySet.contains(c))
return true;
if (c.contains("::") && keySet.contains(c.substring(0, c.indexOf("::"))))
if (c.contains(":") && keySet.contains(c.substring(0, c.indexOf(":"))))
return true;
return false;
}
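
The change above relaxes the context match from the "::" separator to a single ":": a context id is accepted either when it is itself a community key or when its prefix before the first ':' is. A stand-alone usage sketch of the updated method (the logic is copied from the hunk, the key set is illustrative):

import java.util.Arrays;
import java.util.List;

public class ContextPrefixMatchSketch {

	// Same logic as the updated extracted(...) method in SparkSelectValidContext.
	static boolean extracted(String c, List<String> keySet) {
		if (keySet.contains(c))
			return true;
		return c.contains(":") && keySet.contains(c.substring(0, c.indexOf(":")));
	}

	public static void main(String[] args) {
		List<String> keySet = Arrays.asList("dh-ch", "egi");
		System.out.println(extracted("egi", keySet));                 // true: exact match
		System.out.println(extracted("dh-ch::subcommunity", keySet)); // true: prefix before the first ':'
		System.out.println(extracted("beopen::projects", keySet));    // false
	}
}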

View File

@ -0,0 +1,151 @@
package eu.dnetlib.dhp.oa.graph.dump.subset;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.model.graph.GraphResult;
import eu.dnetlib.dhp.oa.model.graph.Relation;
import eu.dnetlib.dhp.oa.model.graph.ResearchCommunity;
import scala.Tuple2;
/**
* @author miriam.baglioni
* @Date 15/11/22
*/
public class SparkSelectValidRelation implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkSelectValidRelation.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkSelectValidRelation.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/input_select_valid_relation_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
// results dumped
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
// all relations plus those produced via context and extracted from results
final String relationPath = parser.get("relationPath");
log.info("relationPath: {}", relationPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
selectValidRelation(spark, inputPath, relationPath);
});
}
private static void selectValidRelation(SparkSession spark, String inputPath,
String relationPath) {
// read the results
Dataset<String> dumpedIds = Utils
.readPath(spark, inputPath + "/publication", GraphResult.class)
.map((MapFunction<GraphResult, String>) r -> r.getId(), Encoders.STRING())
.union(
Utils
.readPath(spark, inputPath + "/dataset", GraphResult.class)
.map((MapFunction<GraphResult, String>) r -> r.getId(), Encoders.STRING()))
.union(
Utils
.readPath(spark, inputPath + "/software", GraphResult.class)
.map((MapFunction<GraphResult, String>) r -> r.getId(), Encoders.STRING()))
.union(
Utils
.readPath(spark, inputPath + "/otherresearchproduct", GraphResult.class)
.map((MapFunction<GraphResult, String>) r -> r.getId(), Encoders.STRING()))
.union(
Utils
.readPath(spark, inputPath + "/organization", eu.dnetlib.dhp.oa.model.graph.Organization.class)
.map(
(MapFunction<eu.dnetlib.dhp.oa.model.graph.Organization, String>) o -> o.getId(),
Encoders.STRING()))
.union(
Utils
.readPath(spark, inputPath + "/project", eu.dnetlib.dhp.oa.model.graph.Project.class)
.map(
(MapFunction<eu.dnetlib.dhp.oa.model.graph.Project, String>) o -> o.getId(), Encoders.STRING()))
.union(
Utils
.readPath(spark, inputPath + "/datasource", eu.dnetlib.dhp.oa.model.graph.Datasource.class)
.map(
(MapFunction<eu.dnetlib.dhp.oa.model.graph.Datasource, String>) o -> o.getId(),
Encoders.STRING()))
.union(
Utils
.readPath(spark, inputPath + "/community_infrastructure", ResearchCommunity.class)
.map((MapFunction<ResearchCommunity, String>) c -> c.getId(), Encoders.STRING()));
Dataset<Tuple2<String, Relation>> relationSource = Utils
.readPath(spark, relationPath, Relation.class)
.map(
(MapFunction<Relation, Tuple2<String, Relation>>) r -> new Tuple2<>(r.getSource().getId(), r),
Encoders.tuple(Encoders.STRING(), Encoders.bean(Relation.class)));
Dataset<Tuple2<String, Relation>> relJoinSource = relationSource
.joinWith(dumpedIds, relationSource.col("_1").equalTo(dumpedIds.col("value")))
.map(
(MapFunction<Tuple2<Tuple2<String, Relation>, String>, Tuple2<String, Relation>>) t2 -> new Tuple2<>(
t2._1()._2().getTarget().getId(), t2._1()._2()),
Encoders.tuple(Encoders.STRING(), Encoders.bean(Relation.class)));
relJoinSource
.joinWith(dumpedIds, relJoinSource.col("_1").equalTo(dumpedIds.col("value")))
.map(
(MapFunction<Tuple2<Tuple2<String, Relation>, String>, Relation>) t2 -> t2._1()._2(),
Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(inputPath + "/relation");
// relJoinSource = relationSource
// .joinWith(dumpedIds, relationSource.col("_1").equalTo(dumpedIds.col("value")))
// .map(
// (MapFunction<Tuple2<Tuple2<String, Relation>, String>, Tuple2<String, Relation>>) t2 -> new Tuple2<>(
// t2._1()._2().getTarget().getId(), t2._1()._2()),
// Encoders.tuple(Encoders.STRING(), Encoders.bean(Relation.class)));
//
// relJoinSource
// .joinWith(dumpedIds, relJoinSource.col("_1").equalTo(dumpedIds.col("value")))
// .map(
// (MapFunction<Tuple2<Tuple2<String, Relation>, String>, Relation>) t2 -> t2._1()._2(),
// Encoders.bean(Relation.class))
// .write()
// .mode(SaveMode.Append)
// .option("compression", "gzip")
// .json(inputPath + "/relation");
}
}

View File

@ -0,0 +1,27 @@
[
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName":"rp",
"paramLongName":"relationPath",
"paramDescription": "the map to find fields in the json",
"paramRequired": false
}
]
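
A hedged example of how the new SparkSelectValidRelation job could be launched outside the Oozie workflow, using the parameter names defined above (the paths are placeholders, and the single-dash long-name argument style matches the existing tests):

import eu.dnetlib.dhp.oa.graph.dump.subset.SparkSelectValidRelation;

public class SelectValidRelationLocalRun {
	public static void main(String[] args) throws Exception {
		SparkSelectValidRelation.main(new String[] {
			"-isSparkSessionManaged", Boolean.FALSE.toString(),
			"-sourcePath", "/tmp/subset/dump",       // placeholder: directory holding the dumped entities
			"-relationPath", "/tmp/subset/relation"  // placeholder: relations to validate against the dumped ids
		});
	}
}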

View File

@ -349,8 +349,6 @@
<path start="create_relation_fromorgs"/>
</fork>
<action name="create_entities_fromcontext">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.complete.CreateContextEntities</main-class>
@ -390,7 +388,7 @@
<action name="create_relation_fromcontext">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.complete.CreateContextRelation</main-class>
<arg>--hdfsPath</arg><arg>${workingDir}/relation/context</arg>
<arg>--hdfsPath</arg><arg>${workingDir}/dump/relation/context</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</java>
@ -416,7 +414,7 @@
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation/contextOrg</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/relation/contextOrg</arg>
<arg>--organizationCommunityMap</arg><arg>${organizationCommunityMap}</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
@ -470,7 +468,7 @@
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${outputPath}/original/relation</arg>
<arg>--outputPath</arg><arg>${outputPath}/dump/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation</arg>
<arg>--removeSet</arg><arg>${removeSet}</arg>
</spark>
<ok to="rels_from_pubs"/>
@ -496,7 +494,7 @@
</spark-opts>
<arg>--sourcePath</arg><arg>${outputPath}/original/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/dump/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="rels_from_dats"/>
@ -522,7 +520,7 @@
</spark-opts>
<arg>--sourcePath</arg><arg>${outputPath}/original/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/dump/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="rels_from_orp"/>
@ -548,7 +546,7 @@
</spark-opts>
<arg>--sourcePath</arg><arg>${outputPath}/original/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/dump/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="rels_from_sw"/>
@ -574,14 +572,37 @@
</spark-opts>
<arg>--sourcePath</arg><arg>${outputPath}/original/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${outputPath}/dump/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="filter_relation"/>
<error to="Kill"/>
</action>
<action name="filter_relation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Select valid relations</name>
<class>eu.dnetlib.dhp.oa.graph.dump.subset.SparkSelectValidRelation</class>
<jar>dump-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${outputPath}/dump</arg>
<arg>--relationPath</arg><arg>${workingDir}/relation</arg> <!-- new relations from context -->
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<kill name="Kill">
<message>Sub-workflow dump complete failed with error message ${wf:errorMessage()}
</message>

View File

@ -1077,4 +1077,35 @@ public class DumpJobTest {
.getString(2));
}
@Test
public void testresultNotDumped() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/resultNotDumped.json")
.getPath();
final String communityMapPath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json")
.getPath();
SparkDumpEntitiesJob
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
"-outputPath", workingDir.toString() + "/result",
"-communityMapPath", communityMapPath
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<GraphResult> tmp = sc
.textFile(workingDir.toString() + "/result")
.map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class));
Assertions.assertEquals(0, tmp.count());
}
}

View File

@ -172,4 +172,37 @@ public class DumpOrganizationProjectDatasourceTest {
.println(OBJECT_MAPPER.writeValueAsString(o)));
}
@Test
public void dumpDatasourceNotDumpedTest() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/complete/datasourcenotdumped")
.getPath();
SparkDumpEntitiesJob
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Datasource",
"-outputPath", workingDir.toString() + "/dump"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<eu.dnetlib.dhp.oa.model.graph.Datasource> tmp = sc
.textFile(workingDir.toString() + "/dump")
.map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.oa.model.graph.Datasource.class));
org.apache.spark.sql.Dataset<eu.dnetlib.dhp.oa.model.graph.Datasource> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.oa.model.graph.Datasource.class));
Assertions.assertEquals(1, verificationDataset.count());
verificationDataset
.foreach(
(ForeachFunction<eu.dnetlib.dhp.oa.model.graph.Datasource>) o -> System.out
.println(OBJECT_MAPPER.writeValueAsString(o)));
}
}

View File

@ -134,6 +134,8 @@ public class SplitPerFunderTest {
.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
Assertions.assertEquals(3, tmp.count());
tmp.foreach(r -> System.out.println(new ObjectMapper().writeValueAsString(r)));
// MZOS 1
tmp = sc
.textFile(workingDir.toString() + "/split/MZOS")

View File

@ -0,0 +1 @@
{"accessinfopackage":[],"collectedfrom":[{"key":"10|openaire____::081b82f96300b6a6e3d282bad31cb6e2","value":"Crossref"}],"consenttermsofuse":false,"contentpolicies":[{"classid":"Journal articles","classname":"Journal articles","schemeid":"eosc:contentpolicies","schemename":"eosc:contentpolicies"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"datasourcetype":{"classid":"pubsrepository::journal","classname":"Journal","schemeid":"dnet:datasource_typologies","schemename":"dnet:datasource_typologies"},"datasourcetypeui":{"classid":"Journal archive","classname":"Journal archive","schemeid":"dnet:datasource_typologies_ui","schemename":"dnet:datasource_typologies_ui"},"dateofcollection":"2020-07-10","englishname":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"Arachnology"},"eoscdatasourcetype":{"classid":"Journal archive","classname":"Journal Archive","schemeid":"dnet:eosc_datasource_types","schemename":"dnet:eosc_datasource_types"},"eosctype":{"classid":"Data Source","classname":"Data Source","schemeid":"","schemename":""},"extraInfo":[],"fulltextdownload":false,"id":"10|issn___print::2d7299a5fd9d7e3db4e6b4c0245fd7c3","journal":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"issnOnline":"2050-9936","issnPrinted":"2050-9928","name":"Arachnology"},"languages":[],"lastupdatetimestamp":1668505479963,"latitude":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"0.0"},"longitude":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"0.0"},"namespaceprefix":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"jrnl20509928"},"odlanguages":[],"odnumberofitems":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"0.0"},"officialname":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"Arachnology"},"openairecompatibility":{"classid":"hostedBy","classname":"collected from a compatible 
aggregator","schemeid":"dnet:datasourceCompatibilityLevel","schemename":"dnet:datasourceCompatibilityLevel"},"originalId":["issn___print::2050-9928"],"pid":[],"policies":[],"researchentitytypes":["Literature"],"subjects":[],"thematic":false,"versioncontrol":false,"versioning":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":false}}

File diff suppressed because one or more lines are too long