[Dump Subset] moved the replacement of duplicate with master datasource ids in hostedby and collectedfrom one step earlier in the workflow

Miriam Baglioni 2022-11-30 09:54:45 +01:00
parent 054103ae70
commit 45cc165e92
7 changed files with 113 additions and 640 deletions
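In outline, this commit moves the master/duplicate substitution from the subset-selection step into the result dump itself: while each result is filtered, duplicate datasource ids found in collectedfrom and in every instance's hostedby/collectedfrom are rewritten to their master ids, using the correspondence produced by ReadMasterDuplicateFromDB. The sketch below mirrors the update(...) method and the filterResult(...) usage added in this diff; the wrapper class and the local MasterDuplicate bean are illustrative stand-ins, not part of the commit.

import java.util.List;

import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Result;

// Illustrative sketch (not part of the commit): remap duplicate datasource ids
// to their master ids in collectedfrom and in each instance's hostedby/collectedfrom.
public class MasterSubstitutionSketch {

	// Correspondence between a duplicate datasource id and its master id, as
	// produced by ReadMasterDuplicateFromDB (local stand-in for the project's bean).
	public static class MasterDuplicate {
		private String duplicate;
		private String master;
		public String getDuplicate() { return duplicate; }
		public String getMaster() { return master; }
	}

	// Same logic as the update(KeyValue, List<MasterDuplicate>) method added in SparkDumpResult:
	// if the key matches a known duplicate id, replace it with the master id.
	static void update(KeyValue kv, List<MasterDuplicate> masterDuplicateList) {
		for (MasterDuplicate md : masterDuplicateList) {
			if (md.getDuplicate().equals(kv.getKey())) {
				kv.setKey(md.getMaster());
				return;
			}
		}
	}

	// Applied to a result while it is being filtered for the dump,
	// as filterResult now does for collectedfrom and for every instance.
	static <R extends Result> R remap(R value, List<MasterDuplicate> masterDuplicateList) {
		if (value.getCollectedfrom() != null)
			value.getCollectedfrom().forEach(cf -> update(cf, masterDuplicateList));
		if (value.getInstance() != null)
			value.getInstance().forEach(i -> {
				update(i.getCollectedfrom(), masterDuplicateList);
				update(i.getHostedby(), masterDuplicateList);
			});
		return value;
	}
}

Doing the remapping inside filterResult is what lets SparkSelectSubset drop its masterDuplicatePath parameter and the left join on the original collectedfrom/hostedby ids, as removed further down in this diff.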

View File

@@ -10,13 +10,10 @@ import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.Encode;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Node;
@@ -33,21 +30,16 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Constants;
import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
import eu.dnetlib.dhp.oa.graph.dump.exceptions.CardinalityTooHighException;
import eu.dnetlib.dhp.oa.graph.dump.exceptions.NoAvailableEntityTypeException;
import eu.dnetlib.dhp.oa.graph.dump.subset.criteria.VerbResolver;
import eu.dnetlib.dhp.oa.graph.dump.subset.criteria.VerbResolverFactory;
import eu.dnetlib.dhp.oa.graph.dump.subset.selectionconstraints.Param;
import eu.dnetlib.dhp.oa.graph.dump.subset.selectionconstraints.SelectionConstraints;
import eu.dnetlib.dhp.oa.model.Container;
import eu.dnetlib.dhp.oa.model.Result;
import eu.dnetlib.dhp.oa.model.graph.*;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.oa.model.graph.Datasource;
import eu.dnetlib.dhp.oa.model.graph.Organization;
import eu.dnetlib.dhp.oa.model.graph.Project;
import eu.dnetlib.dhp.schema.oaf.*;
/**
* Spark Job that fires the dump for the entities
@@ -86,6 +78,9 @@ public class SparkDumpResult implements Serializable {
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final String masterDuplicatePath = parser.get("masterDuplicatePath");
log.info("masterDuplicatePath: {}", masterDuplicatePath);
Optional<String> pathString = Optional.ofNullable(parser.get("pathMap"));
HashMap<String, String> pathMap = null;
if (pathString.isPresent()) {
@@ -101,17 +96,18 @@
}
Class<? extends OafEntity> inputClazz = (Class<? extends OafEntity>) Class.forName(resultClassName);
Class<? extends eu.dnetlib.dhp.schema.oaf.Result> inputClazz = (Class<? extends eu.dnetlib.dhp.schema.oaf.Result>) Class
.forName(resultClassName);
run(
isSparkSessionManaged, inputPath, outputPath, pathMap, selectionConstraints, inputClazz,
resultType);
resultType, masterDuplicatePath);
}
private static void run(Boolean isSparkSessionManaged, String inputPath, String outputPath,
HashMap<String, String> pathMap, SelectionConstraints selectionConstraints,
Class<? extends OafEntity> inputClazz, String resultType) {
Class<? extends eu.dnetlib.dhp.schema.oaf.Result> inputClazz, String resultType, String masterDuplicatePath) {
SparkConf conf = new SparkConf();
HashMap<String, String> finalPathMap = pathMap;
@@ -124,24 +120,29 @@
Utils.removeOutputDir(spark, outputPath + "/dump/" + resultType);
resultDump(
spark, inputPath, outputPath, inputClazz, finalPathMap,
finalSelectionConstraints, resultType);
finalSelectionConstraints, resultType, masterDuplicatePath);
});
}
public static <I extends OafEntity> void resultDump(
public static <I extends eu.dnetlib.dhp.schema.oaf.Result> void resultDump(
SparkSession spark,
String inputPath,
String outputPath,
Class<I> inputClazz,
Map<String, String> pathMap,
SelectionConstraints selectionConstraints,
String resultType) {
String resultType,
String masterDuplicatePath) {
List<MasterDuplicate> masterDuplicateList = Utils
.readPath(spark, masterDuplicatePath, MasterDuplicate.class)
.collectAsList();
Utils
.readPath(spark, inputPath, inputClazz)
.map(
(MapFunction<I, I>) value -> filterResult(value, pathMap, selectionConstraints, inputClazz),
(MapFunction<I, I>) value -> filterResult(
value, pathMap, selectionConstraints, inputClazz, masterDuplicateList),
Encoders.bean(inputClazz))
.filter(Objects::nonNull)
.write()
@@ -165,8 +166,8 @@
}
private static <I extends OafEntity> I filterResult(I value, Map<String, String> pathMap,
SelectionConstraints selectionConstraints, Class<I> inputClazz) {
private static <I extends eu.dnetlib.dhp.schema.oaf.Result> I filterResult(I value, Map<String, String> pathMap,
SelectionConstraints selectionConstraints, Class<I> inputClazz, List<MasterDuplicate> masterDuplicateList) {
Optional<DataInfo> odInfo = Optional.ofNullable(value.getDataInfo());
if (Boolean.FALSE.equals(odInfo.isPresent())) {
@@ -193,441 +194,24 @@
return null;
}
}
if (Optional.ofNullable(value.getCollectedfrom()).isPresent())
value.getCollectedfrom().forEach(cf -> update(cf, masterDuplicateList));
if (Optional.ofNullable(value.getInstance()).isPresent()) {
value.getInstance().forEach(i -> {
update(i.getCollectedfrom(), masterDuplicateList);
update(i.getHostedby(), masterDuplicateList);
});
}
return value;
}
private static <E extends OafEntity> void datasourceMap(SparkSession spark, String inputPath, String outputPath,
Class<E> inputClazz) {
Utils
.readPath(spark, inputPath, inputClazz)
.map(
(MapFunction<E, Datasource>) d -> mapDatasource((eu.dnetlib.dhp.schema.oaf.Datasource) d),
Encoders.bean(Datasource.class))
.filter(Objects::nonNull)
.write()
.mode(SaveMode.Overwrite)
.option(COMPRESSION, GZIP)
.json(outputPath);
}
private static <E extends OafEntity> void projectMap(SparkSession spark, String inputPath, String outputPath,
Class<E> inputClazz) {
Utils
.readPath(spark, inputPath, inputClazz)
.map(
(MapFunction<E, Project>) p -> mapProject((eu.dnetlib.dhp.schema.oaf.Project) p),
Encoders.bean(Project.class))
.write()
.mode(SaveMode.Overwrite)
.option(COMPRESSION, GZIP)
.json(outputPath);
}
private static Datasource mapDatasource(eu.dnetlib.dhp.schema.oaf.Datasource d) {
Datasource datasource = new Datasource();
datasource.setId(d.getId());
Optional
.ofNullable(d.getOriginalId())
.ifPresent(
oId -> datasource.setOriginalId(oId.stream().filter(Objects::nonNull).collect(Collectors.toList())));
Optional
.ofNullable(d.getPid())
.ifPresent(
pids -> datasource
.setPid(
pids
.stream()
.map(p -> DatasourcePid.newInstance(p.getQualifier().getClassid(), p.getValue()))
.collect(Collectors.toList())));
Optional
.ofNullable(d.getDatasourcetype())
.ifPresent(
dsType -> datasource
.setDatasourcetype(DatasourceSchemeValue.newInstance(dsType.getClassid(), dsType.getClassname())));
Optional
.ofNullable(d.getOpenairecompatibility())
.ifPresent(v -> datasource.setOpenairecompatibility(v.getClassname()));
Optional
.ofNullable(d.getOfficialname())
.ifPresent(oname -> datasource.setOfficialname(oname.getValue()));
Optional
.ofNullable(d.getEnglishname())
.ifPresent(ename -> datasource.setEnglishname(ename.getValue()));
Optional
.ofNullable(d.getWebsiteurl())
.ifPresent(wsite -> datasource.setWebsiteurl(wsite.getValue()));
Optional
.ofNullable(d.getLogourl())
.ifPresent(lurl -> datasource.setLogourl(lurl.getValue()));
Optional
.ofNullable(d.getDateofvalidation())
.ifPresent(dval -> datasource.setDateofvalidation(dval.getValue()));
Optional
.ofNullable(d.getDescription())
.ifPresent(dex -> datasource.setDescription(dex.getValue()));
Optional
.ofNullable(d.getSubjects())
.ifPresent(
sbjs -> datasource.setSubjects(sbjs.stream().map(sbj -> sbj.getValue()).collect(Collectors.toList())));
Optional
.ofNullable(d.getOdpolicies())
.ifPresent(odp -> datasource.setPolicies(Arrays.asList(odp.getValue())));
Optional
.ofNullable(d.getOdlanguages())
.ifPresent(
langs -> datasource
.setLanguages(langs.stream().map(lang -> lang.getValue()).collect(Collectors.toList())));
Optional
.ofNullable(d.getOdcontenttypes())
.ifPresent(
ctypes -> datasource
.setContenttypes(ctypes.stream().map(ctype -> ctype.getValue()).collect(Collectors.toList())));
Optional
.ofNullable(d.getReleasestartdate())
.ifPresent(rd -> datasource.setReleasestartdate(rd.getValue()));
Optional
.ofNullable(d.getReleaseenddate())
.ifPresent(ed -> datasource.setReleaseenddate(ed.getValue()));
Optional
.ofNullable(d.getMissionstatementurl())
.ifPresent(ms -> datasource.setMissionstatementurl(ms.getValue()));
Optional
.ofNullable(d.getDatabaseaccesstype())
.ifPresent(ar -> datasource.setAccessrights(ar.getValue()));
Optional
.ofNullable(d.getDatauploadtype())
.ifPresent(dut -> datasource.setUploadrights(dut.getValue()));
Optional
.ofNullable(d.getDatabaseaccessrestriction())
.ifPresent(dar -> datasource.setDatabaseaccessrestriction(dar.getValue()));
Optional
.ofNullable(d.getDatauploadrestriction())
.ifPresent(dur -> datasource.setDatauploadrestriction(dur.getValue()));
Optional
.ofNullable(d.getVersioning())
.ifPresent(v -> datasource.setVersioning(v.getValue()));
Optional
.ofNullable(d.getCitationguidelineurl())
.ifPresent(cu -> datasource.setCitationguidelineurl(cu.getValue()));
Optional
.ofNullable(d.getPidsystems())
.ifPresent(ps -> datasource.setPidsystems(ps.getValue()));
Optional
.ofNullable(d.getCertificates())
.ifPresent(c -> datasource.setCertificates(c.getValue()));
Optional
.ofNullable(d.getPolicies())
.ifPresent(ps -> datasource.setPolicies(ps.stream().map(p -> p.getValue()).collect(Collectors.toList())));
Optional
.ofNullable(d.getJournal())
.ifPresent(j -> datasource.setJournal(getContainer(j)));
return datasource;
}
private static Container getContainer(Journal j) {
Container c = new Container();
Optional
.ofNullable(j.getName())
.ifPresent(n -> c.setName(n));
Optional
.ofNullable(j.getIssnPrinted())
.ifPresent(issnp -> c.setIssnPrinted(issnp));
Optional
.ofNullable(j.getIssnOnline())
.ifPresent(issno -> c.setIssnOnline(issno));
Optional
.ofNullable(j.getIssnLinking())
.ifPresent(isnl -> c.setIssnLinking(isnl));
Optional
.ofNullable(j.getEp())
.ifPresent(ep -> c.setEp(ep));
Optional
.ofNullable(j.getIss())
.ifPresent(iss -> c.setIss(iss));
Optional
.ofNullable(j.getSp())
.ifPresent(sp -> c.setSp(sp));
Optional
.ofNullable(j.getVol())
.ifPresent(vol -> c.setVol(vol));
Optional
.ofNullable(j.getEdition())
.ifPresent(edition -> c.setEdition(edition));
Optional
.ofNullable(j.getConferencedate())
.ifPresent(cdate -> c.setConferencedate(cdate));
Optional
.ofNullable(j.getConferenceplace())
.ifPresent(cplace -> c.setConferenceplace(cplace));
return c;
}
private static Project mapProject(eu.dnetlib.dhp.schema.oaf.Project p) throws DocumentException {
Project project = new Project();
Optional
.ofNullable(p.getId())
.ifPresent(id -> project.setId(id));
Optional
.ofNullable(p.getWebsiteurl())
.ifPresent(w -> project.setWebsiteurl(w.getValue()));
Optional
.ofNullable(p.getCode())
.ifPresent(code -> project.setCode(code.getValue()));
Optional
.ofNullable(p.getAcronym())
.ifPresent(acronynim -> project.setAcronym(acronynim.getValue()));
Optional
.ofNullable(p.getTitle())
.ifPresent(title -> project.setTitle(title.getValue()));
Optional
.ofNullable(p.getStartdate())
.ifPresent(sdate -> project.setStartdate(sdate.getValue()));
Optional
.ofNullable(p.getEnddate())
.ifPresent(edate -> project.setEnddate(edate.getValue()));
Optional
.ofNullable(p.getCallidentifier())
.ifPresent(cide -> project.setCallidentifier(cide.getValue()));
Optional
.ofNullable(p.getKeywords())
.ifPresent(key -> project.setKeywords(key.getValue()));
Optional<Field<String>> omandate = Optional.ofNullable(p.getOamandatepublications());
Optional<Field<String>> oecsc39 = Optional.ofNullable(p.getEcsc39());
boolean mandate = false;
if (omandate.isPresent()) {
if (omandate.get().getValue().equals("true")) {
mandate = true;
private static void update(KeyValue kv, List<MasterDuplicate> masterDuplicateList) {
for (MasterDuplicate md : masterDuplicateList) {
if (md.getDuplicate().equals(kv.getKey())) {
kv.setKey(md.getMaster());
return;
}
}
if (oecsc39.isPresent()) {
if (oecsc39.get().getValue().equals("true")) {
mandate = true;
}
}
project.setOpenaccessmandateforpublications(mandate);
project.setOpenaccessmandatefordataset(false);
Optional
.ofNullable(p.getEcarticle29_3())
.ifPresent(oamandate -> project.setOpenaccessmandatefordataset(oamandate.getValue().equals("true")));
project
.setSubject(
Optional
.ofNullable(p.getSubjects())
.map(subjs -> subjs.stream().map(s -> s.getValue()).collect(Collectors.toList()))
.orElse(new ArrayList<>()));
Optional
.ofNullable(p.getSummary())
.ifPresent(summary -> project.setSummary(summary.getValue()));
Optional<Float> ofundedamount = Optional.ofNullable(p.getFundedamount());
Optional<Field<String>> ocurrency = Optional.ofNullable(p.getCurrency());
Optional<Float> ototalcost = Optional.ofNullable(p.getTotalcost());
if (ocurrency.isPresent()) {
if (ofundedamount.isPresent()) {
if (ototalcost.isPresent()) {
project
.setGranted(
Granted.newInstance(ocurrency.get().getValue(), ototalcost.get(), ofundedamount.get()));
} else {
project.setGranted(Granted.newInstance(ocurrency.get().getValue(), ofundedamount.get()));
}
}
}
project
.setH2020programme(
Optional
.ofNullable(p.getH2020classification())
.map(
classification -> classification
.stream()
.map(
c -> Programme
.newInstance(
c.getH2020Programme().getCode(), c.getH2020Programme().getDescription()))
.collect(Collectors.toList()))
.orElse(new ArrayList<>()));
Optional<List<Field<String>>> ofundTree = Optional
.ofNullable(p.getFundingtree());
List<Funder> funList = new ArrayList<>();
if (ofundTree.isPresent()) {
for (Field<String> fundingtree : ofundTree.get()) {
funList.add(getFunder(fundingtree.getValue()));
}
}
project.setFunding(funList);
return project;
}
public static Funder getFunder(String fundingtree) throws DocumentException {
Funder f = new Funder();
final Document doc;
doc = new SAXReader().read(new StringReader(fundingtree));
f.setShortName(((Node) (doc.selectNodes("//funder/shortname").get(0))).getText());
f.setName(((Node) (doc.selectNodes("//funder/name").get(0))).getText());
f.setJurisdiction(((Node) (doc.selectNodes("//funder/jurisdiction").get(0))).getText());
String id = "";
StringBuilder bld = new StringBuilder();
int level = 0;
List<Node> nodes = doc.selectNodes("//funding_level_" + level);
while (!nodes.isEmpty()) {
for (Node n : nodes) {
List node = n.selectNodes("./id");
id = ((Node) node.get(0)).getText();
id = id.substring(id.indexOf("::") + 2);
node = n.selectNodes("./description");
bld.append(((Node) node.get(0)).getText() + " - ");
}
level += 1;
nodes = doc.selectNodes("//funding_level_" + level);
}
String description = bld.toString();
if (!id.equals("")) {
Fundings fundings = new Fundings();
fundings.setId(id);
fundings.setDescription(description.substring(0, description.length() - 3).trim());
f.setFunding_stream(fundings);
}
return f;
}
private static <E extends OafEntity> void organizationMap(SparkSession spark, String inputPath, String outputPath,
Class<E> inputClazz) {
Utils
.readPath(spark, inputPath, inputClazz)
.map(
(MapFunction<E, Organization>) o -> mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) o),
Encoders.bean(Organization.class))
.filter(Objects::nonNull)
.write()
.mode(SaveMode.Overwrite)
.option(COMPRESSION, GZIP)
.json(outputPath);
}
private static Organization mapOrganization(
eu.dnetlib.dhp.schema.oaf.Organization org) {
if (Boolean.TRUE.equals(org.getDataInfo().getDeletedbyinference()))
return null;
Organization organization = new Organization();
Optional
.ofNullable(org.getLegalshortname())
.ifPresent(value -> organization.setLegalshortname(value.getValue()));
Optional
.ofNullable(org.getLegalname())
.ifPresent(value -> organization.setLegalname(value.getValue()));
Optional
.ofNullable(org.getWebsiteurl())
.ifPresent(value -> organization.setWebsiteurl(value.getValue()));
Optional
.ofNullable(org.getAlternativeNames())
.ifPresent(
value -> organization
.setAlternativenames(
value
.stream()
.map(v -> v.getValue())
.collect(Collectors.toList())));
Optional
.ofNullable(org.getCountry())
.ifPresent(
value -> {
if (!value.getClassid().equals(eu.dnetlib.dhp.oa.graph.dump.complete.Constants.UNKNOWN)) {
organization
.setCountry(
eu.dnetlib.dhp.oa.model.Country.newInstance(value.getClassid(), value.getClassname()));
}
});
Optional
.ofNullable(org.getId())
.ifPresent(value -> organization.setId(value));
Optional
.ofNullable(org.getPid())
.ifPresent(
value -> organization
.setPid(
value
.stream()
.map(p -> OrganizationPid.newInstance(p.getQualifier().getClassid(), p.getValue()))
.collect(Collectors.toList())));
return organization;
}
}

View File

@@ -59,9 +59,6 @@ public class SparkSelectSubset implements Serializable {
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String masterDuplicatePath = parser.get("masterDuplicatePath");
log.info("masterDuplicatePath: {}", masterDuplicatePath);
Optional<String> rs = Optional.ofNullable(parser.get("removeSet"));
final Set<String> removeSet = new HashSet<>();
if (rs.isPresent()) {
@@ -74,14 +71,13 @@
conf,
isSparkSessionManaged,
spark -> {
selectSubset(spark, inputPath, outputPath, removeSet, masterDuplicatePath);
selectSubset(spark, inputPath, outputPath, removeSet);
});
}
private static void selectSubset(SparkSession spark, String inputPath, String outputPath, Set<String> removeSet,
String masterDuplicatePath) {
private static void selectSubset(SparkSession spark, String inputPath, String outputPath, Set<String> removeSet) {
Dataset<Relation> relation = Utils
.readPath(spark, inputPath + "/relation", Relation.class)
.filter(
@@ -275,32 +271,14 @@
.filter((FilterFunction<String>) s -> !s.equals(ModelConstants.UNKNOWN_REPOSITORY.getKey()))
.distinct();
Dataset<MasterDuplicate> masterDuplicate = Utils.readPath(spark, masterDuplicatePath, MasterDuplicate.class);
Dataset<String> cfhb = cfhb_orig
.joinWith(masterDuplicate, masterDuplicate.col("duplicate").equalTo(cfhb_orig.col("value")), "left")
.map((MapFunction<Tuple2<String, MasterDuplicate>, String>) t2 -> {
if (!Optional.ofNullable(t2._2()).isPresent()) {
return t2._1();
}
return t2._2().getMaster();
}, Encoders.STRING());
datasource
.joinWith(cfhb, datasource.col("id").equalTo(cfhb.col("value")))
.joinWith(cfhb_orig, datasource.col("id").equalTo(cfhb_orig.col("value")))
.map((MapFunction<Tuple2<Datasource, String>, Datasource>) t2 -> t2._1(), Encoders.bean(Datasource.class))
.write()
.mode(SaveMode.Append)
.option("compression", "gzip")
.json(outputPath + "/original/datasource");
cfhb.foreach((ForeachFunction<String>) s -> System.out.println("cf " + s));
datasource.foreach((ForeachFunction<Datasource>) ds -> System.out.println("ds " + ds.getId()));
Utils
.readPath(spark, outputPath + "/original/datasource", Datasource.class)
.foreach((ForeachFunction<Datasource>) ds -> System.out.println("dsID: " + ds.getId()));
Dataset<Project> project = Utils
.readPath(spark, inputPath + "/project", Project.class)
.filter((FilterFunction<Project>) d -> !d.getDataInfo().getDeletedbyinference());
@@ -328,17 +306,10 @@
.readPath(spark, outputPath + "/original/datasource", Datasource.class)
.map((MapFunction<Datasource, String>) d -> d.getId(), Encoders.STRING()));
selectedIDs.foreach((ForeachFunction<String>) s -> System.out.println(s));
relJoinSource = relation
.joinWith(selectedIDs, relation.col("source").equalTo(selectedIDs.col("value")))
.map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class));
relJoinSource.foreach((ForeachFunction<Relation>) r -> {
System.out.print("Source : {}" + r.getSource());
System.out.println("Target : {}" + r.getTarget());
});
relJoinSource
.joinWith(selectedIDs, relJoinSource.col("target").equalTo(selectedIDs.col("value")))
.map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class))

View File

@@ -60,7 +60,15 @@
"paramLongName":"resultType",
"paramDescription": "the map to find fields in the json",
"paramRequired": false
},
{
"paramName":"md",
"paramLongName":"masterDuplicatePath",
"paramDescription": "the map to find fields in the json",
"paramRequired": false
}
]

View File

@@ -89,7 +89,20 @@
</configuration>
</global>
<start to="fork_select_and_dump" />
<start to="get_master_duplicate" />
<action name="get_master_duplicate">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.subset.ReadMasterDuplicateFromDB</main-class>
<arg>--hdfsPath</arg><arg>${workingDir}/masterduplicate</arg>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
</java>
<ok to="fork_select_and_dump"/>
<error to="Kill"/>
</action>
<fork name="fork_select_and_dump">
<path start="select_and_dump_publication"/>
@@ -122,6 +135,7 @@
<arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--selectionCriteria</arg><arg>${selectionCriteria}</arg>
<arg>--resultType</arg><arg>publication</arg>
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
@@ -151,7 +165,7 @@
<arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--selectionCriteria</arg><arg>${selectionCriteria}</arg>
<arg>--resultType</arg><arg>dataset</arg>
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
@@ -180,6 +194,7 @@
<arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--selectionCriteria</arg><arg>${selectionCriteria}</arg>
<arg>--resultType</arg><arg>otherresearchproduct</arg>
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
@@ -208,25 +223,15 @@
<arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--selectionCriteria</arg><arg>${selectionCriteria}</arg>
<arg>--resultType</arg><arg>software</arg>
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<join name="join_dump" to="get_master_duplicate"/>
<join name="join_dump" to="select_subset"/>
<action name="get_master_duplicate">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.subset.ReadMasterDuplicateFromDB</main-class>
<arg>--hdfsPath</arg><arg>${workingDir}/masterduplicate</arg>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
</java>
<ok to="select_subset"/>
<error to="Kill"/>
</action>
<action name="select_subset">
<spark xmlns="uri:oozie:spark-action:0.2">
@@ -249,7 +254,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--removeSet</arg><arg>${removeSet}</arg>
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
</spark>
<ok to="fork_dump_otherentities"/>
<error to="Kill"/>

View File

@@ -130,7 +130,10 @@ public class DumpSubsetTest {
"-communityMapPath", communityMapPath,
"-pathMap", pathMap,
"-selectionCriteria", constraint,
"-resultType", "publication"
"-resultType", "publication",
"-masterDuplicatePath", getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/masterDuplicate/correspondence")
.getPath()
});
@@ -140,13 +143,39 @@ public class DumpSubsetTest {
.textFile(workingDir.toString() + "/dump/publication")
.map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class));
Assertions.assertEquals(15, tmp.count());
Assertions.assertEquals(16, tmp.count());
JavaRDD<Publication> tmp_pubs = sc
.textFile(workingDir.toString() + "/original/publication")
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
Assertions.assertEquals(15, tmp_pubs.count());
JavaRDD<Publication> input = sc
.textFile(sourcePath)
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
Assertions
.assertTrue(
input
.filter(r -> r.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8"))
.first()
.getCollectedfrom()
.stream()
.anyMatch(
cf -> cf.getKey().equals("10|openaire____::806360c771262b4d6770e7cdf04b5c5a")
&& cf.getValue().equals("ZENODO")));
Assertions.assertEquals(16, tmp_pubs.count());
Assertions
.assertTrue(
tmp_pubs
.filter(r -> r.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8"))
.first()
.getCollectedfrom()
.stream()
.anyMatch(
cf -> cf.getKey().equals("10|fairsharing_::cd0f74b5955dc87fd0605745c4b49ee8")
&& cf.getValue().equals("ZENODO")));
tmp_pubs.foreach(p -> System.out.println(OBJECT_MAPPER.writeValueAsString(p)));
}
@@ -179,7 +208,10 @@ public class DumpSubsetTest {
"-communityMapPath", communityMapPath,
"-pathMap", pathMap,
"-selectionCriteria", constraint,
"-resultType", "publication"
"-resultType", "publication",
"-masterDuplicatePath", getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/masterDuplicate/empty")
.getPath()
});
@@ -266,134 +298,7 @@ public class DumpSubsetTest {
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-outputPath", workingDir.toString(),
"-masterDuplicatePath", getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/masterDuplicate/empty")
.getPath()
});
JavaRDD<Relation> tmp = sc
.textFile(workingDir.toString() + "/original/relation")
.map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
Assertions.assertEquals(20, tmp.count());
Assertions
.assertEquals(
6, tmp.filter(r -> r.getSource().startsWith("50|") && r.getTarget().startsWith("50|")).count());
Assertions
.assertEquals(
3, tmp.filter(r -> r.getSource().startsWith("50|") && r.getTarget().startsWith("20|")).count());
Assertions
.assertEquals(
3, tmp.filter(r -> r.getSource().startsWith("20|") && r.getTarget().startsWith("50|")).count());
Assertions
.assertEquals(
4, tmp.filter(r -> r.getSource().startsWith("40|") && r.getTarget().startsWith("50|")).count());
Assertions
.assertEquals(
1, tmp.filter(r -> r.getSource().startsWith("10|") && r.getTarget().startsWith("20|")).count());
Assertions
.assertEquals(
1, tmp.filter(r -> r.getSource().startsWith("20|") && r.getTarget().startsWith("10|")).count());
Assertions
.assertEquals(
1, tmp.filter(r -> r.getSource().startsWith("20|") && r.getTarget().startsWith("40|")).count());
Assertions
.assertEquals(
1, tmp.filter(r -> r.getSource().startsWith("40|") && r.getTarget().startsWith("20|")).count());
JavaRDD<eu.dnetlib.dhp.schema.oaf.Datasource> tmp_datasource = sc
.textFile(workingDir.toString() + "/original/datasource")
.map(item -> OBJECT_MAPPER.readValue(item, Datasource.class));
Assertions.assertEquals(4, tmp_datasource.count());
Assertions
.assertEquals(
0,
tmp_datasource
.filter(d -> d.getId().equals("10|issn___print::0a79337eaf5145faa478785423273355"))
.count());
JavaRDD<Organization> tmp_organization = sc
.textFile(workingDir.toString() + "/original/organization")
.map(item -> OBJECT_MAPPER.readValue(item, Organization.class));
Assertions.assertEquals(3, tmp_organization.count());
JavaRDD<Project> tmp_project = sc
.textFile(workingDir.toString() + "/original/project")
.map(item -> OBJECT_MAPPER.readValue(item, Project.class));
Assertions.assertEquals(3, tmp_project.count());
}
@Test // Step2
void testSelectSubsetMaster() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/input/")
.getPath();
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
sc
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/publication")
.getPath())
.saveAsTextFile(workingDir.toString() + "/original/publication");
sc
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/software")
.getPath())
.saveAsTextFile(workingDir.toString() + "/original/software");
sc
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/dataset")
.getPath())
.saveAsTextFile(workingDir.toString() + "/original/dataset");
sc
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/original/otherresearchproduct")
.getPath())
.saveAsTextFile(workingDir.toString() + "/original/otherresearchproduct");
sc
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/publication")
.getPath())
.saveAsTextFile(workingDir.toString() + "/dump/publication");
sc
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/software")
.getPath())
.saveAsTextFile(workingDir.toString() + "/dump/software");
sc
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/dataset")
.getPath())
.saveAsTextFile(workingDir.toString() + "/dump/dataset");
sc
.textFile(
getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/dump/otherresearchproduct")
.getPath())
.saveAsTextFile(workingDir.toString() + "/dump/otherresearchproduct");
SparkSelectSubset
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-outputPath", workingDir.toString(),
"-masterDuplicatePath", getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/subset/masterDuplicate/correspondence")
.getPath()
"-outputPath", workingDir.toString()
});
@@ -432,12 +337,11 @@ public class DumpSubsetTest {
.textFile(workingDir.toString() + "/original/datasource")
.map(item -> OBJECT_MAPPER.readValue(item, Datasource.class));
Assertions.assertEquals(5, tmp_datasource.count());
Assertions
.assertEquals(
1,
0,
tmp_datasource
.filter(d -> d.getId().equals("10|fairsharing_::cd0f74b5955dc87fd0605745c4b49ee8"))
.filter(d -> d.getId().equals("10|issn___print::0a79337eaf5145faa478785423273355"))
.count());
JavaRDD<Organization> tmp_organization = sc
@@ -607,10 +511,10 @@ public class DumpSubsetTest {
.textFile(workingDir.toString() + "/relation")
.map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.oa.model.graph.Relation.class));
Assertions.assertEquals(80, tmp.count());
Assertions.assertEquals(40, tmp.filter(r -> r.getSource().getId().startsWith("50|")).count());
Assertions.assertEquals(30, tmp.filter(r -> r.getSource().getId().startsWith("10|")).count());
Assertions.assertEquals(10, tmp.filter(r -> r.getSource().getId().startsWith("00|")).count());
Assertions.assertEquals(94, tmp.count());
Assertions.assertEquals(47, tmp.filter(r -> r.getSource().getId().startsWith("50|")).count());
Assertions.assertEquals(36, tmp.filter(r -> r.getSource().getId().startsWith("10|")).count());
Assertions.assertEquals(11, tmp.filter(r -> r.getSource().getId().startsWith("00|")).count());
}
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long