WIP: dump of the OpenAIRE graph - Changes #103

Closed
miriam.baglioni wants to merge 77 commits from miriam.baglioni/dnet-hadoop:dump into master
59 changed files with 2900 additions and 258 deletions

View File

@ -90,6 +90,13 @@ public class MakeTarArchive implements Serializable {
String p_string = p.toString();
if (!p_string.endsWith("_SUCCESS")) {
String name = p_string.substring(p_string.lastIndexOf("/") + 1);
if (name.startsWith("part-") & name.length() > 10) {
String tmp = name.substring(0, 10);
if (name.contains(".")) {
tmp += name.substring(name.indexOf("."));
}
name = tmp;
}
TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name);
entry.setSize(fileStatus.getLen());
current_size += fileStatus.getLen();

View File

@ -24,8 +24,6 @@ public class Constants {
public static String RESEARCH_INFRASTRUCTURE = "Research Infrastructure/Initiative";
public static String ORCID = "orcid";
static {
accessRightsCoarMap.put("OPEN", "c_abf2");
accessRightsCoarMap.put("RESTRICTED", "c_16ec");

View File

@ -37,7 +37,8 @@ public class DumpProducts implements Serializable {
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
execDump(spark, inputPath, outputPath, communityMapPath, inputClazz, outputClazz, dumpType);
execDump(
spark, inputPath, outputPath, communityMapPath, inputClazz, outputClazz, dumpType);
});
}
@ -89,7 +90,7 @@ public class DumpProducts implements Serializable {
return c.getId();
}
if (c.getId().contains("::") && communities.contains(c.getId().substring(0, c.getId().indexOf("::")))) {
return c.getId().substring(0, 3);
return c.getId().substring(0, c.getId().indexOf("::"));
}
return null;
}).filter(Objects::nonNull).collect(Collectors.toList());

View File

@ -17,10 +17,10 @@ public class QueryInformationSystem {
private ISLookUpService isLookUp;
private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') "
private static final String XQUERY_ALL = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') "
+
" where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] " +
" and ($x//context/param[./@name = 'status']/text() = 'manager' or $x//context/param[./@name = 'status']/text() = 'all') "
" and ($x//context/param[./@name = 'status']/text() = 'all') "
+
" return " +
"<community> " +
@ -28,9 +28,22 @@ public class QueryInformationSystem {
"{$x//CONFIGURATION/context/@label}" +
"</community>";
public CommunityMap getCommunityMap()
private static final String XQUERY_CI = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') "
+
" where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] " +
" and $x//CONFIGURATION/context[./@id=%s] "
+
" return " +
"<community> " +
"{$x//CONFIGURATION/context/@id}" +
"{$x//CONFIGURATION/context/@label}" +
"</community>";
public CommunityMap getCommunityMap(boolean singleCommunity, String community_id)
throws ISLookUpException, DocumentException {
return getMap(isLookUp.quickSearchProfile(XQUERY));
if (singleCommunity)
return getMap(isLookUp.quickSearchProfile(XQUERY_CI.replace("%s", "'" + community_id + "'")));
return getMap(isLookUp.quickSearchProfile(XQUERY_ALL));
}

View File

@ -424,6 +424,19 @@ public class ResultMapper implements Serializable {
.ifPresent(value -> instance.setType(value.getClassname()));
Optional.ofNullable(i.getUrl()).ifPresent(value -> instance.setUrl(value));
Optional<Field<String>> oPca = Optional.ofNullable(i.getProcessingchargeamount());
Optional<Field<String>> oPcc = Optional.ofNullable(i.getProcessingchargecurrency());
if (oPca.isPresent() && oPcc.isPresent()) {
Field<String> pca = oPca.get();
Field<String> pcc = oPcc.get();
if (!pca.getValue().trim().equals("") && !pcc.getValue().trim().equals("")) {
APC apc = new APC();
apc.setCurrency(oPcc.get().getValue());
apc.setAmount(oPca.get().getValue());
instance.setArticleprocessingcharge(apc);
}
}
}
private static List<Provenance> getUniqueProvenance(List<Provenance> provenance) {
@ -503,7 +516,7 @@ public class ResultMapper implements Serializable {
private static Pid getOrcid(List<StructuredProperty> p) {
for (StructuredProperty pid : p) {
if (pid.getQualifier().getClassid().equals(Constants.ORCID)) {
if (pid.getQualifier().getClassid().equals(ModelConstants.ORCID)) {
Optional<DataInfo> di = Optional.ofNullable(pid.getDataInfo());
if (di.isPresent()) {
return Pid

View File

@ -6,6 +6,7 @@ import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
@ -71,14 +72,25 @@ public class SaveCommunityMap implements Serializable {
final String isLookUpUrl = parser.get("isLookUpUrl");
log.info("isLookUpUrl: {}", isLookUpUrl);
final Boolean singleCommunity = Optional
.ofNullable(parser.get("singleDeposition"))
.map(Boolean::valueOf)
.orElse(false);
final String community_id = Optional.ofNullable(parser.get("communityId")).orElse(null);
final SaveCommunityMap scm = new SaveCommunityMap(outputPath, nameNode, isLookUpUrl);
scm.saveCommunityMap();
scm.saveCommunityMap(singleCommunity, community_id);
}
private void saveCommunityMap() throws ISLookUpException, IOException, DocumentException {
writer.write(Utils.OBJECT_MAPPER.writeValueAsString(queryInformationSystem.getCommunityMap()));
private void saveCommunityMap(boolean singleCommunity, String community_id)
throws ISLookUpException, IOException, DocumentException {
writer
.write(
Utils.OBJECT_MAPPER
.writeValueAsString(queryInformationSystem.getCommunityMap(singleCommunity, community_id)));
writer.close();
}
}

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.oa.graph.dump;
import java.io.IOException;
import java.io.Serializable;
import java.util.Optional;
@ -9,6 +10,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.jetbrains.annotations.NotNull;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.api.MissingConceptDoiException;
@ -48,15 +50,12 @@ public class SendToZenodoHDFS implements Serializable {
.orElse(false);
final String depositionId = Optional.ofNullable(parser.get("depositionId")).orElse(null);
final String communityMapPath = parser.get("communityMapPath");
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
FileSystem fileSystem = FileSystem.get(conf);
CommunityMap communityMap = Utils.readCommunityMap(fileSystem, communityMapPath);
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
.listFiles(
new Path(hdfsPath), true);
@ -87,11 +86,6 @@ public class SendToZenodoHDFS implements Serializable {
if (!p_string.endsWith("_SUCCESS")) {
// String tmp = p_string.substring(0, p_string.lastIndexOf("/"));
String name = p_string.substring(p_string.lastIndexOf("/") + 1);
log.info("Sending information for community: " + name);
if (communityMap.containsKey(name.substring(0, name.lastIndexOf(".")))) {
name = communityMap.get(name.substring(0, name.lastIndexOf("."))).replace(" ", "_") + ".tar";
}
FSDataInputStream inputStream = fileSystem.open(p);
zenodoApiClient.uploadIS(inputStream, name, fileStatus.getLen());

View File

@ -34,12 +34,14 @@ public class CommunitySplit implements Serializable {
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
execSplit(spark, inputPath, outputPath, Utils.getCommunityMap(spark, communityMapPath).keySet());
CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);
execSplit(spark, inputPath, outputPath, communityMap);
});
}
private static void execSplit(SparkSession spark, String inputPath, String outputPath,
Set<String> communities) {
CommunityMap communities) {
Dataset<CommunityResult> result = Utils
.readPath(spark, inputPath + "/publication", CommunityResult.class)
@ -48,8 +50,9 @@ public class CommunitySplit implements Serializable {
.union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class));
communities
.keySet()
.stream()
.forEach(c -> printResult(c, result, outputPath));
.forEach(c -> printResult(c, result, outputPath + "/" + communities.get(c).replace(" ", "_")));
}
@ -57,16 +60,11 @@ public class CommunitySplit implements Serializable {
Dataset<CommunityResult> community_products = result
.filter((FilterFunction<CommunityResult>) r -> containsCommunity(r, c));
try {
community_products.first();
community_products
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath + "/" + c);
} catch (Exception e) {
}
community_products
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
}
@ -75,9 +73,9 @@ public class CommunitySplit implements Serializable {
return r
.getContext()
.stream()
.filter(con -> con.getCode().equals(c))
.map(con -> con.getCode())
.collect(Collectors.toList())
.size() > 0;
.contains(c);
}
return false;
}

View File

@ -70,10 +70,10 @@ public class CreateContextRelation implements Serializable {
cce.execute(Process::getRelation, CONTEX_RELATION_DATASOURCE, ModelSupport.getIdPrefix(Datasource.class));
log.info("Creating relations for projects... ");
// cce
// .execute(
// Process::getRelation, CONTEX_RELATION_PROJECT,
// ModelSupport.getIdPrefix(eu.dnetlib.dhp.schema.oaf.Project.class));
cce
.execute(
Process::getRelation, CONTEX_RELATION_PROJECT,
ModelSupport.getIdPrefix(eu.dnetlib.dhp.schema.oaf.Project.class));
cce.close();

View File

@ -9,6 +9,7 @@ import java.util.*;
import java.util.stream.Collectors;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
@ -453,6 +454,7 @@ public class DumpGraphEntities implements Serializable {
.map(
(MapFunction<E, Organization>) o -> mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) o),
Encoders.bean(Organization.class))
.filter((FilterFunction<Organization>) o -> o != null)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
@ -461,6 +463,8 @@ public class DumpGraphEntities implements Serializable {
private static Organization mapOrganization(eu.dnetlib.dhp.schema.oaf.Organization org) {
Organization organization = new Organization();
if (org.getDataInfo().getDeletedbyinference())
return null;
Optional
.ofNullable(org.getLegalshortname())

View File

@ -147,7 +147,7 @@ public class Extractor implements Serializable {
.map(
paction -> Provenance
.newInstance(
paction.getClassid(),
paction.getClassname(),
dinfo.getTrust()))
.orElse(
Provenance

View File

@ -35,7 +35,7 @@ public class Process implements Serializable {
ri.setType(Constants.RESEARCH_INFRASTRUCTURE);
}
ri.setId(Utils.getContextId(ci.getId()));
ri.setOriginalId(ci.getId());
ri.setAcronym(ci.getId());
ri.setDescription(ci.getDescription());
ri.setName(ci.getName());

View File

@ -12,6 +12,7 @@ import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import org.jetbrains.annotations.NotNull;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -113,14 +114,72 @@ public class QueryInformationSystem {
@NotNull
private List<String> getCategoryList(Element el, String prefix) {
List<String> datasourceList = new ArrayList<>();
for (Object node : el.selectNodes(".//param")) {
Node n = (Node) node;
if (n.valueOf("./@name").equals("openaireId")) {
datasourceList.add(prefix + "|" + n.getText());
}
for (Object node : el.selectNodes(".//concept")) {
String oid = getOpenaireId((Node) node, prefix);
if (oid != null)
datasourceList.add(oid);
}
return datasourceList;
}
private String getOpenaireId(Node el, String prefix) {
for (Object node : el.selectNodes(".//param")) {
Node n = (Node) node;
if (n.valueOf("./@name").equals("openaireId")) {
return prefix + "|" + n.getText();
}
}
return makeOpenaireId(el, prefix);
}
private String makeOpenaireId(Node el, String prefix) {
String funder = null;
String grantId = null;
String funding = null;
for (Object node : el.selectNodes(".//param")) {
Node n = (Node) node;
switch (n.valueOf("./@name")) {
case "funding":
funding = n.getText();
break;
case "funder":
funder = n.getText();
break;
case "CD_PROJECT_NUMBER":
grantId = n.getText();
break;
}
}
String nsp = null;
switch (funder.toLowerCase()) {
case "ec":
if (funding == null) {
return null;
}
if (funding.toLowerCase().contains("h2020")) {
nsp = "corda__h2020::";
} else {
nsp = "corda_______::";
}
break;
case "tubitak":
nsp = "tubitakf____::";
break;
case "dfg":
nsp = "dfgf________::";
break;
default:
nsp = funder.toLowerCase();
for (int i = funder.length(); i < 12; i++)
nsp += "_";
nsp += "::";
}
return prefix + "|" + nsp + DHPUtils.md5(grantId);
}
}

View File

@ -0,0 +1,137 @@
package eu.dnetlib.dhp.oa.graph.dump.complete;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.oaf.*;
/**
* It selects the valid relations among those present in the graph. One relation is valid if it is not deletedbyinference
* and if both the source and the target node are present in the graph and are not deleted by inference nor invisible.
* To check this I made a view of the ids of all the entities in the graph, and select the relations for which a join exists
* with this view for both the source and the target
*/
public class SparkSelectValidRelationsJob implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkSelectValidRelationsJob.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkSelectValidRelationsJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/complete/input_relationdump_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
selectValidRelation(spark, inputPath, outputPath);
});
}
private static void selectValidRelation(SparkSession spark, String inputPath, String outputPath) {
Dataset<Relation> relation = Utils.readPath(spark, inputPath + "/relation", Relation.class);
Dataset<Publication> publication = Utils.readPath(spark, inputPath + "/publication", Publication.class);
Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> dataset = Utils
.readPath(spark, inputPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
Dataset<Software> software = Utils.readPath(spark, inputPath + "/software", Software.class);
Dataset<OtherResearchProduct> other = Utils
.readPath(spark, inputPath + "/otherresearchproduct", OtherResearchProduct.class);
Dataset<Organization> organization = Utils.readPath(spark, inputPath + "/organization", Organization.class);
Dataset<Project> project = Utils.readPath(spark, inputPath + "/project", Project.class);
Dataset<Datasource> datasource = Utils.readPath(spark, inputPath + "/datasource", Datasource.class);
relation.createOrReplaceTempView("relation");
publication.createOrReplaceTempView("publication");
dataset.createOrReplaceTempView("dataset");
other.createOrReplaceTempView("other");
software.createOrReplaceTempView("software");
organization.createOrReplaceTempView("organization");
project.createOrReplaceTempView("project");
datasource.createOrReplaceTempView("datasource");
spark
.sql(
"SELECT id " +
"FROM publication " +
"WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " +
"UNION ALL " +
"SELECT id " +
"FROM dataset " +
"WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " +
"UNION ALL " +
"SELECT id " +
"FROM other " +
"WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " +
"UNION ALL " +
"SELECT id " +
"FROM software " +
"WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " +
"UNION ALL " +
"SELECT id " +
"FROM organization " +
"WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " +
"UNION ALL " +
"SELECT id " +
"FROM project " +
"WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false " +
"UNION ALL " +
"SELECT id " +
"FROM datasource " +
"WHERE datainfo.deletedbyinference = false AND datainfo.invisible = false ")
.createOrReplaceTempView("identifiers");
spark
.sql(
"SELECT relation.* " +
"FROM relation " +
"JOIN identifiers i1 " +
"ON source = i1.id " +
"JOIN identifiers i2 " +
"ON target = i2.id " +
"WHERE datainfo.deletedbyinference = false")
.as(Encoders.bean(Relation.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
;
}
}

View File

@ -54,8 +54,8 @@ public class SparkDumpFunderResults implements Serializable {
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String relationPath = parser.get("relationPath");
log.info("relationPath: {}", relationPath);
final String graphPath = parser.get("graphPath");
log.info("relationPath: {}", graphPath);
SparkConf conf = new SparkConf();
@ -64,18 +64,18 @@ public class SparkDumpFunderResults implements Serializable {
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
writeResultProjectList(spark, inputPath, outputPath, relationPath);
writeResultProjectList(spark, inputPath, outputPath, graphPath);
});
}
private static void writeResultProjectList(SparkSession spark, String inputPath, String outputPath,
String relationPath) {
String graphPath) {
Dataset<Relation> relation = Utils
.readPath(spark, relationPath + "/relation", Relation.class)
.filter(
"dataInfo.deletedbyinference = false and lower(relClass) = '"
+ ModelConstants.IS_PRODUCED_BY.toLowerCase() + "'");
Dataset<eu.dnetlib.dhp.schema.oaf.Project> project = Utils
.readPath(spark, graphPath + "/project", eu.dnetlib.dhp.schema.oaf.Project.class);
// .filter(
// "dataInfo.deletedbyinference = false and lower(relClass) = '"
// + ModelConstants.IS_PRODUCED_BY.toLowerCase() + "'");
Dataset<CommunityResult> result = Utils
.readPath(spark, inputPath + "/publication", CommunityResult.class)
@ -83,8 +83,14 @@ public class SparkDumpFunderResults implements Serializable {
.union(Utils.readPath(spark, inputPath + "/orp", CommunityResult.class))
.union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class));
List<String> funderList = relation
.select("target")
// List<String> funderList = relation
// .select("target")
// .map((MapFunction<Row, String>) value -> value.getString(0).substring(0, 15), Encoders.STRING())
// .distinct()
// .collectAsList();
List<String> funderList = project
.select("id")
.map((MapFunction<Row, String>) value -> value.getString(0).substring(0, 15), Encoders.STRING())
.distinct()
.collectAsList();
@ -102,19 +108,26 @@ public class SparkDumpFunderResults implements Serializable {
} else {
funderdump = fundernsp.substring(0, fundernsp.indexOf("_")).toUpperCase();
}
writeFunderResult(funder, result, outputPath + "/" + funderdump);
writeFunderResult(funder, result, outputPath, funderdump);
});
}
private static void writeFunderResult(String funder, Dataset<CommunityResult> results, String outputPath) {
private static void dumpResults(String nsp, Dataset<CommunityResult> results, String outputPath,
String funderName) {
results.map((MapFunction<CommunityResult, CommunityResult>) r -> {
if (!Optional.ofNullable(r.getProjects()).isPresent()) {
return null;
}
for (Project p : r.getProjects()) {
if (p.getId().startsWith(funder)) {
if (p.getId().startsWith(nsp)) {
if (nsp.startsWith("40|irb")) {
if (p.getFunder().getShortName().equals(funderName))
return r;
else
return null;
}
return r;
}
}
@ -124,7 +137,18 @@ public class SparkDumpFunderResults implements Serializable {
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
.json(outputPath + "/" + funderName);
}
private static void writeFunderResult(String funder, Dataset<CommunityResult> results, String outputPath,
String funderDump) {
if (funder.startsWith("40|irb")) {
dumpResults(funder, results, outputPath, "HRZZ");
dumpResults(funder, results, outputPath, "MZOS");
} else
dumpResults(funder, results, outputPath, funderDump);
}
}

View File

@ -10,10 +10,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -21,6 +18,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Constants;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import scala.Tuple2;
@ -59,8 +57,8 @@ public class SparkResultLinkedToProject implements Serializable {
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final String relationPath = parser.get("relationPath");
log.info("relationPath: {}", relationPath);
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf();
@ -70,34 +68,58 @@ public class SparkResultLinkedToProject implements Serializable {
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
writeResultsLinkedToProjects(spark, inputClazz, inputPath, outputPath, relationPath);
writeResultsLinkedToProjects(spark, inputClazz, inputPath, outputPath, graphPath);
});
}
private static <R extends Result> void writeResultsLinkedToProjects(SparkSession spark, Class<R> inputClazz,
String inputPath, String outputPath, String relationPath) {
String inputPath, String outputPath, String graphPath) {
Dataset<R> results = Utils
.readPath(spark, inputPath, inputClazz)
.filter("dataInfo.deletedbyinference = false and datainfo.invisible = false");
Dataset<Relation> relations = Utils
.readPath(spark, relationPath, Relation.class)
.readPath(spark, graphPath + "/relation", Relation.class)
.filter(
"dataInfo.deletedbyinference = false and lower(relClass) = '"
+ ModelConstants.IS_PRODUCED_BY.toLowerCase() + "'");
Dataset<Project> project = Utils.readPath(spark, graphPath + "/project", Project.class);
relations
.joinWith(
results, relations.col("source").equalTo(results.col("id")),
"inner")
.groupByKey(
(MapFunction<Tuple2<Relation, R>, String>) value -> value
._2()
results.createOrReplaceTempView("result");
relations.createOrReplaceTempView("relation");
project.createOrReplaceTempView("project");
Dataset<R> tmp = spark
.sql(
"Select res.* " +
"from relation rel " +
"join result res " +
"on rel.source = res.id " +
"join project p " +
"on rel.target = p.id " +
"")
.as(Encoders.bean(inputClazz))
;
tmp.groupByKey(
(MapFunction< R, String>) value -> value
.getId(),
Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Tuple2<Relation, R>, R>) (k, it) -> {
return it.next()._2();
}, Encoders.bean(inputClazz))
.mapGroups((MapGroupsFunction<String, R, R>) (k, it) -> it.next(), Encoders.bean(inputClazz))
//
// relations
// .joinWith(
// results, relations.col("source").equalTo(results.col("id")),
// "inner")
// .groupByKey(
// (MapFunction<Tuple2<Relation, R>, String>) value -> value
// ._2()
// .getId(),
// Encoders.STRING())
// .mapGroups((MapGroupsFunction<String, Tuple2<Relation, R>, R>) (k, it) -> {
// return it.next()._2();
// }, Encoders.bean(inputClazz))
//tmp
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")

View File

@ -1,6 +1,18 @@
<workflow-app name="dump_community_products" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>singleDeposition</name>
<description>Indicates if each file in the directory should be uploaded in a own deposition</description>
</property>
<property>
<name>upload</name>
<description>true if the dump should be upload in zenodo</description>
</property>
<property>
<name>communityId</name>
<description>the id of the community to be dumped if a dump for a single community should be done</description>
</property>
<property>
<name>sourcePath</name>
<description>the source path</description>
@ -122,11 +134,14 @@
<arg>--outputPath</arg><arg>${workingDir}/communityMap</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--singleDeposition</arg><arg>${singleDeposition}</arg>
<arg>--communityId</arg><arg>${communityId}</arg>
</java>
<ok to="fork_dump"/>
<error to="Kill"/>
</action>
<fork name="fork_dump">
<path start="dump_publication"/>
<path start="dump_dataset"/>
@ -405,10 +420,16 @@
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--sourcePath</arg><arg>${workingDir}/split</arg>
</java>
<ok to="send_zenodo"/>
<ok to="should_upload"/>
<error to="Kill"/>
</action>
<decision name="should_upload">
<switch>
<case to="send_zenodo">${wf:conf('upload') eq true}</case>
<default to="End"/>
</switch>
</decision>
<action name="send_zenodo">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.SendToZenodoHDFS</main-class>
@ -417,7 +438,6 @@
<arg>--accessToken</arg><arg>${accessToken}</arg>
<arg>--connectionUrl</arg><arg>${connectionUrl}</arg>
<arg>--metadata</arg><arg>${metadata}</arg>
<arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
<arg>--conceptRecordId</arg><arg>${conceptRecordId}</arg>
<arg>--depositionId</arg><arg>${depositionId}</arg>
<arg>--depositionType</arg><arg>${depositionType}</arg>

View File

@ -1,9 +1,9 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"AccessRight":{
"type":"object",
"properties":{
"AccessRight": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "COAR access mode code: http://vocabularies.coar-repositories.org/documentation/access_rights/"
@ -22,40 +22,65 @@
"type": "object",
"properties": {
"scheme": {
"type": "string"
"type": "string",
"description": "The scheme for the resource"
},
"value": {
"type": "string"
"type": "string",
"description": "the value in the scheme"
}
},
"description": "To represent the information described by a scheme and a value in that scheme (i.e. pid)"
}
},
"KeyValue": {
"type": "object",
"properties": {
"key": {
"type": "string",
"description": "Description of key"
},
"value": {
"type": "string",
"description": "Description of value"
}
}
},
"Provenance": {
"type": "object",
"properties": {
"provenance": {
"type": "string",
"description": "The process that produced/provided the information"
"description": "The provenance of the information"
},
"trust": {
"type": "string"
"type": "string",
"description": "The trust associated to the information"
}
},
"description": "Indicates the process that produced (or provided) the information, and the trust associated to the information"
}
}
},
"type": "object",
"properties": {
"author": {
"description": "List of authors of the research results",
"type": "array",
"items": {
"type": "object",
"properties": {
"affiliation": {
"description": "Affiliations of the author",
"type": "array",
"items": {
"type": "string",
"description": "One of the affiliation of the author"
}
},
"fullname": {
"type": "string"
"type": "string",
"description": "Fullname of the author"
},
"name": {
"type": "string"
"type": "string",
"description": "First name of the author"
},
"pid": {
"type": "object",
@ -69,64 +94,64 @@
"provenance": {
"allOf": [
{"$ref": "#/definitions/Provenance"},
{"description": "Provenance of author's pid"}
{"description": "The provenance of the author's pid"}
]
}
}
},
"description": "Persistent identifier of the author (e.g. ORCID)"
},
"rank": {
"type": "integer"
"type": "integer",
"description": "Order in which the author appears in the authors list"
},
"surname": {
"type": "string"
"type": "string",
"description": "Surname of the author"
}
}
},
"description": "One of the author of the research result"
}
},
"bestaccessright": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "COAR access mode code: http://vocabularies.coar-repositories.org/documentation/access_rights/"
},
"label": {
"type": "string",
"description": "Label for the access mode"
},
"scheme": {
"type": "string",
"description": "Scheme of reference for access right code. Always set to COAR access rights vocabulary: http://vocabularies.coar-repositories.org/documentation/access_rights/"
}
},
"description": "The openest access right associated to the manifestations of this research results"
"allOf": [
{"$ref": "#/definitions/AccessRight"},
{"description": "The openest access right associated to the manifestations of this research results"}
]
},
"codeRepositoryUrl": {
"type": "string",
"description": "Only for results with type 'software': the URL to the repository with the source code"
},
"collectedfrom": {
"description": "Information about the sources from which the record has been collected",
"type": "array",
"items": {
"allOf": [
{"$ref": "#/definitions/KeyValue"},
{"description": "Key is the OpenAIRE identifier of the data source, value is its name"}
]
}
},
"contactgroup": {
"description": "Only for results with type 'software': Information on the group responsible for providing further information regarding the resource",
"type": "array",
"items": {
"type": "string"
}
"items": {"type": "string"}
},
"contactperson": {
"description": "Only for results with type 'software': Information on the person responsible for providing further information regarding the resource",
"type": "array",
"items": {
"type": "string"
}
"items": {"type": "string"}
},
"container": {
"type": "object",
"properties": {
"conferencedate": {
"type": "string"
"type": "string",
"description": "Date of the conference"
},
"conferenceplace": {
"type": "string"
"type": "string",
"description": "Place of the conference"
},
"edition": {
"type": "string",
@ -141,13 +166,16 @@
"description": "Journal issue"
},
"issnLinking": {
"type": "string"
"type": "string",
"description": "Journal linking iisn"
},
"issnOnline": {
"type": "string"
"type": "string",
"description": "Journal online issn"
},
"issnPrinted": {
"type": "string"
"type": "string",
"description": "Journal printed issn"
},
"name": {
"type": "string",
@ -155,22 +183,48 @@
},
"sp": {
"type": "string",
"description": "start page"
"description": "Start page"
},
"vol": {
"type": "string"
"type": "string",
"description": "Volume"
}
},
"description": "Container has information about the conference or journal where the result has been presented or published"
},
"contributor": {
"context": {
"description": "Reference to a relevant research infrastructure, initiative or community (RI/RC) among those collaborating with OpenAIRE. Please see https://connect.openaire.eu",
"type": "array",
"items": {
"type": "string",
"description": "Description of contributor"
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "Code identifying the RI/RC"
},
"label": {
"type": "string",
"description": "Label of the RI/RC"
},
"provenance": {
"description": "Why this result is associated to the RI/RC.",
"type": "array",
"items": {
"allOf": [
{"$ref": "#/definitions/Provenance"}
]
}
}
}
}
},
"contributor": {
"description": "Contributors of this result",
"type": "array",
"items": {"type": "string"}
},
"country": {
"description": "Country associated to this result",
"type": "array",
"items": {
"type": "object",
@ -180,7 +234,8 @@
"description": "ISO 3166-1 alpha-2 country code"
},
"label": {
"type": "string"
"type": "string",
"description": "English label of the country"
},
"provenance": {
"allOf": [
@ -193,9 +248,7 @@
},
"coverage": {
"type": "array",
"items": {
"type": "string"
}
"items": {"type": "string"}
},
"dateofcollection": {
"type": "string",
@ -203,16 +256,12 @@
},
"description": {
"type": "array",
"items": {
"type": "string"
}
"items": {"type": "string"}
},
"documentationUrl": {
"description": "Only for results with type 'software': URL to the software documentation",
"type": "array",
"items": {
"type": "string"
}
"items": {"type": "string"}
},
"embargoenddate": {
"type": "string",
@ -220,9 +269,7 @@
},
"format": {
"type": "array",
"items": {
"type": "string"
}
"items": {"type": "string"}
},
"geolocation": {
"description": "Geolocation information",
@ -230,21 +277,71 @@
"items": {
"type": "object",
"properties": {
"box": {
"type": "string"
},
"place": {
"type": "string"
},
"point": {
"type": "string"
}
"box": {"type": "string"},
"place": {"type": "string"},
"point": {"type": "string"}
}
}
},
"id": {
"type": "string",
"description": "OpenAIRE Identifier"
"description": "OpenAIRE identifier"
},
"instance": {
"description": "Manifestations (i.e. different versions) of the result. For example: the pre-print and the published versions are two manifestations of the same research result",
"type": "array",
"items": {
"type": "object",
"properties": {
"accessright": {
"allOf": [
{"$ref": "#/definitions/AccessRight"},
{"description": "Access right of this instance"}
]
},
"articleprocessingcharge": {
"description": "The money spent to make this book or article available in Open Access. Source for this information is the OpenAPC initiative.",
"type": "object",
"properties": {
"amount": {"type": "string"},
"currency": {"type": "string"}
}
},
"collectedfrom": {
"allOf": [
{"$ref": "#/definitions/KeyValue"},
{"description": "Information about the source from which the instance has been collected. Key is the OpenAIRE identifier of the data source, value is its name"}
]
},
"hostedby": {
"allOf": [
{"$ref": "#/definitions/KeyValue"},
{"description": "Information about the source from which the instance can be viewed or downloaded. Key is the OpenAIRE identifier of the data source, value is its name"}
]
},
"license": {
"type": "string",
"description": "License applied to the instance"
},
"publicationdate": {
"type": "string",
"description": "Publication date of the instance"
},
"refereed": {
"type": "string",
"description": "Was the instance subject to peer-review? Possible values are 'Unknown', 'nonPeerReviewed', 'peerReviewed' (see also https://api.openaire.eu/vocabularies/dnet:review_levels)"
},
"type": {
"type": "string",
"description": "Type of the instance. Possible values are listed at https://api.openaire.eu/vocabularies/dnet:publication_resource"
},
"url": {
"description": "Location where the instance is accessible",
"type": "array",
"items": {"type": "string"}
}
}
}
},
"language": {
"type": "object",
@ -264,14 +361,13 @@
"description": "Timestamp of last update of the record in OpenAIRE"
},
"maintitle": {
"type": "string"
"type": "string",
"description": "Title"
},
"originalId": {
"description": "Identifiers of the record at the original sources",
"type": "array",
"items": {
"type": "string"
}
"items": {"type": "string"}
},
"pid": {
"description": "Persistent identifiers of the result",
@ -283,68 +379,70 @@
]
}
},
"instance":{
"description":"Each instance is one specific materialisation or version of the result. For example, you can have one result with three instance: one is the pre-print, one is the post-print, one is te published version",
"type":"array",
"items":{
"type":"object",
"properties":{
"accessright":{
"allOf":[
{
"$ref":"#/definitions/AccessRight"
},
{
"description":"The accessright of this instance. One result may have multiple instances, each with a different accessright"
}
]
},
"articleprocessingcharge":{
"description": "The money spent to make this book or article available in Open Access. Source for this information is the OpenAPC initiative.",
"type":"object",
"properties":{
"amount":{
"type":"string"
},
"currency":{
"type":"string"
}
}
},
"license":{
"type":"string"
},
"publicationdate":{
"type":"string"
},
"refereed":{
"description": "If this instance has been peer-reviewed or not. Allowed values are peerReviewed, nonPeerReviewed, UNKNOWN (as defined in https://api.openaire.eu/vocabularies/dnet:review_levels)",
"type":"string"
},
"type":{
"type":"string",
"description":"The specific sub-type of this instance (see https://api.openaire.eu/vocabularies/dnet:result_typologies following the links)"
},
"url":{
"description":"URLs to the instance. They may link to the actual full-text or to the landing page at the hosting source. ",
"type":"array",
"items":{
"type":"string"
}
}
}
}
},
"programmingLanguage": {
"type": "string",
"description": "Only for results with type 'software': the programming language"
},
"projects": {
"description": "List of projects (i.e. grants) that (co-)funded the production ofn the research results",
"type": "array",
"items": {
"type": "object",
"properties": {
"acronym": {
"type": "string",
"description": "Project acronym"
},
"code": {
"type": "string",
"description": "Grant code"
},
"funder": {
"type": "object",
"properties": {
"fundingStream": {
"type": "string",
"description": "Stream of funding (e.g. for European Commission can be H2020 or FP7)"
},
"jurisdiction": {
"type": "string",
"description": "Geographical jurisdiction (e.g. for European Commission is EU, for Croatian Science Foundation is HR)"
},
"name": {
"type": "string",
"description": "Name of the funder"
},
"shortName": {
"type": "string",
"description": "Short name or acronym of the funder"
}
},
"description": "Information about the funder funding the project"
},
"id": {
"type": "string",
"description": "OpenAIRE identifier of the project"
},
"provenance": {
"allOf": [
{"$ref": "#/definitions/Provenance"},
{"description": "Why this project is associated to the result"}
]
},
"title": {
"type": "string",
"description": "Title of the project"
}
}
}
},
"publicationdate": {
"type": "string"
"type": "string",
"description": "Date of publication"
},
"publisher": {
"type": "string"
"type": "string",
"description": "Publisher"
},
"size": {
"type": "string",
@ -353,9 +451,7 @@
"source": {
"description": "See definition of Dublin Core field dc:source",
"type": "array",
"items": {
"type": "string"
}
"items": {"type": "string"}
},
"subjects": {
"description": "Keywords associated to the result",
@ -372,21 +468,20 @@
"subject": {
"allOf": [
{"$ref": "#/definitions/ControlledField"},
{"description": "OpenAIRE subject classification scheme (https://api.openaire.eu/vocabularies/dnet:subject_classification_typologies) and value. When the scheme is 'keyword', it means that the subject is free-text (i.e. not a term from a controlled vocabulary)."}
{"description": "OpenAIRE subject classification scheme (https://api.openaire.eu/vocabularies/dnet:subject_classification_typologies) and value. When the scheme is 'keyword', it means that the subject is free-text (i.e. not a term from a controlled vocabulary). "}
]
}
}
}
},
"subtitle": {
"type": "string"
"type": "string",
"description": "Sub-title of the result"
},
"tool": {
"description": "Only for results with type 'other': tool useful for the interpretation and/or re-used of the research product",
"type": "array",
"items": {
"type": "string"
}
"items": {"type": "string"}
},
"type": {
"type": "string",

View File

@ -1,6 +1,10 @@
<workflow-app name="dump_complete_graph" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>upload</name>
<description>true if the dump should be upload in zenodo</description>
</property>
<property>
<name>sourcePath</name>
<description>the source path</description>
@ -148,7 +152,7 @@
<path start="dump_organization"/>
<path start="dump_project"/>
<path start="dump_datasource"/>
<path start="dump_relation"/>
<path start="select_relation"/>
</fork>
<action name="dump_publication">
@ -333,6 +337,30 @@
<error to="Kill"/>
</action>
<action name="select_relation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Select valid table relation </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkSelectValidRelationsJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/validrelation</arg>
</spark>
<ok to="dump_relation"/>
<error to="Kill"/>
</action>
<action name="dump_relation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
@ -350,7 +378,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/relation</arg>
<arg>--sourcePath</arg><arg>${workingDir}/validrelation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation/relation</arg>
</spark>
<ok to="join_dump"/>
@ -560,10 +588,17 @@
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--sourcePath</arg><arg>${workingDir}/collect</arg>
</java>
<ok to="send_zenodo"/>
<ok to="should_upload"/>
<error to="Kill"/>
</action>
<decision name="should_upload">
<switch>
<case to="send_zenodo">${wf:conf('upload') eq true}</case>
<default to="End"/>
</switch>
</decision>
<action name="send_zenodo">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.SendToZenodoHDFS</main-class>
@ -572,7 +607,6 @@
<arg>--accessToken</arg><arg>${accessToken}</arg>
<arg>--connectionUrl</arg><arg>${connectionUrl}</arg>
<arg>--metadata</arg><arg>${metadata}</arg>
<arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
<arg>--conceptRecordId</arg><arg>${conceptRecordId}</arg>
<arg>--depositionType</arg><arg>${depositionType}</arg>
<arg>--depositionId</arg><arg>${depositionId}</arg>

View File

@ -18,8 +18,8 @@
"paramRequired": false
},
{
"paramName": "rp",
"paramLongName": "relationPath",
"paramName": "gp",
"paramLongName": "graphPath",
"paramDescription": "the relationPath",
"paramRequired": true
}

View File

@ -1,9 +1,8 @@
<workflow-app name="dump_funder_results" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>upload</name>
<value>false</value>
<value>true</value>
<description>true to upload the dump for the funders in Zenodo</description>
</property>
<property>
@ -163,7 +162,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/publication</arg>
<arg>--relationPath</arg><arg>${sourcePath}/relation</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
</spark>
<ok to="join_link"/>
<error to="Kill"/>
@ -189,7 +188,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/dataset</arg>
<arg>--relationPath</arg><arg>${sourcePath}/relation</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
</spark>
<ok to="join_link"/>
<error to="Kill"/>
@ -215,7 +214,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/otherresearchproduct</arg>
<arg>--relationPath</arg><arg>${sourcePath}/relation</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
</spark>
<ok to="join_link"/>
<error to="Kill"/>
@ -241,7 +240,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/software</arg>
<arg>--relationPath</arg><arg>${sourcePath}/relation</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
</spark>
<ok to="join_link"/>
<error to="Kill"/>
@ -517,7 +516,7 @@
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/ext</arg>
<arg>--outputPath</arg><arg>${workingDir}/resultperfunder</arg>
<arg>--relationPath</arg><arg>${sourcePath}</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
</spark>
<ok to="make_archive"/>
<error to="Kill"/>
@ -549,7 +548,6 @@
<arg>--accessToken</arg><arg>${accessToken}</arg>
<arg>--connectionUrl</arg><arg>${connectionUrl}</arg>
<arg>--metadata</arg><arg>${metadata}</arg>
<arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
<arg>--conceptRecordId</arg><arg>${conceptRecordId}</arg>
<arg>--depositionType</arg><arg>${depositionType}</arg>
<arg>--depositionId</arg><arg>${depositionId}</arg>

View File

@ -18,6 +18,18 @@
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "sd",
"paramLongName": "singleDeposition",
"paramDescription": "true if the dump should be created for a single community",
"paramRequired": false
},
{
"paramName": "ci",
"paramLongName": "communityId",
"paramDescription": "the id of the community for which to create the dump",
"paramRequired": false
}
]

View File

@ -35,7 +35,12 @@
"paramLongName":"dumpType",
"paramDescription": "the type of the dump (complete for the whole graph, community for the products related to communities, funder for the results with at least a link to project",
"paramRequired": false
}
}, {
"paramName":"cid",
"paramLongName":"communityId",
"paramDescription": "the id of the community to be dumped",
"paramRequired": false
}
]

View File

@ -24,9 +24,9 @@
"paramRequired": true
},
{
"paramName":"rp",
"paramLongName":"relationPath",
"paramDescription": "the path to the relations",
"paramName":"gp",
"paramLongName":"graphPath",
"paramDescription": "the path to the graph",
"paramRequired": true
}
]

View File

@ -0,0 +1,25 @@
[
{
"paramName":"ci",
"paramLongName":"communityId",
"paramDescription": "URL of the isLookUp Service",
"paramRequired": true
},
{
"paramName":"nn",
"paramLongName":"nameNode",
"paramDescription": "the name node",
"paramRequired": true
},
{
"paramName": "p",
"paramLongName": "path",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
}
]

View File

@ -25,6 +25,12 @@
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName": "cid",
"paramLongName": "communityId",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
}
]

View File

@ -12,12 +12,6 @@
"paramDescription": "The id of the concept record for a new version",
"paramRequired": false
},
{
"paramName":"cmp",
"paramLongName":"communityMapPath",
"paramDescription": "the path to the serialization of the community map",
"paramRequired": false
},
{
"paramName":"di",
"paramLongName":"depositionId",

View File

@ -0,0 +1,30 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,4 @@
## This is a classpath-based import file (this header is required)
dump_complete classpath eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/complete/oozie_app
dump_funder classpath eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/funder/oozie_app
dump_community classpath eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/community/oozie_app

View File

@ -0,0 +1,306 @@
<workflow-app name="dump_graph" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>singleDeposition</name>
<description>Indicates if it is a single community deposition</description>
</property>
<property>
<name>communityId</name>
<description>the id of the community to be dumped if a dump for a single community should be done</description>
</property>
<property>
<name>dumpType</name>
<description>the type of the dump one of {complete, community, funder}</description>
</property>
<property>
<name>onlyUpload</name>
<description>true if the dump is already done and should only be upload in zenodo</description>
</property>
<property>
<name>upload</name>
<description>true if the dump should be upload in zenodo</description>
</property>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>isLookUpUrl</name>
<description>the isLookup service endpoint</description>
</property>
<property>
<name>outputPath</name>
<description>the output path</description>
</property>
<property>
<name>resultAggregation</name>
<description>true if all the result type have to be dumped under result. false otherwise</description>
</property>
<property>
<name>accessToken</name>
<description>the access token used for the deposition in Zenodo</description>
</property>
<property>
<name>connectionUrl</name>
<description>the connection url for Zenodo</description>
</property>
<property>
<name>metadata</name>
<description> the metadata associated to the deposition</description>
</property>
<property>
<name>depositionType</name>
<description>the type of deposition we want to perform. "new" for brand new deposition, "version" for a new version of a published deposition (in this case the concept record id must be provided), "upload" to upload content to an open deposition for which we already have the deposition id (in this case the deposition id should be provided)</description>
</property>
<property>
<name>conceptRecordId</name>
<description>for new version, the id of the record for the old deposition</description>
</property>
<property>
<name>depositionId</name>
<description>the depositionId of a deposition open that has to be added content</description>
</property>
<property>
<name>organizationCommunityMap</name>
<description>the organization community map</description>
</property>
<property>
<name>hiveDbName</name>
<description>the target hive database name</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>hive server jdbc url</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="only_upload"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<decision name="only_upload">
<switch>
<case to="send_zenodo">${wf:conf('onlyUpload') eq true}</case>
<default to="reset_outputpath"/>
</switch>
</decision>
<action name="reset_outputpath">
<fs>
<delete path="${outputPath}"/>
<mkdir path="${outputPath}"/>
</fs>
<ok to="save_community_map"/>
<error to="Kill"/>
</action>
<action name="save_community_map">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.SaveCommunityMap</main-class>
<arg>--outputPath</arg><arg>${workingDir}/communityMap</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
<arg>--singleDeposition</arg><arg>${singleDeposition}</arg>
<arg>--communityId</arg><arg>${communityId}</arg>
</java>
<ok to="choose_dump"/>
<error to="Kill"/>
</action>
<decision name="choose_dump">
<switch>
<case to="dump_funder">${wf:conf('dumpType') eq "funder"}</case>
<case to="dump_community">${wf:conf('dumpType') eq "community"}</case>
<default to="dump_complete"/>
</switch>
</decision>
<!-- Sub-workflow which runs the dump for the complete graph -->
<action name="dump_complete">
<sub-workflow>
<app-path>${wf:appPath()}/dump_complete
</app-path>
<propagate-configuration/>
<configuration>
<property>
<name>communityMapPath</name>
<value>${workingDir}/communityMap</value>
</property>
<property>
<name>outputPath</name>
<value>${workingDir}/tar</value>
</property>
<property>
<name>sourcePath</name>
<value>${sourcePath}</value>
</property>
<property>
<name>organizationCommunityMap</name>
<value>${organizationCommunityMap}</value>
</property>
<property>
<name>isLookUpUrl</name>
<value>${isLookUpUrl}</value>
</property>
<property>
<name>resultAggregation</name>
<value>${resultAggregation}</value>
</property>
</configuration>
</sub-workflow>
<ok to="make_archive" />
<error to="Kill" />
</action>
<!-- Sub-workflow which runs the dump for the complete graph -->
<action name="dump_community">
<sub-workflow>
<app-path>${wf:appPath()}/dump_community
</app-path>
<propagate-configuration/>
<configuration>
<property>
<name>sourcePath</name>
<value>${sourcePath}</value>
</property>
<property>
<name>communityMapPath</name>
<value>${workingDir}/communityMap</value>
</property>
<property>
<name>outputPath</name>
<value>${workingDir}/tar</value>
</property>
</configuration>
</sub-workflow>
<ok to="make_archive" />
<error to="Kill" />
</action>
<action name="dump_funder">
<sub-workflow>
<app-path>${wf:appPath()}/dump_funder
</app-path>
<propagate-configuration/>
<configuration>
<property>
<name>communityMapPath</name>
<value>${workingDir}/communityMap</value>
</property>
<property>
<name>outputPath</name>
<value>${workingDir}/tar</value>
</property>
<property>
<name>sourcePath</name>
<value>${sourcePath}</value>
</property>
<property>
<name>dumpType</name>
<value>${dumpType}</value>
</property>
</configuration>
</sub-workflow>
<ok to="make_archive" />
<error to="Kill" />
</action>
<action name="make_archive">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.MakeTar</main-class>
<arg>--hdfsPath</arg><arg>${outputPath}</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--sourcePath</arg><arg>${workingDir}/tar</arg>
</java>
<ok to="should_upload"/>
<error to="Kill"/>
</action>
<decision name="should_upload">
<switch>
<case to="send_zenodo">${wf:conf('upload') eq true}</case>
<default to="End"/>
</switch>
</decision>
<action name="send_zenodo">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.SendToZenodoHDFS</main-class>
<arg>--hdfsPath</arg><arg>${outputPath}</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--accessToken</arg><arg>${accessToken}</arg>
<arg>--connectionUrl</arg><arg>${connectionUrl}</arg>
<arg>--metadata</arg><arg>${metadata}</arg>
<arg>--conceptRecordId</arg><arg>${conceptRecordId}</arg>
<arg>--depositionType</arg><arg>${depositionType}</arg>
<arg>--depositionId</arg><arg>${depositionId}</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,30 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,347 @@
<workflow-app name="sub_dump_community_funder_results" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>outputPath</name>
<description>the output path</description>
</property>
<property>
<name>communityMapPath</name>
<description>the path to the community map</description>
</property>
<property>
<name>selectedResults</name>
<description>the path the the possible subset ot results to be dumped</description>
</property>
<property>
<name>hiveDbName</name>
<description>the target hive database name</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>hive server jdbc url</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="fork_dump"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<fork name="fork_dump">
<path start="dump_publication"/>
<path start="dump_dataset"/>
<path start="dump_orp"/>
<path start="dump_software"/>
</fork>
<action name="dump_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table publication for community/funder related products</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${selectedResults}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/publication</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
<arg>--dumpType</arg><arg>${dumpType}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table dataset for community/funder related products</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${selectedResults}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/dataset</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table ORP for community related products</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${selectedResults}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/otherresearchproduct</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table software for community related products</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${selectedResults}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/dump/software</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<join name="join_dump" to="prepareResultProject"/>
<action name="prepareResultProject">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Prepare association result subset of project info</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkPrepareResultProject</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="fork_extendWithProject"/>
<error to="Kill"/>
</action>
<fork name="fork_extendWithProject">
<path start="extend_publication"/>
<path start="extend_dataset"/>
<path start="extend_orp"/>
<path start="extend_software"/>
</fork>
<action name="extend_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extend dumped publications with information about project</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dump/publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/ext/publication</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_extend"/>
<error to="Kill"/>
</action>
<action name="extend_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extend dumped dataset with information about project</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dump/dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/ext/dataset</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_extend"/>
<error to="Kill"/>
</action>
<action name="extend_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extend dumped ORP with information about project</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dump/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/ext/orp</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_extend"/>
<error to="Kill"/>
</action>
<action name="extend_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extend dumped software with information about project</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkUpdateProjectInfo</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dump/software</arg>
<arg>--outputPath</arg><arg>${outputPath}/ext/software</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
</spark>
<ok to="join_extend"/>
<error to="Kill"/>
</action>
<join name="join_extend" to="End"/>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,30 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,2 @@
## This is a classpath-based import file (this header is required)
dump_common classpath eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/commoncommunityfunder/oozie_app

View File

@ -0,0 +1,145 @@
<workflow-app name="sub_dump_community_products" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>outputPath</name>
<description>the output path</description>
</property>
<property>
<name>hiveDbName</name>
<description>the target hive database name</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>hive server jdbc url</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="common_action_community_funder"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="common_action_community_funder">
<sub-workflow>
<app-path>${wf:appPath()}/dump_common
</app-path>
<propagate-configuration/>
<configuration>
<property>
<name>sourcePath</name>
<value>${sourcePath}</value>
</property>
<property>
<name>selectedResults</name>
<value>${sourcePath}</value>
</property>
<property>
<name>communityMapPath</name>
<value>${workingDir}/communityMap</value>
</property>
<property>
<name>outputPath</name>
<value>${workingDir}</value>
</property>
</configuration>
</sub-workflow>
<ok to="splitForCommunities" />
<error to="Kill" />
</action>
<action name="splitForCommunities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Split dumped result for community</name>
<class>eu.dnetlib.dhp.oa.graph.dump.community.SparkSplitForCommunity</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/ext</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,30 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,538 @@
<workflow-app name="sub-dump_complete" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>outputPath</name>
<description>the output path</description>
</property>
<property>
<name>resultAggregation</name>
<description>true if all the result type have to be dumped under result. false otherwise</description>
</property>
<property>
<name>organizationCommunityMap</name>
<description>the organization community map</description>
</property>
<property>
<name>hiveDbName</name>
<description>the target hive database name</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>hive server jdbc url</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="fork_dump" />
<fork name="fork_dump">
<path start="dump_publication"/>
<path start="dump_dataset"/>
<path start="dump_orp"/>
<path start="dump_software"/>
<path start="dump_organization"/>
<path start="dump_project"/>
<path start="dump_datasource"/>
<path start="select_relation"/>
</fork>
<action name="dump_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table publication </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkDumpEntitiesJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/publication</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table dataset </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkDumpEntitiesJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/dataset</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table ORP </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkDumpEntitiesJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/otherresearchproduct</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table software </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkDumpEntitiesJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/software</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_organization">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table organization </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkDumpEntitiesJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/organization</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--outputPath</arg><arg>${outputPath}/organization</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_project">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table project </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkDumpEntitiesJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/project</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--outputPath</arg><arg>${outputPath}/project</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="dump_datasource">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table datasource </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkDumpEntitiesJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/datasource</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--outputPath</arg><arg>${outputPath}/datasource</arg>
<arg>--communityMapPath</arg><arg>${workingDir}/communityMap</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<action name="select_relation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Select valid table relation </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkSelectValidRelationsJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/validrelation</arg>
</spark>
<ok to="dump_relation"/>
<error to="Kill"/>
</action>
<action name="dump_relation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table relation </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkDumpRelationJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/validrelation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation/relation</arg>
</spark>
<ok to="join_dump"/>
<error to="Kill"/>
</action>
<join name="join_dump" to="fork_context"/>
<fork name="fork_context">
<path start="create_entities_fromcontext"/>
<path start="create_relation_fromcontext"/>
<path start="create_relation_fromorgs"/>
</fork>
<action name="create_entities_fromcontext">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.complete.CreateContextEntities</main-class>
<arg>--hdfsPath</arg><arg>${outputPath}/communities_infrastructures/communities_infrastructure.json.gz</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</java>
<ok to="join_context"/>
<error to="Kill"/>
</action>
<action name="create_relation_fromcontext">
<java>
<main-class>eu.dnetlib.dhp.oa.graph.dump.complete.CreateContextRelation</main-class>
<arg>--hdfsPath</arg><arg>${workingDir}/relation/context</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
</java>
<ok to="join_context"/>
<error to="Kill"/>
</action>
<action name="create_relation_fromorgs">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table relation </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkOrganizationRelation</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation/contextOrg</arg>
<arg>--organizationCommunityMap</arg><arg>${organizationCommunityMap}</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_context"/>
<error to="Kill"/>
</action>
<join name="join_context" to="fork_extract_relations"/>
<fork name="fork_extract_relations">
<path start="rels_from_pubs"/>
<path start="rels_from_dats"/>
<path start="rels_from_orp"/>
<path start="rels_from_sw"/>
</fork>
<action name="rels_from_pubs">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Extract Relations from publication </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkExtractRelationFromEntities</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation/publication</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_extract_relations"/>
<error to="Kill"/>
</action>
<action name="rels_from_dats">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table dataset </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkExtractRelationFromEntities</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation/dataset</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_extract_relations"/>
<error to="Kill"/>
</action>
<action name="rels_from_orp">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table ORP </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkExtractRelationFromEntities</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation/orp</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_extract_relations"/>
<error to="Kill"/>
</action>
<action name="rels_from_sw">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump table software </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkExtractRelationFromEntities</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation/software</arg>
<arg>--communityMapPath</arg><arg>${communityMapPath}</arg>
</spark>
<ok to="join_extract_relations"/>
<error to="Kill"/>
</action>
<join name="join_extract_relations" to="collect_and_save"/>
<action name="collect_and_save">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Collect Results and Relations and put them in the right path </name>
<class>eu.dnetlib.dhp.oa.graph.dump.complete.SparkCollectAndSave</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--resultAggregation</arg><arg>${resultAggregation}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<kill name="Kill">
<message>Sub-workflow dump complete failed with error message ${wf:errorMessage()}
</message>
</kill>
<end name="End" />
</workflow-app>

View File

@ -0,0 +1,30 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,2 @@
## This is a classpath-based import file (this header is required)
dump_common classpath eu/dnetlib/dhp/oa/graph/dump/wf/subworkflows/commoncommunityfunder/oozie_app

View File

@ -0,0 +1,258 @@
<workflow-app name="sub_dump_funder_results" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>outputPath</name>
<description>the output path</description>
</property>
<property>
<name>hiveDbName</name>
<description>the target hive database name</description>
</property>
<property>
<name>hiveJdbcUrl</name>
<description>hive server jdbc url</description>
</property>
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="fork_result_linked_to_projects"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<fork name="fork_result_linked_to_projects">
<path start="select_publication_linked_to_projects"/>
<path start="select_dataset_linked_to_projects"/>
<path start="select_orp_linked_to_project"/>
<path start="select_software_linked_to_projects"/>
</fork>
<action name="select_publication_linked_to_projects">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump funder results </name>
<class>eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkResultLinkedToProject</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/publication</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
</spark>
<ok to="join_link"/>
<error to="Kill"/>
</action>
<action name="select_dataset_linked_to_projects">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump funder results </name>
<class>eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkResultLinkedToProject</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/dataset</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
</spark>
<ok to="join_link"/>
<error to="Kill"/>
</action>
<action name="select_orp_linked_to_project">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump funder results </name>
<class>eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkResultLinkedToProject</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/otherresearchproduct</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
</spark>
<ok to="join_link"/>
<error to="Kill"/>
</action>
<action name="select_software_linked_to_projects">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump funder results </name>
<class>eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkResultLinkedToProject</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/result/software</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
</spark>
<ok to="join_link"/>
<error to="Kill"/>
</action>
<join name="join_link" to="common_action_community_funder"/>
<action name="common_action_community_funder">
<sub-workflow>
<app-path>${wf:appPath()}/dump_common
</app-path>
<propagate-configuration/>
<configuration>
<property>
<name>sourcePath</name>
<value>${sourcePath}</value>
</property>
<property>
<name>selectedResults</name>
<value>${workingDir}/result</value>
</property>
<property>
<name>communityMapPath</name>
<value>${workingDir}/communityMap</value>
</property>
<property>
<name>outputPath</name>
<value>${workingDir}</value>
</property>
</configuration>
</sub-workflow>
<ok to="dump_funder_results" />
<error to="Kill" />
</action>
<action name="dump_funder_results">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dump funder results </name>
<class>eu.dnetlib.dhp.oa.graph.dump.funderresults.SparkDumpFunderResults</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/ext</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--graphPath</arg><arg>${sourcePath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -13,6 +13,7 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*;
import org.slf4j.Logger;
@ -408,4 +409,53 @@ public class DumpJobTest {
}
@Test
public void testArticlePCA() {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/resultDump/publication_pca")
.getPath();
final String communityMapPath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymap.json")
.getPath();
DumpProducts dump = new DumpProducts();
dump
.run(
// false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class,
false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class,
GraphResult.class, Constants.DUMPTYPE.COMPLETE.getType());
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<GraphResult> tmp = sc
.textFile(workingDir.toString() + "/result")
.map(item -> OBJECT_MAPPER.readValue(item, GraphResult.class));
org.apache.spark.sql.Dataset<GraphResult> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(GraphResult.class));
Assertions.assertEquals(23, verificationDataset.count());
// verificationDataset.show(false);
Assertions.assertEquals(23, verificationDataset.filter("type = 'publication'").count());
verificationDataset.createOrReplaceTempView("check");
org.apache.spark.sql.Dataset<Row> temp = spark
.sql(
"select id " +
"from check " +
"lateral view explode (instance) i as inst " +
"where inst.articleprocessingcharge is not null");
Assertions.assertTrue(temp.count() == 2);
Assertions.assertTrue(temp.filter("id = '50|datacite____::05c611fdfc93d7a2a703d1324e28104a'").count() == 1);
Assertions.assertTrue(temp.filter("id = '50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8'").count() == 1);
//TODO verify value and name of the fields for vocab related value (i.e. accessright, bestaccessright)
}
}

View File

@ -21,7 +21,7 @@ public class GenerateJsonSchema {
configBuilder.forFields().withDescriptionResolver(field -> "Description of " + field.getDeclaredName());
SchemaGeneratorConfig config = configBuilder.build();
SchemaGenerator generator = new SchemaGenerator(config);
JsonNode jsonSchema = generator.generateSchema(Relation.class);
JsonNode jsonSchema = generator.generateSchema(GraphResult.class);
System.out.println(jsonSchema.toString());
}

View File

@ -70,7 +70,7 @@ public class QueryInformationSystemTest {
lenient().when(isLookUpService.quickSearchProfile(XQUERY)).thenReturn(communityMap);
queryInformationSystem = new QueryInformationSystem();
queryInformationSystem.setIsLookUp(isLookUpService);
map = queryInformationSystem.getCommunityMap();
map = queryInformationSystem.getCommunityMap(false, null);
}
@Test

View File

@ -97,7 +97,7 @@ public class CreateEntityTest {
Assertions.assertEquals(12, riList.size());
riList.stream().forEach(c -> {
switch (c.getOriginalId()) {
switch (c.getAcronym()) {
case "mes":
Assertions
.assertTrue(c.getType().equals(eu.dnetlib.dhp.oa.graph.dump.Constants.RESEARCH_COMMUNITY));
@ -115,9 +115,9 @@ public class CreateEntityTest {
String
.format(
"%s|%s::%s", Constants.CONTEXT_ID, Constants.CONTEXT_NS_PREFIX,
DHPUtils.md5(c.getOriginalId()))));
DHPUtils.md5(c.getAcronym()))));
Assertions.assertTrue(c.getZenodo_community().equals("https://zenodo.org/communities/oac_mes"));
Assertions.assertTrue("mes".equals(c.getOriginalId()));
Assertions.assertTrue("mes".equals(c.getAcronym()));
break;
case "clarin":
Assertions
@ -130,9 +130,9 @@ public class CreateEntityTest {
String
.format(
"%s|%s::%s", Constants.CONTEXT_ID, Constants.CONTEXT_NS_PREFIX,
DHPUtils.md5(c.getOriginalId()))));
DHPUtils.md5(c.getAcronym()))));
Assertions.assertTrue(c.getZenodo_community().equals("https://zenodo.org/communities/oac_clarin"));
Assertions.assertTrue("clarin".equals(c.getOriginalId()));
Assertions.assertTrue("clarin".equals(c.getAcronym()));
break;
}
// TODO add check for all the others Entities

View File

@ -9,11 +9,14 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.dump.oaf.graph.Relation;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.utils.DHPUtils;
public class CreateRelationTest {
@ -203,6 +206,7 @@ public class CreateRelationTest {
" <param name=\"suggestedAcknowledgement\"/>\n" +
" <param name=\"zenodoCommunity\">oac_ni</param>\n" +
" <param name=\"creationdate\">2018-03-01T12:00:00</param>\n" +
" <category claim=\"false\" id=\"ni::projects\" label=\"NI Content providers\"/>\n" +
" <category claim=\"false\" id=\"ni::contentproviders\" label=\"NI Content providers\">\n" +
" <concept claim=\"false\" id=\"ni::contentproviders::1\" label=\"OpenNeuro\">\n" +
" <param name=\"openaireId\">re3data_____::5b9bf9171d92df854cf3c520692e9122</param>\n" +
@ -376,7 +380,7 @@ public class CreateRelationTest {
" <param name=\"rule\"/>\n" +
" <param name=\"CD_PROJECT_NUMBER\">675858</param>\n" +
" <param name=\"url\"/>\n" +
" <param name=\"funding\">H2020-EINFRA-2015-1</param>\n" +
" <param name=\"funding\">EC | H2020 | RIA</param>\n" +
" <param name=\"funder\">EC</param>\n" +
" <param name=\"acronym\">West-Life</param>\n" +
" </concept>\n" +
@ -437,7 +441,65 @@ public class CreateRelationTest {
" <param name=\"suggestedAcknowledgement\"/>\n" +
" <param name=\"zenodoCommunity\">oaa_elixir-gr</param>\n" +
" <param name=\"creationdate\">2018-03-01T12:00:00</param>\n" +
" <category claim=\"false\" id=\"elixir-gr::projects\" label=\"ELIXIR GR Projects\"/>\n" +
" <category claim=\"false\" id=\"elixir-gr::projects\" label=\"ELIXIR GR Projects\">\n" +
" <concept claim=\"false\" id=\"ni::projects::12\" label=\"\">\n" +
" <param name=\"projectfullname\">BIO-INFORMATICS RESEARCH NETWORK COORDINATING CENTER (BIRN-CC)</param>\n"
+
" <param name=\"acronym\"/>\n" +
" <param name=\"CD_PROJECT_NUMBER\">1U24RR025736-01</param>\n" +
" <param name=\"funder\">NIH</param>\n" +
" </concept>\n" +
" <concept claim=\"false\" id=\"ni::projects::13\" label=\"\">\n" +
" <param name=\"projectfullname\">COLLABORATIVE RESEARCH: The Cognitive Neuroscience of Category Learning</param>\n"
+
" <param name=\"acronym\"/>\n" +
" <param name=\"CD_PROJECT_NUMBER\">0223843</param>\n" +
" <param name=\"funder\">NSF</param>\n" +
" </concept>\n" +
" <concept claim=\"false\" id=\"ni::projects::14\" label=\"\">\n" +
" <param name=\"projectfullname\">The Cognitive Atlas: Developing an Interdisciplinary Knowledge Base Through Socia</param>\n"
+
" <param name=\"acronym\"/>\n" +
" <param name=\"CD_PROJECT_NUMBER\">5R01MH082795-05</param>\n" +
" <param name=\"funder\">NIH</param>\n" +
" </concept>\n" +
" <concept claim=\"false\" id=\"ni::projects::15\" label=\"\">\n" +
" <param name=\"projectfullname\">Fragmented early life environmental and emotional / cognitive vulnerabilities</param>\n"
+
" <param name=\"acronym\"/>\n" +
" <param name=\"CD_PROJECT_NUMBER\">1P50MH096889-01A1</param>\n" +
" <param name=\"funder\">NIH</param>\n" +
" </concept>\n" +
" <concept claim=\"false\" id=\"ni::projects::16\" label=\"\">\n" +
" <param name=\"projectfullname\">Enhancement of the 1000 Functional Connectome Project</param>\n"
+
" <param name=\"acronym\"/>\n" +
" <param name=\"CD_PROJECT_NUMBER\">1R03MH096321-01A1</param>\n" +
" <param name=\"funder\">TUBITAK</param>\n" +
" </concept>\n" +
" <concept claim=\"false\" id=\"ni::projects::17\" label=\"\">\n" +
" <param name=\"projectfullname\">CRCNS Data Sharing: An open data repository for cognitive neuroscience: The OpenfMRI Project</param>\n"
+
" <param name=\"acronym\"/>\n" +
" <param name=\"CD_PROJECT_NUMBER\">1131441</param>\n" +
" <param name=\"funder\">NSF</param>\n" +
" </concept>\n" +
" <concept claim=\"false\" id=\"ni::projects::18\" label=\"\">\n" +
" <param name=\"projectfullname\">Enhancing Human Cortical Plasticity: Visual Psychophysics and fMRI</param>\n"
+
" <param name=\"acronym\"/>\n" +
" <param name=\"CD_PROJECT_NUMBER\">0121950</param>\n" +
" <param name=\"funder\">NSF</param>\n" +
" </concept>\n" +
" <concept claim=\"false\" id=\"ni::projects::18\" label=\"\">\n" +
" <param name=\"projectfullname\">Transforming statistical methodology for neuroimaging meta-analysis.</param>\n"
+
" <param name=\"acronym\"/>\n" +
" <param name=\"CD_PROJECT_NUMBER\">100309</param>\n" +
" <param name=\"funder\">WT</param>\n" +
" </concept>\n" +
" </category>" +
" <category claim=\"false\" id=\"elixir-gr::contentproviders\" label=\"Elixir-GR Content providers\">\n"
+
" <concept claim=\"false\" id=\"elixir-gr::contentproviders::1\" label=\"bio.tools\">\n" +
@ -566,4 +628,98 @@ public class CreateRelationTest {
tmp.contains("10|doajarticles::2899208a99aa7d142646e0a80bfeef05"));
}
@Test
public void test2() {
List<ContextInfo> cInfoList = new ArrayList<>();
final Consumer<ContextInfo> consumer = ci -> cInfoList.add(ci);
queryInformationSystem
.getContextRelation(consumer, "projects", ModelSupport.getIdPrefix(Project.class));
cInfoList.forEach(c -> System.out.println(new Gson().toJson(c)));
List<Relation> rList = new ArrayList<>();
cInfoList.forEach(cInfo -> Process.getRelation(cInfo).forEach(rList::add));
Assertions.assertEquals(44, rList.size());
Assertions
.assertFalse(
rList
.stream()
.map(r -> r.getSource().getId())
.collect(Collectors.toSet())
.contains(
String
.format(
"%s|%s::%s", Constants.CONTEXT_ID,
Constants.CONTEXT_NS_PREFIX,
DHPUtils.md5("dh-ch"))));
Assertions
.assertEquals(
2,
rList
.stream()
.filter(
r -> r
.getSource()
.getId()
.equals(
String
.format(
"%s|%s::%s", Constants.CONTEXT_ID,
Constants.CONTEXT_NS_PREFIX,
DHPUtils.md5("clarin"))))
.collect(Collectors.toList())
.size());
Assertions
.assertEquals(
2,
rList
.stream()
.filter(
r -> r
.getTarget()
.getId()
.equals(
String
.format(
"%s|%s::%s", Constants.CONTEXT_ID,
Constants.CONTEXT_NS_PREFIX,
DHPUtils.md5("clarin"))))
.collect(Collectors.toList())
.size());
Set<String> tmp = rList
.stream()
.filter(
r -> r
.getSource()
.getId()
.equals(
String
.format(
"%s|%s::%s", Constants.CONTEXT_ID,
Constants.CONTEXT_NS_PREFIX,
DHPUtils.md5("clarin"))))
.map(r -> r.getTarget().getId())
.collect(Collectors.toSet());
Assertions
.assertTrue(
tmp.contains("40|corda__h2020::b5a4eb56bf84bef2ebc193306b4d423f") &&
tmp.contains("40|corda_______::ef782b2d85676aa3e5a907427feb18c4"));
rList.forEach(rel -> {
if (rel.getSource().getId().startsWith("40|")) {
String proj = rel.getSource().getId().substring(3);
Assertions.assertTrue(proj.substring(0, proj.indexOf("::")).length() == 12);
}
});
}
}

View File

@ -88,7 +88,7 @@ public class DumpOrganizationProjectDatasourceTest {
org.apache.spark.sql.Dataset<eu.dnetlib.dhp.schema.dump.oaf.graph.Organization> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.graph.Organization.class));
Assertions.assertEquals(34, verificationDataset.count());
Assertions.assertEquals(15, verificationDataset.count());
verificationDataset
.foreach(

View File

@ -0,0 +1,100 @@
package eu.dnetlib.dhp.oa.graph.dump.complete;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.Relation;
import net.sf.saxon.expr.instruct.ForEach;
public class SelectRelationTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory
.getLogger(SelectRelationTest.class);
private static HashMap<String, String> map = new HashMap<>();
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files
.createTempDirectory(SelectRelationTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(SelectRelationTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(SelectRelationTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
public void test1() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/oa/graph/dump/selectrelations")
.getPath();
SparkSelectValidRelationsJob.main(new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-outputPath", workingDir.toString() + "/relation",
"-sourcePath", sourcePath
});
// dumpCommunityProducts.exec(MOCK_IS_LOOK_UP_URL,Boolean.FALSE, workingDir.toString()+"/dataset",sourcePath,"eu.dnetlib.dhp.schema.oaf.Dataset","eu.dnetlib.dhp.schema.dump.oaf.Dataset");
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<eu.dnetlib.dhp.schema.oaf.Relation> tmp = sc
.textFile(workingDir.toString() + "/relation")
.map(item -> OBJECT_MAPPER.readValue(item, eu.dnetlib.dhp.schema.oaf.Relation.class));
org.apache.spark.sql.Dataset<Relation> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(eu.dnetlib.dhp.schema.oaf.Relation.class));
Assertions.assertTrue(verificationDataset.count() == 7);
}
}

View File

@ -137,5 +137,10 @@ public class SplitPerFunderTest {
.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
Assertions.assertEquals(3, tmp.count());
// H2020 3
tmp = sc
.textFile(workingDir.toString() + "/split/MZOS")
.map(item -> OBJECT_MAPPER.readValue(item, CommunityResult.class));
Assertions.assertEquals(1, tmp.count());
}
}

View File

@ -6,3 +6,4 @@
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1603715146539,"properties":[],"relClass":"isProducedBy","relType":"datasourceOrganization","source":"10|doajarticles::9f3ff882f023209d9ffb4dc32b77d376","subRelType":"provision","target":"40|corda_______::ffc1811633b3222e4764c7b0517f83e8"}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1603715146539,"properties":[],"relClass":"isProducedBy","relType":"datasourceOrganization","source":"10|doajarticles::b566fa319c3923454e1e8eb886ab62d2","subRelType":"provision","target":"40|nhmrc_______::4e6c928fef9851b37ec73f4f6daca35b"}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1603715146539,"properties":[],"relClass":"isProducedBy","relType":"datasourceOrganization","source":"10|doajarticles::e0554fb004a155bc23cfb43ee9fc8eae","subRelType":"provision","target":"40|corda__h2020::846b777af165fef7c904a81712a83b66"}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1603715146539,"properties":[],"relClass":"isProducedBy","relType":"datasourceOrganization","source":"10|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","subRelType":"provision","target":"40|irb_hr______::1e5e62235d094afd01cd56e65112fc63"}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,15 @@
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"10|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","subRelType":"provision","target":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d"}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"10|doajarticles::5ac587eb28411c351c2e357eb097fd3d","subRelType":"provision","target":"20|grid________::b91f67a34df55a0aa1aabdcb3700f413"}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":true,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"10|doajarticles::690b3aaf177a4c70b81bacd8d023cbdc","subRelType":"provision","target":"20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0"}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":true,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"50|doajarticles::b566fa319c3923454e1e8eb886ab62d2","subRelType":"provision","target":"20|dedup_wf_001::4e6c928fef9851b37ec73f4f6daca35b"}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"50|doajarticles::e0554fb004a155bc23cfb43ee9fc8eae","subRelType":"provision","target":"20|grid________::b91f67a34df55a0aa1aabdcb3700f413"}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":true,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"50|doajarticles::fbf7592ddbf2ad3cc0ed70c0f2e1d67c","subRelType":"provision","target":"20|dedup_wf_001::1b965e2c0c53e5526d269d63bcfa0ae6"}
{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":true,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"50|doajarticles::fd4c399077127f0ba09b5205e2b78406","subRelType":"provision","target":"50|doajarticles::1cae0b82b56ccd97c2db1f698def7074"}
{"collectedfrom":[{"key":"10|infrastruct_::f66f1bd369679b5b077dcdf006089556","value":"OpenAIRE"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1594398578323,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"50|openaire____::8f991165fae922e29ad55d592f568464","subRelType":"provision","target":"50|openaire____::ec653e804967133b9436fdd30d3ff51d"}
{"collectedfrom":[{"key":"10|openaire____::47ce9e9f4fad46e732cff06419ecaabb","value":"OpenDOAR"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"50|opendoar____::15231a7ce4ba789d13b722cc5c955834","subRelType":"provision","target":"50|dedup_wf_001::1ea4bcb1bae8c6befef1e7f1230f0f10"}
{"collectedfrom":[{"key":"10|openaire____::47ce9e9f4fad46e732cff06419ecaabb","value":"OpenDOAR"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"50|opendoar____::16d11e9595188dbad0418a85f0351aba","subRelType":"provision","target":"40|opendoar____::041abd8c990fc531ab9bd2674a0e2725"}
{"collectedfrom":[{"key":"10|openaire____::47ce9e9f4fad46e732cff06419ecaabb","value":"OpenDOAR"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"50|opendoar____::46d3f6029f6170ebccb28945964d09bf","subRelType":"provision","target":"40|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1"}
{"collectedfrom":[{"key":"10|openaire____::47ce9e9f4fad46e732cff06419ecaabb","value":"OpenDOAR"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isProvidedBy","relType":"datasourceOrganization","source":"50|opendoar____::7501e5d4da87ac39d782741cd794002d","subRelType":"provision","target":"40|dedup_wf_001::04e2c34ef4daa411ff2497afc807b612"}
{"collectedfrom":[{"key":"10|openaire____::6ac933301a3933c8a22ceebea7000326","value":"Academy of Finland"}],"dataInfo":{"deletedbyinference":true,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900000000000000022"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isParticipant","relType":"projectOrganization","source":"20|aka_________::0cd5965141113df5739f1ac7ac7f6d37","subRelType":"participation","target":"40|aka_________::1bc716a1763110da3eb1af867de718a8"}
{"collectedfrom":[{"key":"10|openaire____::6ac933301a3933c8a22ceebea7000326","value":"Academy of Finland"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900000000000000022"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isParticipant","relType":"projectOrganization","source":"20|aka_________::1e2df822bf0932ad0f77565789f22e17","subRelType":"participation","target":"40|aka_________::a6c805bcfd383bae043d8df38e79db78"}
{"collectedfrom":[],"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"iis::document_affiliations","inferred":true,"invisible":false,"provenanceaction":{"classid":"iis","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.8966"},"lastupdatetimestamp":1595258695262,"properties":[],"relClass":"isAuthorInstitutionOf","relType":"resultOrganization","source":"20|aka_________::2c3aab6bce7516338b4dbfb4f6f86db7","subRelType":"affiliation","target":"40|dedup_wf_001::02859c30f6c8bfbdd8c427068a6ec684"}

File diff suppressed because one or more lines are too long