forked from antonis.lempesis/dnet-hadoop
Merge remote-tracking branch 'upstream/master'
This commit is contained in:
commit
259525cb93
|
@ -1,14 +1,25 @@
|
|||
package eu.dnetlib.dhp.schema.oaf;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public abstract class Oaf implements Serializable {
|
||||
|
||||
protected List<KeyValue> collectedfrom;
|
||||
|
||||
private DataInfo dataInfo;
|
||||
|
||||
private Long lastupdatetimestamp;
|
||||
|
||||
public List<KeyValue> getCollectedfrom() {
|
||||
return collectedfrom;
|
||||
}
|
||||
|
||||
public void setCollectedfrom(List<KeyValue> collectedfrom) {
|
||||
this.collectedfrom = collectedfrom;
|
||||
}
|
||||
|
||||
public DataInfo getDataInfo() {
|
||||
return dataInfo;
|
||||
}
|
||||
|
|
|
@ -10,8 +10,6 @@ public abstract class OafEntity extends Oaf implements Serializable {
|
|||
|
||||
private List<String> originalId;
|
||||
|
||||
private List<KeyValue> collectedfrom;
|
||||
|
||||
private List<StructuredProperty> pid;
|
||||
|
||||
private String dateofcollection;
|
||||
|
@ -38,14 +36,6 @@ public abstract class OafEntity extends Oaf implements Serializable {
|
|||
this.originalId = originalId;
|
||||
}
|
||||
|
||||
public List<KeyValue> getCollectedfrom() {
|
||||
return collectedfrom;
|
||||
}
|
||||
|
||||
public void setCollectedfrom(List<KeyValue> collectedfrom) {
|
||||
this.collectedfrom = collectedfrom;
|
||||
}
|
||||
|
||||
public List<StructuredProperty> getPid() {
|
||||
return pid;
|
||||
}
|
||||
|
|
|
@ -18,8 +18,6 @@ public class Relation extends Oaf {
|
|||
|
||||
private String target;
|
||||
|
||||
private List<KeyValue> collectedFrom = new ArrayList<>();
|
||||
|
||||
public String getRelType() {
|
||||
return relType;
|
||||
}
|
||||
|
@ -60,14 +58,6 @@ public class Relation extends Oaf {
|
|||
this.target = target;
|
||||
}
|
||||
|
||||
public List<KeyValue> getCollectedFrom() {
|
||||
return collectedFrom;
|
||||
}
|
||||
|
||||
public void setCollectedFrom(final List<KeyValue> collectedFrom) {
|
||||
this.collectedFrom = collectedFrom;
|
||||
}
|
||||
|
||||
public void mergeFrom(final Relation r) {
|
||||
|
||||
checkArgument(Objects.equals(getSource(), r.getSource()), "source ids must be equal");
|
||||
|
@ -77,12 +67,12 @@ public class Relation extends Oaf {
|
|||
Objects.equals(getSubRelType(), r.getSubRelType()), "subRelType(s) must be equal");
|
||||
checkArgument(Objects.equals(getRelClass(), r.getRelClass()), "relClass(es) must be equal");
|
||||
|
||||
setCollectedFrom(
|
||||
setCollectedfrom(
|
||||
Stream.concat(
|
||||
Optional.ofNullable(getCollectedFrom())
|
||||
Optional.ofNullable(getCollectedfrom())
|
||||
.map(Collection::stream)
|
||||
.orElse(Stream.empty()),
|
||||
Optional.ofNullable(r.getCollectedFrom())
|
||||
Optional.ofNullable(r.getCollectedfrom())
|
||||
.map(Collection::stream)
|
||||
.orElse(Stream.empty()))
|
||||
.distinct() // relies on KeyValue.equals
|
||||
|
@ -103,6 +93,6 @@ public class Relation extends Oaf {
|
|||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(relType, subRelType, relClass, source, target, collectedFrom);
|
||||
return Objects.hash(relType, subRelType, relClass, source, target, collectedfrom);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -47,6 +47,16 @@
|
|||
<artifactId>jaxen</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-distcp</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib</groupId>
|
||||
<artifactId>dnet-openaire-data-protos</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<artifactId>dhp-schemas</artifactId>
|
||||
|
@ -57,6 +67,44 @@
|
|||
<groupId>eu.dnetlib</groupId>
|
||||
<artifactId>dnet-actionmanager-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib</groupId>
|
||||
<artifactId>dnet-actionmanager-common</artifactId>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>eu.dnetlib</groupId>
|
||||
<artifactId>dnet-openaireplus-mapping-utils</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>saxonica</groupId>
|
||||
<artifactId>saxon</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>saxonica</groupId>
|
||||
<artifactId>saxon-dom</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>jgrapht</groupId>
|
||||
<artifactId>jgrapht</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>net.sf.ehcache</groupId>
|
||||
<artifactId>ehcache</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-test</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.*</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>apache</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package eu.dnetlib.dhp.migration.actions;
|
||||
package eu.dnetlib.dhp.actionmanager.migration;
|
||||
|
||||
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
|
||||
import java.util.Comparator;
|
|
@ -1,4 +1,4 @@
|
|||
package eu.dnetlib.dhp.migration.actions;
|
||||
package eu.dnetlib.dhp.actionmanager.migration;
|
||||
|
||||
import com.google.common.base.Splitter;
|
||||
import com.google.common.collect.Lists;
|
||||
|
@ -9,35 +9,36 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
|||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.tools.DistCp;
|
||||
import org.apache.hadoop.tools.DistCpOptions;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class MigrateActionSet {
|
||||
|
||||
private static final Log log = LogFactory.getLog(MigrateActionSet.class);
|
||||
private static final Logger log = LoggerFactory.getLogger(MigrateActionSet.class);
|
||||
|
||||
private static final String SEPARATOR = "/";
|
||||
private static final String TARGET_PATHS = "target_paths";
|
||||
private static final String RAWSET_PREFIX = "rawset_";
|
||||
|
||||
private static Boolean DEFAULT_TRANSFORM_ONLY = false;
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser =
|
||||
new ArgumentApplicationParser(
|
||||
IOUtils.toString(
|
||||
MigrateActionSet.class.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/migration/migrate_actionsets_parameters.json")));
|
||||
"/eu/dnetlib/dhp/actionmanager/migration/migrate_actionsets_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
new MigrateActionSet().run(parser);
|
||||
|
@ -56,11 +57,11 @@ public class MigrateActionSet {
|
|||
|
||||
final String transform_only_s = parser.get("transform_only");
|
||||
|
||||
log.info("transform only param: " + transform_only_s);
|
||||
log.info("transform only param: {}", transform_only_s);
|
||||
|
||||
final Boolean transformOnly = Boolean.valueOf(parser.get("transform_only"));
|
||||
|
||||
log.info("transform only: " + transformOnly);
|
||||
log.info("transform only: {}", transformOnly);
|
||||
|
||||
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||
|
||||
|
@ -79,22 +80,19 @@ public class MigrateActionSet {
|
|||
|
||||
final List<Path> sourcePaths = getSourcePaths(sourceNN, isLookUp);
|
||||
log.info(
|
||||
String.format(
|
||||
"paths to process:\n%s",
|
||||
sourcePaths.stream()
|
||||
.map(p -> p.toString())
|
||||
.collect(Collectors.joining("\n"))));
|
||||
"paths to process:\n{}",
|
||||
sourcePaths.stream().map(p -> p.toString()).collect(Collectors.joining("\n")));
|
||||
for (Path source : sourcePaths) {
|
||||
|
||||
if (!sourceFS.exists(source)) {
|
||||
log.warn(String.format("skipping unexisting path: %s", source));
|
||||
log.warn("skipping unexisting path: {}", source);
|
||||
} else {
|
||||
|
||||
LinkedList<String> pathQ =
|
||||
Lists.newLinkedList(Splitter.on(SEPARATOR).split(source.toUri().getPath()));
|
||||
|
||||
final String rawSet = pathQ.pollLast();
|
||||
log.info(String.format("got RAWSET: %s", rawSet));
|
||||
log.info("got RAWSET: {}", rawSet);
|
||||
|
||||
if (StringUtils.isNotBlank(rawSet) && rawSet.startsWith(RAWSET_PREFIX)) {
|
||||
|
||||
|
@ -109,7 +107,7 @@ public class MigrateActionSet {
|
|||
+ SEPARATOR
|
||||
+ rawSet);
|
||||
|
||||
log.info(String.format("using TARGET PATH: %s", targetPath));
|
||||
log.info("using TARGET PATH: {}", targetPath);
|
||||
|
||||
if (!transformOnly) {
|
||||
if (targetFS.exists(targetPath)) {
|
|
@ -1,4 +1,9 @@
|
|||
package eu.dnetlib.dhp.migration.actions;
|
||||
package eu.dnetlib.dhp.actionmanager.migration;
|
||||
|
||||
import static eu.dnetlib.data.proto.KindProtos.Kind.entity;
|
||||
import static eu.dnetlib.data.proto.KindProtos.Kind.relation;
|
||||
import static eu.dnetlib.data.proto.TypeProtos.*;
|
||||
import static eu.dnetlib.data.proto.TypeProtos.Type.*;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.googlecode.protobuf.format.JsonFormat;
|
||||
|
@ -41,7 +46,7 @@ public class ProtoConverter implements Serializable {
|
|||
rel.setRelType(r.getRelType().toString());
|
||||
rel.setSubRelType(r.getSubRelType().toString());
|
||||
rel.setRelClass(r.getRelClass());
|
||||
rel.setCollectedFrom(
|
||||
rel.setCollectedfrom(
|
||||
r.getCollectedfromCount() > 0
|
||||
? r.getCollectedfromList().stream()
|
||||
.map(kv -> mapKV(kv))
|
|
@ -0,0 +1,179 @@
|
|||
package eu.dnetlib.dhp.actionmanager.migration;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Splitter;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import eu.dnetlib.data.proto.OafProtos;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class TransformActions implements Serializable {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(TransformActions.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private static final String SEPARATOR = "/";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser =
|
||||
new ArgumentApplicationParser(
|
||||
IOUtils.toString(
|
||||
MigrateActionSet.class.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/actionmanager/migration/transform_actionsets_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged =
|
||||
Optional.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String isLookupUrl = parser.get("isLookupUrl");
|
||||
log.info("isLookupUrl: {}", isLookupUrl);
|
||||
|
||||
final String inputPaths = parser.get("inputPaths");
|
||||
|
||||
if (StringUtils.isBlank(inputPaths)) {
|
||||
throw new RuntimeException("empty inputPaths");
|
||||
}
|
||||
log.info("inputPaths: {}", inputPaths);
|
||||
|
||||
final String targetBaseDir = getTargetBaseDir(isLookupUrl);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> transformActions(inputPaths, targetBaseDir, spark));
|
||||
}
|
||||
|
||||
private static void transformActions(
|
||||
String inputPaths, String targetBaseDir, SparkSession spark) throws IOException {
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
|
||||
|
||||
for (String sourcePath : Lists.newArrayList(Splitter.on(",").split(inputPaths))) {
|
||||
|
||||
LinkedList<String> pathQ =
|
||||
Lists.newLinkedList(Splitter.on(SEPARATOR).split(sourcePath));
|
||||
|
||||
final String rawset = pathQ.pollLast();
|
||||
final String actionSetDirectory = pathQ.pollLast();
|
||||
|
||||
final Path targetDirectory =
|
||||
new Path(targetBaseDir + SEPARATOR + actionSetDirectory + SEPARATOR + rawset);
|
||||
|
||||
if (fs.exists(targetDirectory)) {
|
||||
log.info("found target directory '{}", targetDirectory);
|
||||
fs.delete(targetDirectory, true);
|
||||
log.info("deleted target directory '{}", targetDirectory);
|
||||
}
|
||||
|
||||
log.info("transforming actions from '{}' to '{}'", sourcePath, targetDirectory);
|
||||
|
||||
sc.sequenceFile(sourcePath, Text.class, Text.class)
|
||||
.map(
|
||||
a ->
|
||||
eu.dnetlib.actionmanager.actions.AtomicAction.fromJSON(
|
||||
a._2().toString()))
|
||||
.map(TransformActions::doTransform)
|
||||
.filter(Objects::nonNull)
|
||||
.mapToPair(
|
||||
a ->
|
||||
new Tuple2<>(
|
||||
a.getClazz().toString(),
|
||||
OBJECT_MAPPER.writeValueAsString(a)))
|
||||
.mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2())))
|
||||
.saveAsNewAPIHadoopFile(
|
||||
targetDirectory.toString(),
|
||||
Text.class,
|
||||
Text.class,
|
||||
SequenceFileOutputFormat.class,
|
||||
sc.hadoopConfiguration());
|
||||
}
|
||||
}
|
||||
|
||||
private static AtomicAction doTransform(eu.dnetlib.actionmanager.actions.AtomicAction aa)
|
||||
throws InvalidProtocolBufferException {
|
||||
|
||||
// dedup similarity relations had empty target value, don't migrate them
|
||||
if (aa.getTargetValue().length == 0) {
|
||||
return null;
|
||||
}
|
||||
final OafProtos.Oaf proto_oaf = OafProtos.Oaf.parseFrom(aa.getTargetValue());
|
||||
final Oaf oaf = ProtoConverter.convert(proto_oaf);
|
||||
switch (proto_oaf.getKind()) {
|
||||
case entity:
|
||||
switch (proto_oaf.getEntity().getType()) {
|
||||
case datasource:
|
||||
return new AtomicAction<>(Datasource.class, (Datasource) oaf);
|
||||
case organization:
|
||||
return new AtomicAction<>(Organization.class, (Organization) oaf);
|
||||
case project:
|
||||
return new AtomicAction<>(Project.class, (Project) oaf);
|
||||
case result:
|
||||
final String resulttypeid =
|
||||
proto_oaf
|
||||
.getEntity()
|
||||
.getResult()
|
||||
.getMetadata()
|
||||
.getResulttype()
|
||||
.getClassid();
|
||||
switch (resulttypeid) {
|
||||
case "publication":
|
||||
return new AtomicAction<>(Publication.class, (Publication) oaf);
|
||||
case "software":
|
||||
return new AtomicAction<>(Software.class, (Software) oaf);
|
||||
case "other":
|
||||
return new AtomicAction<>(
|
||||
OtherResearchProduct.class, (OtherResearchProduct) oaf);
|
||||
case "dataset":
|
||||
return new AtomicAction<>(Dataset.class, (Dataset) oaf);
|
||||
default:
|
||||
// can be an update, where the resulttype is not specified
|
||||
return new AtomicAction<>(Result.class, (Result) oaf);
|
||||
}
|
||||
default:
|
||||
throw new IllegalArgumentException(
|
||||
"invalid entity type: " + proto_oaf.getEntity().getType());
|
||||
}
|
||||
case relation:
|
||||
return new AtomicAction<>(Relation.class, (Relation) oaf);
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid kind: " + proto_oaf.getKind());
|
||||
}
|
||||
}
|
||||
|
||||
private static String getTargetBaseDir(String isLookupUrl) throws ISLookUpException {
|
||||
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||
String XQUERY =
|
||||
"collection('/db/DRIVER/ServiceResources/ActionManagerServiceResourceType')//SERVICE_PROPERTIES/PROPERTY[@key = 'basePath']/@value/string()";
|
||||
return isLookUp.getResourceProfileByQuery(XQUERY);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,56 @@
|
|||
[
|
||||
{
|
||||
"paramName": "issm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "when true will stop SparkSession after job execution",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "is",
|
||||
"paramLongName": "isLookupUrl",
|
||||
"paramDescription": "URL of the isLookUp Service",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "sn",
|
||||
"paramLongName": "sourceNameNode",
|
||||
"paramDescription": "nameNode of the source cluster",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "tn",
|
||||
"paramLongName": "targetNameNode",
|
||||
"paramDescription": "namoNode of the target cluster",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "w",
|
||||
"paramLongName": "workingDirectory",
|
||||
"paramDescription": "working directory",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "nm",
|
||||
"paramLongName": "distcp_num_maps",
|
||||
"paramDescription": "maximum number of map tasks used in the distcp process",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "mm",
|
||||
"paramLongName": "distcp_memory_mb",
|
||||
"paramDescription": "memory for distcp action copying actionsets from remote cluster",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "tt",
|
||||
"paramLongName": "distcp_task_timeout",
|
||||
"paramDescription": "timeout for distcp copying actions from remote cluster",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "tr",
|
||||
"paramLongName": "transform_only",
|
||||
"paramDescription": "activate tranform-only mode. Only apply transformation step",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,20 @@
|
|||
[
|
||||
{
|
||||
"paramName": "issm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "when true will stop SparkSession after job execution",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "is",
|
||||
"paramLongName": "isLookupUrl",
|
||||
"paramDescription": "URL of the isLookUp Service",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "i",
|
||||
"paramLongName": "inputPaths",
|
||||
"paramDescription": "URL of the isLookUp Service",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -10,7 +10,6 @@
|
|||
</property>
|
||||
<property>
|
||||
<name>workingDirectory</name>
|
||||
<value>/tmp/actionsets</value>
|
||||
<description>working directory</description>
|
||||
</property>
|
||||
<property>
|
||||
|
@ -44,6 +43,20 @@
|
|||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozieActionShareLibForSpark2</name>
|
||||
<description>oozie action sharelib for spark 2.*</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2ExtraListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||
<description>spark 2.* extra listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2SqlQueryExecutionListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||
<description>spark 2.* sql query execution listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2YarnHistoryServerAddress</name>
|
||||
<description>spark 2.* yarn history server address</description>
|
||||
|
@ -66,23 +79,27 @@
|
|||
<name>oozie.launcher.mapred.job.queue.name</name>
|
||||
<value>${oozieLauncherQueueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>${oozieActionShareLibForSpark2}</value>
|
||||
</property>
|
||||
</configuration>
|
||||
</global>
|
||||
|
||||
<start to='migrate_actionsets' />
|
||||
<start to="migrate_actionsets"/>
|
||||
|
||||
<action name='migrate_actionsets'>
|
||||
<action name="migrate_actionsets">
|
||||
<java>
|
||||
<main-class>eu.dnetlib.dhp.migration.actions.MigrateActionSet</main-class>
|
||||
<main-class>eu.dnetlib.dhp.actionmanager.migration.MigrateActionSet</main-class>
|
||||
<java-opt>-Dmapred.task.timeout=${distcp_task_timeout}</java-opt>
|
||||
<arg>-is</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>-sn</arg><arg>${sourceNN}</arg>
|
||||
<arg>-tn</arg><arg>${nameNode}</arg>
|
||||
<arg>-w</arg><arg>${workingDirectory}</arg>
|
||||
<arg>-nm</arg><arg>${distcp_num_maps}</arg>
|
||||
<arg>-mm</arg><arg>${distcp_memory_mb}</arg>
|
||||
<arg>-tt</arg><arg>${distcp_task_timeout}</arg>
|
||||
<arg>-tr</arg><arg>${transform_only}</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--sourceNameNode</arg><arg>${sourceNN}</arg>
|
||||
<arg>--targetNameNode</arg><arg>${nameNode}</arg>
|
||||
<arg>--workingDirectory</arg><arg>${workingDirectory}</arg>
|
||||
<arg>--distcp_num_maps</arg><arg>${distcp_num_maps}</arg>
|
||||
<arg>--distcp_memory_mb</arg><arg>${distcp_memory_mb}</arg>
|
||||
<arg>--distcp_task_timeout</arg><arg>${distcp_task_timeout}</arg>
|
||||
<arg>--transform_only</arg><arg>${transform_only}</arg>
|
||||
<capture-output/>
|
||||
</java>
|
||||
<ok to="transform_actions" />
|
||||
|
@ -94,19 +111,18 @@
|
|||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>transform_actions</name>
|
||||
<class>eu.dnetlib.dhp.migration.actions.TransformActions</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<class>eu.dnetlib.dhp.actionmanager.migration.TransformActions</class>
|
||||
<jar>dhp-actionmanager-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores ${sparkExecutorCores}
|
||||
--executor-memory ${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
|
||||
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>-mt</arg><arg>yarn</arg>
|
||||
<arg>-is</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--inputPaths</arg><arg>${wf:actionData('migrate_actionsets')['target_paths']}</arg>
|
||||
</spark>
|
||||
<ok to="end"/>
|
|
@ -1,10 +1,10 @@
|
|||
{"collectedFrom":[],"dataInfo":{"deletedbyinference":true,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"UNKNOWN","classname":"UNKNOWN","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":0,"relClass":"provides","relType":"datasourceOrganization","source":"10|CRIS_UNS____::f66f1bd369679b5b077dcdf006089556","subRelType":"provision","target":"20|openaire____::d0bbea1f5bed5864d1904eb602e608a6"}
|
||||
{"collectedFrom":[],"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"UNKNOWN","classname":"UNKNOWN","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":0,"relClass":"provides","relType":"datasourceOrganization","source":"10|OpenstarTs__::f66f1bd369679b5b077dcdf006089556","subRelType":"provision","target":"20|openaire____::fc7459b8fed8c0d47947fe04275251c0"}
|
||||
{"collectedFrom":[],"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"UNKNOWN","classname":"UNKNOWN","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":0,"relClass":"provides","relType":"datasourceOrganization","source":"10|NARCIS__cris::f66f1bd369679b5b077dcdf006089556","subRelType":"provision","target":"20|openaire____::c978e29d3b2ddf4f0c2b6e60d6613426"}
|
||||
{"collectedFrom":[],"dataInfo":{"deletedbyinference":true,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"UNKNOWN","classname":"UNKNOWN","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":0,"relClass":"provides","relType":"datasourceOrganization","source":"10|MetisRadboud::f66f1bd369679b5b077dcdf006089556","subRelType":"provision","target":"20|openaire____::b58bdbe8ae5acead04fc76777d2f8017"}
|
||||
{"collectedFrom":[],"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":true,"invisible":false,"provenanceaction":{"classid":"UNKNOWN","classname":"UNKNOWN","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":0,"relClass":"provides","relType":"datasourceOrganization","source":"10|MetisRadboud::f66f1bd369679b5b077dcdf006089556","subRelType":"provision","target":"20|dedup_wf_001::8de0f5a712997aafe0d794a53e51b75a"}
|
||||
{"collectedFrom":[],"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"UNKNOWN","classname":"UNKNOWN","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":0,"relClass":"provides","relType":"datasourceOrganization","source":"10|UnityFVG____::f66f1bd369679b5b077dcdf006089556","subRelType":"provision","target":"20|openaire____::89bab7c5a227fc27b2b9cadf475a6b71"}
|
||||
{"collectedFrom":[],"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"UNKNOWN","classname":"UNKNOWN","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":0,"relClass":"provides","relType":"datasourceOrganization","source":"10|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556","subRelType":"provision","target":"20|openaire____::007a4870b31056f89b768cf508e1538e"}
|
||||
{"collectedFrom":[],"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"UNKNOWN","classname":"UNKNOWN","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":0,"relClass":"provides","relType":"datasourceOrganization","source":"10|VTTRsInSsCrs::f66f1bd369679b5b077dcdf006089556","subRelType":"provision","target":"20|openaire____::735915884eb439d42953372eaf934782"}
|
||||
{"collectedFrom":[],"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":true,"invisible":false,"provenanceaction":{"classid":"UNKNOWN","classname":"UNKNOWN","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":0,"relClass":"provides","relType":"datasourceOrganization","source":"10|CRIS_UNS____::f66f1bd369679b5b077dcdf006089556","subRelType":"provision","target":"20|dedup_wf_001::9ea9c0996c87e1dc7fc69f94b5ed0010"}
|
||||
{"collectedFrom":[],"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"UNKNOWN","classname":"UNKNOWN","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":0,"relClass":"provides","relType":"datasourceOrganization","source":"10|CSC_________::a2b9ce8435390bcbfc05f3cae3948747","subRelType":"provision","target":"20|openaire____::c24a458004a31f9687089ea3d249de51"}
|
||||
{"collectedfrom":[],"dataInfo":{"deletedbyinference":true,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"UNKNOWN","classname":"UNKNOWN","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":0,"relClass":"provides","relType":"datasourceOrganization","source":"10|CRIS_UNS____::f66f1bd369679b5b077dcdf006089556","subRelType":"provision","target":"20|openaire____::d0bbea1f5bed5864d1904eb602e608a6"}
|
||||
{"collectedfrom":[],"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"UNKNOWN","classname":"UNKNOWN","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":0,"relClass":"provides","relType":"datasourceOrganization","source":"10|OpenstarTs__::f66f1bd369679b5b077dcdf006089556","subRelType":"provision","target":"20|openaire____::fc7459b8fed8c0d47947fe04275251c0"}
|
||||
{"collectedfrom":[],"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"UNKNOWN","classname":"UNKNOWN","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":0,"relClass":"provides","relType":"datasourceOrganization","source":"10|NARCIS__cris::f66f1bd369679b5b077dcdf006089556","subRelType":"provision","target":"20|openaire____::c978e29d3b2ddf4f0c2b6e60d6613426"}
|
||||
{"collectedfrom":[],"dataInfo":{"deletedbyinference":true,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"UNKNOWN","classname":"UNKNOWN","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":0,"relClass":"provides","relType":"datasourceOrganization","source":"10|MetisRadboud::f66f1bd369679b5b077dcdf006089556","subRelType":"provision","target":"20|openaire____::b58bdbe8ae5acead04fc76777d2f8017"}
|
||||
{"collectedfrom":[],"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":true,"invisible":false,"provenanceaction":{"classid":"UNKNOWN","classname":"UNKNOWN","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":0,"relClass":"provides","relType":"datasourceOrganization","source":"10|MetisRadboud::f66f1bd369679b5b077dcdf006089556","subRelType":"provision","target":"20|dedup_wf_001::8de0f5a712997aafe0d794a53e51b75a"}
|
||||
{"collectedfrom":[],"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"UNKNOWN","classname":"UNKNOWN","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":0,"relClass":"provides","relType":"datasourceOrganization","source":"10|UnityFVG____::f66f1bd369679b5b077dcdf006089556","subRelType":"provision","target":"20|openaire____::89bab7c5a227fc27b2b9cadf475a6b71"}
|
||||
{"collectedfrom":[],"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"UNKNOWN","classname":"UNKNOWN","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":0,"relClass":"provides","relType":"datasourceOrganization","source":"10|4ScienceCRIS::f66f1bd369679b5b077dcdf006089556","subRelType":"provision","target":"20|openaire____::007a4870b31056f89b768cf508e1538e"}
|
||||
{"collectedfrom":[],"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"UNKNOWN","classname":"UNKNOWN","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":0,"relClass":"provides","relType":"datasourceOrganization","source":"10|VTTRsInSsCrs::f66f1bd369679b5b077dcdf006089556","subRelType":"provision","target":"20|openaire____::735915884eb439d42953372eaf934782"}
|
||||
{"collectedfrom":[],"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":true,"invisible":false,"provenanceaction":{"classid":"UNKNOWN","classname":"UNKNOWN","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":0,"relClass":"provides","relType":"datasourceOrganization","source":"10|CRIS_UNS____::f66f1bd369679b5b077dcdf006089556","subRelType":"provision","target":"20|dedup_wf_001::9ea9c0996c87e1dc7fc69f94b5ed0010"}
|
||||
{"collectedfrom":[],"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"UNKNOWN","classname":"UNKNOWN","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":0,"relClass":"provides","relType":"datasourceOrganization","source":"10|CSC_________::a2b9ce8435390bcbfc05f3cae3948747","subRelType":"provision","target":"20|openaire____::c24a458004a31f9687089ea3d249de51"}
|
||||
|
|
|
@ -1,213 +0,0 @@
|
|||
package eu.dnetlib.dhp.migration.actions;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Splitter;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import eu.dnetlib.data.proto.OafProtos;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Objects;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.compress.GzipCodec;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
||||
public class TransformActions implements Serializable {
|
||||
|
||||
private static final Log log = LogFactory.getLog(TransformActions.class);
|
||||
private static final String SEPARATOR = "/";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser =
|
||||
new ArgumentApplicationParser(
|
||||
IOUtils.toString(
|
||||
MigrateActionSet.class.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/migration/transform_actionsets_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
new TransformActions().run(parser);
|
||||
}
|
||||
|
||||
private void run(ArgumentApplicationParser parser) throws ISLookUpException, IOException {
|
||||
|
||||
final String isLookupUrl = parser.get("isLookupUrl");
|
||||
log.info("isLookupUrl: " + isLookupUrl);
|
||||
|
||||
final String inputPaths = parser.get("inputPaths");
|
||||
|
||||
if (StringUtils.isBlank(inputPaths)) {
|
||||
throw new RuntimeException("empty inputPaths");
|
||||
}
|
||||
log.info("inputPaths: " + inputPaths);
|
||||
|
||||
final String targetBaseDir = getTargetBaseDir(isLookupUrl);
|
||||
|
||||
try (SparkSession spark = getSparkSession(parser)) {
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
|
||||
|
||||
for (String sourcePath : Lists.newArrayList(Splitter.on(",").split(inputPaths))) {
|
||||
|
||||
LinkedList<String> pathQ =
|
||||
Lists.newLinkedList(Splitter.on(SEPARATOR).split(sourcePath));
|
||||
|
||||
final String rawset = pathQ.pollLast();
|
||||
final String actionSetDirectory = pathQ.pollLast();
|
||||
|
||||
final Path targetDirectory =
|
||||
new Path(
|
||||
targetBaseDir
|
||||
+ SEPARATOR
|
||||
+ actionSetDirectory
|
||||
+ SEPARATOR
|
||||
+ rawset);
|
||||
|
||||
if (fs.exists(targetDirectory)) {
|
||||
log.info(String.format("found target directory '%s", targetDirectory));
|
||||
fs.delete(targetDirectory, true);
|
||||
log.info(String.format("deleted target directory '%s", targetDirectory));
|
||||
}
|
||||
|
||||
log.info(
|
||||
String.format(
|
||||
"transforming actions from '%s' to '%s'",
|
||||
sourcePath, targetDirectory));
|
||||
|
||||
sc.sequenceFile(sourcePath, Text.class, Text.class)
|
||||
.map(
|
||||
a ->
|
||||
eu.dnetlib.actionmanager.actions.AtomicAction.fromJSON(
|
||||
a._2().toString()))
|
||||
.map(a -> doTransform(a))
|
||||
.filter(Objects::isNull)
|
||||
.filter(a -> a.getPayload() == null)
|
||||
.map(a -> new ObjectMapper().writeValueAsString(a))
|
||||
.saveAsTextFile(targetDirectory.toString(), GzipCodec.class);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Text transformAction(eu.dnetlib.actionmanager.actions.AtomicAction aa)
|
||||
throws InvalidProtocolBufferException, JsonProcessingException {
|
||||
final Text out = new Text();
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
if (aa.getTargetValue() != null && aa.getTargetValue().length > 0) {
|
||||
out.set(mapper.writeValueAsString(doTransform(aa)));
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
private AtomicAction<Relation> getRelationAtomicAction(String atomicaActionId) {
|
||||
final String[] splitId = atomicaActionId.split("@");
|
||||
|
||||
String source = splitId[0];
|
||||
String target = splitId[2];
|
||||
|
||||
String[] relSemantic = splitId[1].split("_");
|
||||
|
||||
Relation rel = new Relation();
|
||||
rel.setSource(source);
|
||||
rel.setTarget(target);
|
||||
rel.setRelType(relSemantic[0]);
|
||||
rel.setSubRelType(relSemantic[1]);
|
||||
rel.setRelClass(relSemantic[2]);
|
||||
|
||||
DataInfo d = new DataInfo();
|
||||
d.setDeletedbyinference(false);
|
||||
d.setInferenceprovenance("deduplication");
|
||||
d.setInferred(true);
|
||||
d.setInvisible(false);
|
||||
Qualifier provenanceaction = new Qualifier();
|
||||
|
||||
provenanceaction.setClassid("deduplication");
|
||||
provenanceaction.setClassname("deduplication");
|
||||
provenanceaction.setSchemeid("dnet:provenanceActions");
|
||||
provenanceaction.setSchemename("dnet:provenanceActions");
|
||||
|
||||
d.setProvenanceaction(provenanceaction);
|
||||
|
||||
rel.setDataInfo(d);
|
||||
|
||||
return new AtomicAction<>(Relation.class, rel);
|
||||
}
|
||||
|
||||
private AtomicAction doTransform(eu.dnetlib.actionmanager.actions.AtomicAction aa)
|
||||
throws InvalidProtocolBufferException {
|
||||
final OafProtos.Oaf proto_oaf = OafProtos.Oaf.parseFrom(aa.getTargetValue());
|
||||
final Oaf oaf = ProtoConverter.convert(proto_oaf);
|
||||
switch (proto_oaf.getKind()) {
|
||||
case entity:
|
||||
switch (proto_oaf.getEntity().getType()) {
|
||||
case datasource:
|
||||
return new AtomicAction<>(Datasource.class, (Datasource) oaf);
|
||||
case organization:
|
||||
return new AtomicAction<>(Organization.class, (Organization) oaf);
|
||||
case project:
|
||||
return new AtomicAction<>(Project.class, (Project) oaf);
|
||||
case result:
|
||||
final String resulttypeid =
|
||||
proto_oaf
|
||||
.getEntity()
|
||||
.getResult()
|
||||
.getMetadata()
|
||||
.getResulttype()
|
||||
.getClassid();
|
||||
switch (resulttypeid) {
|
||||
case "publication":
|
||||
return new AtomicAction<>(Publication.class, (Publication) oaf);
|
||||
case "software":
|
||||
return new AtomicAction<>(Software.class, (Software) oaf);
|
||||
case "other":
|
||||
return new AtomicAction<>(
|
||||
OtherResearchProduct.class, (OtherResearchProduct) oaf);
|
||||
case "dataset":
|
||||
return new AtomicAction<>(Dataset.class, (Dataset) oaf);
|
||||
default:
|
||||
// can be an update, where the resulttype is not specified
|
||||
return new AtomicAction<>(Result.class, (Result) oaf);
|
||||
}
|
||||
default:
|
||||
throw new IllegalArgumentException(
|
||||
"invalid entity type: " + proto_oaf.getEntity().getType());
|
||||
}
|
||||
case relation:
|
||||
return new AtomicAction<>(Relation.class, (Relation) oaf);
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid kind: " + proto_oaf.getKind());
|
||||
}
|
||||
}
|
||||
|
||||
private String getTargetBaseDir(String isLookupUrl) throws ISLookUpException {
|
||||
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||
String XQUERY =
|
||||
"collection('/db/DRIVER/ServiceResources/ActionManagerServiceResourceType')//SERVICE_PROPERTIES/PROPERTY[@key = 'basePath']/@value/string()";
|
||||
return isLookUp.getResourceProfileByQuery(XQUERY);
|
||||
}
|
||||
|
||||
private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
return SparkSession.builder()
|
||||
.appName(TransformActions.class.getSimpleName())
|
||||
.master(parser.get("master"))
|
||||
.config(conf)
|
||||
.enableHiveSupport()
|
||||
.getOrCreate();
|
||||
}
|
||||
}
|
|
@ -1,10 +0,0 @@
|
|||
[
|
||||
{"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true},
|
||||
{"paramName":"sn", "paramLongName":"sourceNameNode", "paramDescription": "nameNode of the source cluster", "paramRequired": true},
|
||||
{"paramName":"tn", "paramLongName":"targetNameNode", "paramDescription": "namoNode of the target cluster", "paramRequired": true},
|
||||
{"paramName":"w", "paramLongName":"workingDirectory", "paramDescription": "working directory", "paramRequired": true},
|
||||
{"paramName":"nm", "paramLongName":"distcp_num_maps", "paramDescription": "maximum number of map tasks used in the distcp process", "paramRequired": true},
|
||||
{"paramName":"mm", "paramLongName":"distcp_memory_mb", "paramDescription": "memory for distcp action copying actionsets from remote cluster", "paramRequired": true},
|
||||
{"paramName":"tt", "paramLongName":"distcp_task_timeout", "paramDescription": "timeout for distcp copying actions from remote cluster", "paramRequired": true},
|
||||
{"paramName":"tr", "paramLongName":"transform_only", "paramDescription": "activate tranform-only mode. Only apply transformation step", "paramRequired": true}
|
||||
]
|
|
@ -1,5 +0,0 @@
|
|||
[
|
||||
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
|
||||
{"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true},
|
||||
{"paramName":"i", "paramLongName":"inputPaths", "paramDescription": "URL of the isLookUp Service", "paramRequired": true}
|
||||
]
|
|
@ -5,29 +5,29 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import com.google.common.collect.Lists;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.api.java.function.MapGroupsFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class DedupRecordFactory {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(DedupRecordFactory.class);
|
||||
|
||||
protected static final ObjectMapper OBJECT_MAPPER =
|
||||
new com.fasterxml.jackson.databind.ObjectMapper()
|
||||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
|
||||
public static <T extends OafEntity> Dataset<T> createDedupRecord(
|
||||
final SparkSession spark,
|
||||
final String mergeRelsInputPath,
|
||||
final String entitiesInputPath,
|
||||
final Class<T> clazz,
|
||||
final DedupConfig dedupConf) {
|
||||
final Class<T> clazz) {
|
||||
|
||||
long ts = System.currentTimeMillis();
|
||||
|
||||
|
@ -54,40 +54,39 @@ public class DedupRecordFactory {
|
|||
r -> new Tuple2<>(r.getSource(), r.getTarget()),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
|
||||
|
||||
// <dedup_id, json_entity_merged>
|
||||
return mergeRels
|
||||
.joinWith(entities, mergeRels.col("_1").equalTo(entities.col("_1")), "left_outer")
|
||||
.filter(
|
||||
(FilterFunction<Tuple2<Tuple2<String, String>, Tuple2<String, T>>>)
|
||||
value -> value._2() != null)
|
||||
.joinWith(entities, mergeRels.col("_2").equalTo(entities.col("_1")), "inner")
|
||||
.map(
|
||||
(MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, T>>, T>)
|
||||
value -> value._2()._2(),
|
||||
Encoders.kryo(clazz))
|
||||
.groupByKey((MapFunction<T, String>) value -> value.getId(), Encoders.STRING())
|
||||
(MapFunction<
|
||||
Tuple2<Tuple2<String, String>, Tuple2<String, T>>,
|
||||
Tuple2<String, T>>)
|
||||
value -> new Tuple2<>(value._1()._1(), value._2()._2()),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)))
|
||||
.groupByKey(
|
||||
(MapFunction<Tuple2<String, T>, String>) entity -> entity._1(),
|
||||
Encoders.STRING())
|
||||
.mapGroups(
|
||||
(MapGroupsFunction<String, T, T>)
|
||||
(MapGroupsFunction<String, Tuple2<String, T>, T>)
|
||||
(key, values) -> entityMerger(key, values, ts, clazz),
|
||||
Encoders.bean(clazz));
|
||||
}
|
||||
|
||||
private static <T extends OafEntity> T entityMerger(
|
||||
String id, Iterator<T> entities, final long ts, Class<T> clazz) {
|
||||
String id, Iterator<Tuple2<String, T>> entities, long ts, Class<T> clazz) {
|
||||
try {
|
||||
T entity = clazz.newInstance();
|
||||
entity.setId(id);
|
||||
if (entity.getDataInfo() == null) {
|
||||
entity.setDataInfo(new DataInfo());
|
||||
}
|
||||
entity.setDataInfo(new DataInfo());
|
||||
entity.getDataInfo().setTrust("0.9");
|
||||
entity.setLastupdatetimestamp(ts);
|
||||
|
||||
final Collection<String> dates = Lists.newArrayList();
|
||||
entities.forEachRemaining(
|
||||
e -> {
|
||||
entity.mergeFrom(e);
|
||||
if (ModelSupport.isSubClass(e, Result.class)) {
|
||||
Result r1 = (Result) e;
|
||||
t -> {
|
||||
T duplicate = t._2();
|
||||
entity.mergeFrom(duplicate);
|
||||
if (ModelSupport.isSubClass(duplicate, Result.class)) {
|
||||
Result r1 = (Result) duplicate;
|
||||
Result er = (Result) entity;
|
||||
er.setAuthor(DedupUtility.mergeAuthor(er.getAuthor(), r1.getAuthor()));
|
||||
|
||||
|
|
|
@ -11,8 +11,6 @@ import eu.dnetlib.pace.config.DedupConfig;
|
|||
import java.io.IOException;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.dom4j.DocumentException;
|
||||
|
@ -71,13 +69,10 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
|
|||
|
||||
Class<OafEntity> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
|
||||
|
||||
DedupRecordFactory.createDedupRecord(spark, mergeRelPath, entityPath, clazz, dedupConf)
|
||||
.map(
|
||||
(MapFunction<OafEntity, String>)
|
||||
value -> OBJECT_MAPPER.writeValueAsString(value),
|
||||
Encoders.STRING())
|
||||
DedupRecordFactory.createDedupRecord(spark, mergeRelPath, entityPath, clazz)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(outputPath);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ public class SparkUpdateEntity extends AbstractSparkAction {
|
|||
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkUpdateEntity.class);
|
||||
|
||||
final String IDJSONPATH = "$.id";
|
||||
private static final String IDJSONPATH = "$.id";
|
||||
|
||||
public SparkUpdateEntity(ArgumentApplicationParser parser, SparkSession spark) {
|
||||
super(parser, spark);
|
||||
|
@ -65,27 +65,25 @@ public class SparkUpdateEntity extends AbstractSparkAction {
|
|||
log.info("workingPath: '{}'", workingPath);
|
||||
log.info("dedupGraphPath: '{}'", dedupGraphPath);
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
// for each entity
|
||||
ModelSupport.entityTypes.forEach(
|
||||
(entity, clazz) -> {
|
||||
final String outputPath = dedupGraphPath + "/" + entity;
|
||||
(type, clazz) -> {
|
||||
final String outputPath = dedupGraphPath + "/" + type;
|
||||
removeOutputDir(spark, outputPath);
|
||||
|
||||
JavaRDD<String> sourceEntity =
|
||||
sc.textFile(
|
||||
DedupUtility.createEntityPath(
|
||||
graphBasePath, entity.toString()));
|
||||
DedupUtility.createEntityPath(graphBasePath, type.toString()));
|
||||
|
||||
if (mergeRelExists(workingPath, entity.toString())) {
|
||||
if (mergeRelExists(workingPath, type.toString())) {
|
||||
|
||||
final String mergeRelPath =
|
||||
DedupUtility.createMergeRelPath(
|
||||
workingPath, "*", entity.toString());
|
||||
DedupUtility.createMergeRelPath(workingPath, "*", type.toString());
|
||||
final String dedupRecordPath =
|
||||
DedupUtility.createDedupRecordPath(
|
||||
workingPath, "*", entity.toString());
|
||||
workingPath, "*", type.toString());
|
||||
|
||||
final Dataset<Relation> rel =
|
||||
spark.read().load(mergeRelPath).as(Encoders.bean(Relation.class));
|
||||
|
@ -107,7 +105,6 @@ public class SparkUpdateEntity extends AbstractSparkAction {
|
|||
MapDocumentUtil.getJPathString(
|
||||
IDJSONPATH, s),
|
||||
s));
|
||||
|
||||
JavaRDD<String> map =
|
||||
entitiesWithId
|
||||
.leftOuterJoin(mergedIds)
|
||||
|
|
|
@ -8,6 +8,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
|
@ -16,14 +17,20 @@ import java.nio.file.Paths;
|
|||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.PairFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import scala.Tuple2;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
||||
|
@ -41,7 +48,7 @@ public class SparkDedupTest implements Serializable {
|
|||
private static final String testActionSetId = "test-orchestrator";
|
||||
|
||||
@BeforeAll
|
||||
private static void cleanUp() throws IOException, URISyntaxException {
|
||||
public static void cleanUp() throws IOException, URISyntaxException {
|
||||
|
||||
testGraphBasePath =
|
||||
Paths.get(
|
||||
|
@ -65,7 +72,7 @@ public class SparkDedupTest implements Serializable {
|
|||
|
||||
spark =
|
||||
SparkSession.builder()
|
||||
.appName(SparkCreateSimRels.class.getSimpleName())
|
||||
.appName(SparkDedupTest.class.getSimpleName())
|
||||
.master("local[*]")
|
||||
.config(new SparkConf())
|
||||
.getOrCreate();
|
||||
|
@ -74,7 +81,7 @@ public class SparkDedupTest implements Serializable {
|
|||
}
|
||||
|
||||
@BeforeEach
|
||||
private void setUp() throws IOException, ISLookUpException {
|
||||
public void setUp() throws IOException, ISLookUpException {
|
||||
|
||||
lenient()
|
||||
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId)))
|
||||
|
@ -96,6 +103,13 @@ public class SparkDedupTest implements Serializable {
|
|||
IOUtils.toString(
|
||||
SparkDedupTest.class.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json")));
|
||||
|
||||
lenient()
|
||||
.when(isLookUpService.getResourceProfileByQuery(Mockito.contains("software")))
|
||||
.thenReturn(
|
||||
IOUtils.toString(
|
||||
SparkDedupTest.class.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json")));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -109,7 +123,6 @@ public class SparkDedupTest implements Serializable {
|
|||
"/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json")));
|
||||
parser.parseArgument(
|
||||
new String[] {
|
||||
"-mt", "local[*]",
|
||||
"-i", testGraphBasePath,
|
||||
"-asi", testActionSetId,
|
||||
"-la", "lookupurl",
|
||||
|
@ -126,9 +139,14 @@ public class SparkDedupTest implements Serializable {
|
|||
spark.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
|
||||
.count();
|
||||
long sw_simrel =
|
||||
spark.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
|
||||
.count();
|
||||
|
||||
assertEquals(3288, orgs_simrel);
|
||||
assertEquals(3432, orgs_simrel);
|
||||
assertEquals(7260, pubs_simrel);
|
||||
assertEquals(344, sw_simrel);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -142,7 +160,6 @@ public class SparkDedupTest implements Serializable {
|
|||
"/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
|
||||
parser.parseArgument(
|
||||
new String[] {
|
||||
"-mt", "local[*]",
|
||||
"-i", testGraphBasePath,
|
||||
"-asi", testActionSetId,
|
||||
"-la", "lookupurl",
|
||||
|
@ -159,9 +176,14 @@ public class SparkDedupTest implements Serializable {
|
|||
spark.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")
|
||||
.count();
|
||||
long sw_mergerel =
|
||||
spark.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
|
||||
.count();
|
||||
|
||||
assertEquals(1244, orgs_mergerel);
|
||||
assertEquals(1276, orgs_mergerel);
|
||||
assertEquals(1460, pubs_mergerel);
|
||||
assertEquals(288, sw_mergerel);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -175,7 +197,6 @@ public class SparkDedupTest implements Serializable {
|
|||
"/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json")));
|
||||
parser.parseArgument(
|
||||
new String[] {
|
||||
"-mt", "local[*]",
|
||||
"-i", testGraphBasePath,
|
||||
"-asi", testActionSetId,
|
||||
"-la", "lookupurl",
|
||||
|
@ -198,9 +219,13 @@ public class SparkDedupTest implements Serializable {
|
|||
+ testActionSetId
|
||||
+ "/publication_deduprecord")
|
||||
.count();
|
||||
long sw_deduprecord =
|
||||
jsc.textFile(testOutputBasePath + "/" + testActionSetId + "/software_deduprecord")
|
||||
.count();
|
||||
|
||||
assertEquals(82, orgs_deduprecord);
|
||||
assertEquals(66, pubs_deduprecord);
|
||||
assertEquals(51, sw_deduprecord);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -214,7 +239,6 @@ public class SparkDedupTest implements Serializable {
|
|||
"/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
|
||||
parser.parseArgument(
|
||||
new String[] {
|
||||
"-mt", "local[*]",
|
||||
"-i", testGraphBasePath,
|
||||
"-w", testOutputBasePath,
|
||||
"-o", testDedupGraphBasePath
|
||||
|
@ -224,6 +248,9 @@ public class SparkDedupTest implements Serializable {
|
|||
|
||||
long organizations = jsc.textFile(testDedupGraphBasePath + "/organization").count();
|
||||
long publications = jsc.textFile(testDedupGraphBasePath + "/publication").count();
|
||||
long projects = jsc.textFile(testDedupGraphBasePath + "/project").count();
|
||||
long datasource = jsc.textFile(testDedupGraphBasePath + "/datasource").count();
|
||||
long softwares = jsc.textFile(testDedupGraphBasePath + "/software").count();
|
||||
|
||||
long mergedOrgs =
|
||||
spark.read()
|
||||
|
@ -245,20 +272,40 @@ public class SparkDedupTest implements Serializable {
|
|||
.distinct()
|
||||
.count();
|
||||
|
||||
long mergedSw =
|
||||
spark.read()
|
||||
.load(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")
|
||||
.as(Encoders.bean(Relation.class))
|
||||
.where("relClass=='merges'")
|
||||
.javaRDD()
|
||||
.map(Relation::getTarget)
|
||||
.distinct()
|
||||
.count();
|
||||
|
||||
assertEquals(897, publications);
|
||||
assertEquals(835, organizations);
|
||||
assertEquals(100, projects);
|
||||
assertEquals(100, datasource);
|
||||
assertEquals(200, softwares);
|
||||
|
||||
long deletedOrgs =
|
||||
jsc.textFile(testDedupGraphBasePath + "/organization")
|
||||
.filter(this::isDeletedByInference)
|
||||
.count();
|
||||
|
||||
long deletedPubs =
|
||||
jsc.textFile(testDedupGraphBasePath + "/publication")
|
||||
.filter(this::isDeletedByInference)
|
||||
.count();
|
||||
|
||||
long deletedSw =
|
||||
jsc.textFile(testDedupGraphBasePath + "/software")
|
||||
.filter(this::isDeletedByInference)
|
||||
.count();
|
||||
|
||||
assertEquals(mergedOrgs, deletedOrgs);
|
||||
assertEquals(mergedPubs, deletedPubs);
|
||||
assertEquals(mergedSw, deletedSw);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -272,7 +319,6 @@ public class SparkDedupTest implements Serializable {
|
|||
"/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json")));
|
||||
parser.parseArgument(
|
||||
new String[] {
|
||||
"-mt", "local[*]",
|
||||
"-i", testGraphBasePath,
|
||||
"-w", testOutputBasePath,
|
||||
"-o", testDedupGraphBasePath
|
||||
|
@ -283,6 +329,43 @@ public class SparkDedupTest implements Serializable {
|
|||
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
|
||||
|
||||
assertEquals(826, relations);
|
||||
|
||||
// check deletedbyinference
|
||||
final Dataset<Relation> mergeRels =
|
||||
spark.read()
|
||||
.load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*"))
|
||||
.as(Encoders.bean(Relation.class));
|
||||
final JavaPairRDD<String, String> mergedIds =
|
||||
mergeRels
|
||||
.where("relClass == 'merges'")
|
||||
.select(mergeRels.col("target"))
|
||||
.distinct()
|
||||
.toJavaRDD()
|
||||
.mapToPair(
|
||||
(PairFunction<Row, String, String>)
|
||||
r -> new Tuple2<String, String>(r.getString(0), "d"));
|
||||
|
||||
JavaRDD<String> toCheck =
|
||||
jsc.textFile(testDedupGraphBasePath + "/relation")
|
||||
.mapToPair(
|
||||
json ->
|
||||
new Tuple2<>(
|
||||
MapDocumentUtil.getJPathString("$.source", json),
|
||||
json))
|
||||
.join(mergedIds)
|
||||
.map(t -> t._2()._1())
|
||||
.mapToPair(
|
||||
json ->
|
||||
new Tuple2<>(
|
||||
MapDocumentUtil.getJPathString("$.target", json),
|
||||
json))
|
||||
.join(mergedIds)
|
||||
.map(t -> t._2()._1());
|
||||
|
||||
long deletedbyinference = toCheck.filter(this::isDeletedByInference).count();
|
||||
long updated = toCheck.count();
|
||||
|
||||
assertEquals(updated, deletedbyinference);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
|
|
|
@ -0,0 +1,118 @@
|
|||
{
|
||||
"wf" : {
|
||||
"threshold" : "0.99",
|
||||
"dedupRun" : "001",
|
||||
"entityType" : "result",
|
||||
"subEntityType" : "resulttype",
|
||||
"subEntityValue" : "dataset",
|
||||
"orderField" : "title",
|
||||
"queueMaxSize" : "2000",
|
||||
"groupMaxSize" : "100",
|
||||
"maxChildren" : "100",
|
||||
"slidingWindowSize" : "200",
|
||||
"rootBuilder" : ["result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
|
||||
"includeChildren" : "true"
|
||||
},
|
||||
"pace" : {
|
||||
"clustering" : [
|
||||
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
|
||||
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
|
||||
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
|
||||
],
|
||||
"decisionTree" : {
|
||||
"start" : {
|
||||
"fields": [
|
||||
{
|
||||
"field": "pid",
|
||||
"comparator": "jsonListMatch",
|
||||
"weight": 1.0,
|
||||
"countIfUndefined": "false",
|
||||
"params": {
|
||||
"jpath_value": "$.value",
|
||||
"jpath_classid": "$.qualifier.classid"
|
||||
}
|
||||
}
|
||||
],
|
||||
"threshold": 0.5,
|
||||
"aggregation": "AVG",
|
||||
"positive": "MATCH",
|
||||
"negative": "layer2",
|
||||
"undefined": "layer2",
|
||||
"ignoreUndefined": "true"
|
||||
},
|
||||
"layer2" : {
|
||||
"fields": [
|
||||
{
|
||||
"field": "title",
|
||||
"comparator": "titleVersionMatch",
|
||||
"weight": 1.0,
|
||||
"countIfUndefined": "false",
|
||||
"params": {}
|
||||
},
|
||||
{
|
||||
"field": "authors",
|
||||
"comparator": "sizeMatch",
|
||||
"weight": 1.0,
|
||||
"countIfUndefined": "false",
|
||||
"params": {}
|
||||
}
|
||||
],
|
||||
"threshold": 1.0,
|
||||
"aggregation": "AND",
|
||||
"positive": "layer3",
|
||||
"negative": "NO_MATCH",
|
||||
"undefined": "layer3",
|
||||
"ignoreUndefined": "false"
|
||||
},
|
||||
"layer3" : {
|
||||
"fields": [
|
||||
{
|
||||
"field": "title",
|
||||
"comparator": "levensteinTitle",
|
||||
"weight": 1.0,
|
||||
"countIfUndefined": "true"
|
||||
}
|
||||
],
|
||||
"threshold": 0.99,
|
||||
"aggregation": "AVG",
|
||||
"positive": "MATCH",
|
||||
"negative": "NO_MATCH",
|
||||
"undefined": "NO_MATCH",
|
||||
"ignoreUndefined": "true"
|
||||
}
|
||||
},
|
||||
"model" : [
|
||||
{
|
||||
"name" : "doi",
|
||||
"type" : "String",
|
||||
"path" : "$.pid[@.qualifier.classid = 'doi'].value"
|
||||
},
|
||||
{
|
||||
"name" : "pid",
|
||||
"type" : "JSON",
|
||||
"path" : "$.pid",
|
||||
"overrideMatch" : "true"
|
||||
},
|
||||
{
|
||||
"name" : "title",
|
||||
"type" : "String",
|
||||
"path" : "$.title[@.qualifier.classid = 'main title'].value",
|
||||
"length" : 250,
|
||||
"size" : 5
|
||||
},
|
||||
{
|
||||
"name" : "authors",
|
||||
"type" : "String",
|
||||
"path" : "$.author[*].fullname",
|
||||
"size" : 200
|
||||
},
|
||||
{
|
||||
"name" : "resulttype",
|
||||
"type" : "String",
|
||||
"path" : "$.resulttype.classid"
|
||||
}
|
||||
],
|
||||
"blacklists" : {},
|
||||
"synonyms" : {}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,118 @@
|
|||
{
|
||||
"wf" : {
|
||||
"threshold" : "0.99",
|
||||
"dedupRun" : "001",
|
||||
"entityType" : "result",
|
||||
"subEntityType" : "resulttype",
|
||||
"subEntityValue" : "otherresearchproduct",
|
||||
"orderField" : "title",
|
||||
"queueMaxSize" : "2000",
|
||||
"groupMaxSize" : "100",
|
||||
"maxChildren" : "100",
|
||||
"slidingWindowSize" : "200",
|
||||
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
|
||||
"includeChildren" : "true"
|
||||
},
|
||||
"pace" : {
|
||||
"clustering" : [
|
||||
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
|
||||
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
|
||||
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
|
||||
],
|
||||
"decisionTree" : {
|
||||
"start" : {
|
||||
"fields": [
|
||||
{
|
||||
"field": "pid",
|
||||
"comparator": "jsonListMatch",
|
||||
"weight": 1.0,
|
||||
"countIfUndefined": "false",
|
||||
"params": {
|
||||
"jpath_value": "$.value",
|
||||
"jpath_classid": "$.qualifier.classid"
|
||||
}
|
||||
}
|
||||
],
|
||||
"threshold": 0.5,
|
||||
"aggregation": "AVG",
|
||||
"positive": "MATCH",
|
||||
"negative": "layer2",
|
||||
"undefined": "layer2",
|
||||
"ignoreUndefined": "true"
|
||||
},
|
||||
"layer2" : {
|
||||
"fields": [
|
||||
{
|
||||
"field": "title",
|
||||
"comparator": "titleVersionMatch",
|
||||
"weight": 1.0,
|
||||
"countIfUndefined": "false",
|
||||
"params": {}
|
||||
},
|
||||
{
|
||||
"field": "authors",
|
||||
"comparator": "sizeMatch",
|
||||
"weight": 1.0,
|
||||
"countIfUndefined": "false",
|
||||
"params": {}
|
||||
}
|
||||
],
|
||||
"threshold": 1.0,
|
||||
"aggregation": "AND",
|
||||
"positive": "layer3",
|
||||
"negative": "NO_MATCH",
|
||||
"undefined": "layer3",
|
||||
"ignoreUndefined": "false"
|
||||
},
|
||||
"layer3" : {
|
||||
"fields": [
|
||||
{
|
||||
"field": "title",
|
||||
"comparator": "levensteinTitle",
|
||||
"weight": 1.0,
|
||||
"countIfUndefined": "true"
|
||||
}
|
||||
],
|
||||
"threshold": 0.99,
|
||||
"aggregation": "AVG",
|
||||
"positive": "MATCH",
|
||||
"negative": "NO_MATCH",
|
||||
"undefined": "NO_MATCH",
|
||||
"ignoreUndefined": "true"
|
||||
}
|
||||
},
|
||||
"model" : [
|
||||
{
|
||||
"name" : "doi",
|
||||
"type" : "String",
|
||||
"path" : "$.pid[@.qualifier.classid = 'doi'}].value"
|
||||
},
|
||||
{
|
||||
"name" : "pid",
|
||||
"type" : "JSON",
|
||||
"path" : "$.pid",
|
||||
"overrideMatch" : "true"
|
||||
},
|
||||
{
|
||||
"name" : "title",
|
||||
"type" : "String",
|
||||
"path" : "$.title[@.qualifier.classid = 'main title'].value",
|
||||
"length" : 250,
|
||||
"size" : 5
|
||||
},
|
||||
{
|
||||
"name" : "authors",
|
||||
"type" : "String",
|
||||
"path" : "$.author[*].fullname",
|
||||
"size" : 200
|
||||
},
|
||||
{
|
||||
"name" : "resulttype",
|
||||
"type" : "String",
|
||||
"path" : "$.resulttype.classid"
|
||||
}
|
||||
],
|
||||
"blacklists" : {},
|
||||
"synonyms" : {}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,92 @@
|
|||
{
|
||||
"wf" : {
|
||||
"threshold" : "0.99",
|
||||
"dedupRun" : "001",
|
||||
"entityType" : "result",
|
||||
"subEntityType" : "resulttype",
|
||||
"subEntityValue" : "software",
|
||||
"orderField" : "title",
|
||||
"queueMaxSize" : "2000",
|
||||
"groupMaxSize" : "100",
|
||||
"maxChildren" : "100",
|
||||
"slidingWindowSize" : "200",
|
||||
"rootBuilder" : [ "result", "resultProject_outcome_isProducedBy", "resultResult_publicationDataset_isRelatedTo", "resultResult_similarity_isAmongTopNSimilarDocuments", "resultResult_similarity_hasAmongTopNSimilarDocuments", "resultOrganization_affiliation_hasAuthorInstitution", "resultResult_part_hasPart", "resultResult_part_isPartOf", "resultResult_supplement_isSupplementTo", "resultResult_supplement_isSupplementedBy", "resultResult_version_isVersionOf" ],
|
||||
"includeChildren" : "true"
|
||||
},
|
||||
"pace" : {
|
||||
"clustering" : [
|
||||
{ "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
|
||||
{ "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
|
||||
{ "name" : "lowercase", "fields" : [ "doi", "url" ], "params" : { } }
|
||||
],
|
||||
"decisionTree": {
|
||||
"start": {
|
||||
"fields": [
|
||||
{
|
||||
"field": "doi",
|
||||
"comparator": "exactMatch",
|
||||
"weight": 1,
|
||||
"countIfUndefined": "false",
|
||||
"params": {}
|
||||
},
|
||||
{
|
||||
"field": "url",
|
||||
"comparator": "exactMatch",
|
||||
"weight": 1,
|
||||
"countIfUndefined": "false",
|
||||
"params": {}
|
||||
}
|
||||
],
|
||||
"threshold": 1,
|
||||
"aggregation": "OR",
|
||||
"positive": "MATCH",
|
||||
"negative": "layer2",
|
||||
"undefined": "layer2",
|
||||
"ignoreUndefined": "false"
|
||||
},
|
||||
"layer2": {
|
||||
"fields": [
|
||||
{
|
||||
"field": "title",
|
||||
"comparator": "levensteinTitleIgnoreVersion",
|
||||
"weight": 1,
|
||||
"countIfUndefined": "false",
|
||||
"params": {}
|
||||
}
|
||||
],
|
||||
"threshold": 0.99,
|
||||
"aggregation": "AVG",
|
||||
"positive": "MATCH",
|
||||
"negative": "NO_MATCH",
|
||||
"undefined": "NO_MATCH",
|
||||
"ignoreUndefined": "false"
|
||||
}
|
||||
},
|
||||
"model" : [
|
||||
{
|
||||
"name" : "doi",
|
||||
"type" : "String",
|
||||
"path" : "$.pid[?(@.qualifier.classid == 'doi')].value"
|
||||
},
|
||||
{
|
||||
"name" : "title",
|
||||
"type" : "String",
|
||||
"path" : "$.title[?(@.qualifier.classid == 'main title')].value",
|
||||
"length" : 250,
|
||||
"size" : 5
|
||||
},
|
||||
{
|
||||
"name" : "url",
|
||||
"type" : "String",
|
||||
"path" : "$.instance.url"
|
||||
},
|
||||
{
|
||||
"name" : "resulttype",
|
||||
"type" : "String",
|
||||
"path" : "$.resulttype.classid"
|
||||
}
|
||||
],
|
||||
"blacklists" : {},
|
||||
"synonyms": {}
|
||||
}
|
||||
}
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
|
@ -14,6 +14,7 @@
|
|||
<SCAN_SEQUENCE>
|
||||
<SCAN id="organization"/>
|
||||
<SCAN id="publication"/>
|
||||
<SCAN id="software"/>
|
||||
</SCAN_SEQUENCE>
|
||||
</DEDUPLICATION>
|
||||
</CONFIGURATION>
|
||||
|
|
|
@ -1,9 +1,38 @@
|
|||
package eu.dnetlib.dhp.oa.graph.raw;
|
||||
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.dataInfo;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.journal;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.keyValue;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.oaiIProvenance;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import java.util.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.Author;
|
||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.Field;
|
||||
import eu.dnetlib.dhp.schema.oaf.GeoLocation;
|
||||
import eu.dnetlib.dhp.schema.oaf.Instance;
|
||||
import eu.dnetlib.dhp.schema.oaf.Journal;
|
||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||
import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.dom4j.Document;
|
||||
import org.dom4j.DocumentFactory;
|
||||
|
@ -29,6 +58,12 @@ public abstract class AbstractMdRecordToOafMapper {
|
|||
qualifier("software", "software", "dnet:result_typologies", "dnet:result_typologies");
|
||||
protected static final Qualifier OTHER_RESULTTYPE_QUALIFIER =
|
||||
qualifier("other", "other", "dnet:result_typologies", "dnet:result_typologies");
|
||||
protected static final Qualifier REPOSITORY_QUALIFIER =
|
||||
qualifier(
|
||||
"sysimport:crosswalk:repository",
|
||||
"sysimport:crosswalk:repository",
|
||||
"dnet:provenanceActions",
|
||||
"dnet:provenanceActions");
|
||||
|
||||
protected AbstractMdRecordToOafMapper(final Map<String, String> code2name) {
|
||||
this.code2name = code2name;
|
||||
|
@ -55,13 +90,13 @@ public abstract class AbstractMdRecordToOafMapper {
|
|||
final String type = doc.valueOf("//dr:CobjCategory/@type");
|
||||
final KeyValue collectedFrom =
|
||||
keyValue(
|
||||
doc.valueOf("//oaf:collectedFrom/@id"),
|
||||
createOpenaireId(10, doc.valueOf("//oaf:collectedFrom/@id"), true),
|
||||
doc.valueOf("//oaf:collectedFrom/@name"));
|
||||
final KeyValue hostedBy =
|
||||
StringUtils.isBlank(doc.valueOf("//oaf:hostedBy/@id"))
|
||||
? collectedFrom
|
||||
: keyValue(
|
||||
doc.valueOf("//oaf:hostedBy/@id"),
|
||||
createOpenaireId(10, doc.valueOf("//oaf:hostedBy/@id"), true),
|
||||
doc.valueOf("//oaf:hostedBy/@name"));
|
||||
|
||||
final DataInfo info = prepareDataInfo(doc);
|
||||
|
@ -154,7 +189,7 @@ public abstract class AbstractMdRecordToOafMapper {
|
|||
r1.setRelClass("isProducedBy");
|
||||
r1.setSource(docId);
|
||||
r1.setTarget(projectId);
|
||||
r1.setCollectedFrom(Arrays.asList(collectedFrom));
|
||||
r1.setCollectedfrom(Arrays.asList(collectedFrom));
|
||||
r1.setDataInfo(info);
|
||||
r1.setLastupdatetimestamp(lastUpdateTimestamp);
|
||||
res.add(r1);
|
||||
|
@ -165,7 +200,7 @@ public abstract class AbstractMdRecordToOafMapper {
|
|||
r2.setRelClass("produces");
|
||||
r2.setSource(projectId);
|
||||
r2.setTarget(docId);
|
||||
r2.setCollectedFrom(Arrays.asList(collectedFrom));
|
||||
r2.setCollectedfrom(Arrays.asList(collectedFrom));
|
||||
r2.setDataInfo(info);
|
||||
r2.setLastupdatetimestamp(lastUpdateTimestamp);
|
||||
res.add(r2);
|
||||
|
@ -398,7 +433,7 @@ public abstract class AbstractMdRecordToOafMapper {
|
|||
final Node n = doc.selectSingleNode("//oaf:datainfo");
|
||||
|
||||
if (n == null) {
|
||||
return null;
|
||||
return dataInfo(false, null, false, false, REPOSITORY_QUALIFIER, "0.9");
|
||||
}
|
||||
|
||||
final String paClassId = n.valueOf("./oaf:provenanceaction/@classid");
|
||||
|
|
|
@ -1,11 +1,35 @@
|
|||
package eu.dnetlib.dhp.oa.graph.raw;
|
||||
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.asString;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.dataInfo;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.journal;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listKeyValues;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
|
||||
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
|
||||
import eu.dnetlib.dhp.oa.graph.raw.common.DbClient;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.Context;
|
||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||
import eu.dnetlib.dhp.schema.oaf.Field;
|
||||
import eu.dnetlib.dhp.schema.oaf.Journal;
|
||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||
import eu.dnetlib.dhp.schema.oaf.Organization;
|
||||
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.sql.Array;
|
||||
|
@ -119,7 +143,8 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
|
|||
ds.setOriginalId(Arrays.asList(rs.getString("datasourceid")));
|
||||
ds.setCollectedfrom(
|
||||
listKeyValues(
|
||||
rs.getString("collectedfromid"), rs.getString("collectedfromname")));
|
||||
createOpenaireId(10, rs.getString("collectedfromid"), true),
|
||||
rs.getString("collectedfromname")));
|
||||
ds.setPid(new ArrayList<>());
|
||||
ds.setDateofcollection(asString(rs.getDate("dateofcollection")));
|
||||
ds.setDateoftransformation(null); // Value not returned by the SQL query
|
||||
|
@ -185,7 +210,8 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
|
|||
p.setOriginalId(Arrays.asList(rs.getString("projectid")));
|
||||
p.setCollectedfrom(
|
||||
listKeyValues(
|
||||
rs.getString("collectedfromid"), rs.getString("collectedfromname")));
|
||||
createOpenaireId(10, rs.getString("collectedfromid"), true),
|
||||
rs.getString("collectedfromname")));
|
||||
p.setPid(new ArrayList<>());
|
||||
p.setDateofcollection(asString(rs.getDate("dateofcollection")));
|
||||
p.setDateoftransformation(asString(rs.getDate("dateoftransformation")));
|
||||
|
@ -240,7 +266,8 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
|
|||
o.setOriginalId(Arrays.asList(rs.getString("organizationid")));
|
||||
o.setCollectedfrom(
|
||||
listKeyValues(
|
||||
rs.getString("collectedfromid"), rs.getString("collectedfromname")));
|
||||
createOpenaireId(10, rs.getString("collectedfromid"), true),
|
||||
rs.getString("collectedfromname")));
|
||||
o.setPid(new ArrayList<>());
|
||||
o.setDateofcollection(asString(rs.getDate("dateofcollection")));
|
||||
o.setDateoftransformation(asString(rs.getDate("dateoftransformation")));
|
||||
|
@ -285,7 +312,8 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
|
|||
final String dsId = createOpenaireId(10, rs.getString("datasource"), true);
|
||||
final List<KeyValue> collectedFrom =
|
||||
listKeyValues(
|
||||
rs.getString("collectedfromid"), rs.getString("collectedfromname"));
|
||||
createOpenaireId(10, rs.getString("collectedfromid"), true),
|
||||
rs.getString("collectedfromname"));
|
||||
|
||||
final Relation r1 = new Relation();
|
||||
r1.setRelType("datasourceOrganization");
|
||||
|
@ -293,7 +321,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
|
|||
r1.setRelClass("isProvidedBy");
|
||||
r1.setSource(dsId);
|
||||
r1.setTarget(orgId);
|
||||
r1.setCollectedFrom(collectedFrom);
|
||||
r1.setCollectedfrom(collectedFrom);
|
||||
r1.setDataInfo(info);
|
||||
r1.setLastupdatetimestamp(lastUpdateTimestamp);
|
||||
|
||||
|
@ -303,7 +331,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
|
|||
r2.setRelClass("provides");
|
||||
r2.setSource(orgId);
|
||||
r2.setTarget(dsId);
|
||||
r2.setCollectedFrom(collectedFrom);
|
||||
r2.setCollectedfrom(collectedFrom);
|
||||
r2.setDataInfo(info);
|
||||
r2.setLastupdatetimestamp(lastUpdateTimestamp);
|
||||
|
||||
|
@ -320,7 +348,8 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
|
|||
final String projectId = createOpenaireId(40, rs.getString("project"), true);
|
||||
final List<KeyValue> collectedFrom =
|
||||
listKeyValues(
|
||||
rs.getString("collectedfromid"), rs.getString("collectedfromname"));
|
||||
createOpenaireId(10, rs.getString("collectedfromid"), true),
|
||||
rs.getString("collectedfromname"));
|
||||
|
||||
final Relation r1 = new Relation();
|
||||
r1.setRelType("projectOrganization");
|
||||
|
@ -328,7 +357,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
|
|||
r1.setRelClass("isParticipant");
|
||||
r1.setSource(projectId);
|
||||
r1.setTarget(orgId);
|
||||
r1.setCollectedFrom(collectedFrom);
|
||||
r1.setCollectedfrom(collectedFrom);
|
||||
r1.setDataInfo(info);
|
||||
r1.setLastupdatetimestamp(lastUpdateTimestamp);
|
||||
|
||||
|
@ -338,7 +367,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
|
|||
r2.setRelClass("hasParticipant");
|
||||
r2.setSource(orgId);
|
||||
r2.setTarget(projectId);
|
||||
r2.setCollectedFrom(collectedFrom);
|
||||
r2.setCollectedfrom(collectedFrom);
|
||||
r2.setDataInfo(info);
|
||||
r2.setLastupdatetimestamp(lastUpdateTimestamp);
|
||||
|
||||
|
@ -363,6 +392,9 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
|
|||
"dnet:provenanceActions"),
|
||||
"0.9");
|
||||
|
||||
final List<KeyValue> collectedFrom =
|
||||
listKeyValues(createOpenaireId(10, "infrastruct_::openaire", true), "OpenAIRE");
|
||||
|
||||
try {
|
||||
|
||||
if (rs.getString("source_type").equals("context")) {
|
||||
|
@ -381,6 +413,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
|
|||
r.setLastupdatetimestamp(lastUpdateTimestamp);
|
||||
r.setContext(prepareContext(rs.getString("source_id"), info));
|
||||
r.setDataInfo(info);
|
||||
r.setCollectedfrom(collectedFrom);
|
||||
|
||||
return Arrays.asList(r);
|
||||
} else {
|
||||
|
@ -395,18 +428,22 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication
|
|||
final Relation r2 = new Relation();
|
||||
|
||||
if (rs.getString("source_type").equals("project")) {
|
||||
r1.setCollectedfrom(collectedFrom);
|
||||
r1.setRelType("resultProject");
|
||||
r1.setSubRelType("outcome");
|
||||
r1.setRelClass("produces");
|
||||
|
||||
r2.setCollectedfrom(collectedFrom);
|
||||
r2.setRelType("resultProject");
|
||||
r2.setSubRelType("outcome");
|
||||
r2.setRelClass("isProducedBy");
|
||||
} else {
|
||||
r1.setCollectedfrom(collectedFrom);
|
||||
r1.setRelType("resultResult");
|
||||
r1.setSubRelType("relationship");
|
||||
r1.setRelClass("isRelatedTo");
|
||||
|
||||
r2.setCollectedfrom(collectedFrom);
|
||||
r2.setRelType("resultResult");
|
||||
r2.setSubRelType("relationship");
|
||||
r2.setRelClass("isRelatedTo");
|
||||
|
|
|
@ -232,7 +232,7 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
|
|||
r1.setRelClass("isRelatedTo");
|
||||
r1.setSource(docId);
|
||||
r1.setTarget(otherId);
|
||||
r1.setCollectedFrom(Arrays.asList(collectedFrom));
|
||||
r1.setCollectedfrom(Arrays.asList(collectedFrom));
|
||||
r1.setDataInfo(info);
|
||||
r1.setLastupdatetimestamp(lastUpdateTimestamp);
|
||||
res.add(r1);
|
||||
|
@ -243,7 +243,7 @@ public class OafToOafMapper extends AbstractMdRecordToOafMapper {
|
|||
r2.setRelClass("isRelatedTo");
|
||||
r2.setSource(otherId);
|
||||
r2.setTarget(docId);
|
||||
r2.setCollectedFrom(Arrays.asList(collectedFrom));
|
||||
r2.setCollectedfrom(Arrays.asList(collectedFrom));
|
||||
r2.setDataInfo(info);
|
||||
r2.setLastupdatetimestamp(lastUpdateTimestamp);
|
||||
res.add(r2);
|
||||
|
|
|
@ -334,7 +334,7 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
|
|||
r.setRelClass(relClass);
|
||||
r.setSource(source);
|
||||
r.setTarget(target);
|
||||
r.setCollectedFrom(Arrays.asList(collectedFrom));
|
||||
r.setCollectedfrom(Arrays.asList(collectedFrom));
|
||||
r.setDataInfo(info);
|
||||
r.setLastupdatetimestamp(lastUpdateTimestamp);
|
||||
return r;
|
||||
|
|
|
@ -186,7 +186,7 @@ public abstract class AbstractScholexplorerParser {
|
|||
r.setTarget(targetId);
|
||||
r.setRelType(relationSemantic);
|
||||
r.setRelClass("datacite");
|
||||
r.setCollectedFrom(parsedObject.getCollectedfrom());
|
||||
r.setCollectedfrom(parsedObject.getCollectedfrom());
|
||||
r.setDataInfo(di);
|
||||
rels.add(r);
|
||||
r = new DLIRelation();
|
||||
|
@ -195,7 +195,7 @@ public abstract class AbstractScholexplorerParser {
|
|||
r.setTarget(parsedObject.getId());
|
||||
r.setRelType(inverseRelation);
|
||||
r.setRelClass("datacite");
|
||||
r.setCollectedFrom(parsedObject.getCollectedfrom());
|
||||
r.setCollectedfrom(parsedObject.getCollectedfrom());
|
||||
r.setDateOfCollection(dateOfCollection);
|
||||
rels.add(r);
|
||||
if ("unknown".equalsIgnoreCase(relatedType))
|
||||
|
|
|
@ -21,6 +21,28 @@
|
|||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozieActionShareLibForSpark2</name>
|
||||
<description>oozie action sharelib for spark 2.*</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2ExtraListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||
<description>spark 2.* extra listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2SqlQueryExecutionListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||
<description>spark 2.* sql query execution listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2YarnHistoryServerAddress</name>
|
||||
<description>spark 2.* yarn history server address</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2EventLogDir</name>
|
||||
<description>spark 2.* event log dir location</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<global>
|
||||
|
@ -35,6 +57,10 @@
|
|||
<name>oozie.launcher.mapred.job.queue.name</name>
|
||||
<value>${oozieLauncherQueueName}</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>${oozieActionShareLibForSpark2}</value>
|
||||
</property>
|
||||
</configuration>
|
||||
</global>
|
||||
|
||||
|
@ -52,14 +78,15 @@
|
|||
<class>eu.dnetlib.dhp.oa.graph.GraphHiveImporterJob</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory ${sparkExecutorMemory}
|
||||
--executor-cores ${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
|
||||
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
|
||||
--conf spark.sql.warehouse.dir="/user/hive/warehouse"
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
|
||||
<arg>--hive_db_name</arg><arg>${hive_db_name}</arg>
|
||||
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
|
||||
|
|
|
@ -1,11 +1,16 @@
|
|||
package eu.dnetlib.dhp.oa.graph.raw;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -43,6 +48,7 @@ public class MappersTest {
|
|||
final Relation r2 = (Relation) list.get(2);
|
||||
|
||||
assertValidId(p.getId());
|
||||
assertValidId(p.getCollectedfrom().get(0).getKey());
|
||||
assertTrue(StringUtils.isNotBlank(p.getTitle().get(0).getValue()));
|
||||
assertTrue(p.getAuthor().size() > 0);
|
||||
assertTrue(p.getSubject().size() > 0);
|
||||
|
@ -50,13 +56,24 @@ public class MappersTest {
|
|||
assertTrue(StringUtils.isNotBlank(p.getJournal().getName()));
|
||||
|
||||
assertValidId(r1.getSource());
|
||||
assertValidId(r1.getTarget());
|
||||
assertValidId(r2.getSource());
|
||||
assertValidId(r2.getTarget());
|
||||
assertValidId(r1.getCollectedfrom().get(0).getKey());
|
||||
assertValidId(r2.getCollectedfrom().get(0).getKey());
|
||||
assertNotNull(r1.getDataInfo());
|
||||
assertNotNull(r2.getDataInfo());
|
||||
assertNotNull(r1.getDataInfo().getTrust());
|
||||
assertNotNull(r2.getDataInfo().getTrust());
|
||||
assertEquals(r1.getSource(), r2.getTarget());
|
||||
assertEquals(r2.getSource(), r1.getTarget());
|
||||
assertTrue(StringUtils.isNotBlank(r1.getRelClass()));
|
||||
assertTrue(StringUtils.isNotBlank(r2.getRelClass()));
|
||||
assertTrue(StringUtils.isNotBlank(r1.getRelType()));
|
||||
assertTrue(StringUtils.isNotBlank(r2.getRelType()));
|
||||
|
||||
// System.out.println(new ObjectMapper().writeValueAsString(r1));
|
||||
// System.out.println(new ObjectMapper().writeValueAsString(r2));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -65,15 +82,35 @@ public class MappersTest {
|
|||
|
||||
final List<Oaf> list = new OdfToOafMapper(code2name).processMdRecord(xml);
|
||||
|
||||
assertEquals(1, list.size());
|
||||
assertEquals(3, list.size());
|
||||
assertTrue(list.get(0) instanceof Dataset);
|
||||
assertTrue(list.get(1) instanceof Relation);
|
||||
assertTrue(list.get(2) instanceof Relation);
|
||||
|
||||
final Dataset d = (Dataset) list.get(0);
|
||||
final Relation r1 = (Relation) list.get(1);
|
||||
final Relation r2 = (Relation) list.get(2);
|
||||
|
||||
assertValidId(d.getId());
|
||||
assertValidId(d.getCollectedfrom().get(0).getKey());
|
||||
assertTrue(StringUtils.isNotBlank(d.getTitle().get(0).getValue()));
|
||||
assertTrue(d.getAuthor().size() > 0);
|
||||
assertTrue(d.getSubject().size() > 0);
|
||||
|
||||
assertValidId(r1.getSource());
|
||||
assertValidId(r1.getTarget());
|
||||
assertValidId(r2.getSource());
|
||||
assertValidId(r2.getTarget());
|
||||
assertNotNull(r1.getDataInfo());
|
||||
assertNotNull(r2.getDataInfo());
|
||||
assertNotNull(r1.getDataInfo().getTrust());
|
||||
assertNotNull(r2.getDataInfo().getTrust());
|
||||
assertEquals(r1.getSource(), r2.getTarget());
|
||||
assertEquals(r2.getSource(), r1.getTarget());
|
||||
assertTrue(StringUtils.isNotBlank(r1.getRelClass()));
|
||||
assertTrue(StringUtils.isNotBlank(r2.getRelClass()));
|
||||
assertTrue(StringUtils.isNotBlank(r1.getRelType()));
|
||||
assertTrue(StringUtils.isNotBlank(r2.getRelType()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -88,6 +125,7 @@ public class MappersTest {
|
|||
final Software s = (Software) list.get(0);
|
||||
|
||||
assertValidId(s.getId());
|
||||
assertValidId(s.getCollectedfrom().get(0).getKey());
|
||||
assertTrue(StringUtils.isNotBlank(s.getTitle().get(0).getValue()));
|
||||
assertTrue(s.getAuthor().size() > 0);
|
||||
assertTrue(s.getSubject().size() > 0);
|
||||
|
|
|
@ -1,10 +1,17 @@
|
|||
package eu.dnetlib.dhp.oa.graph.raw;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||
import eu.dnetlib.dhp.schema.oaf.Organization;
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import java.io.IOException;
|
||||
import java.sql.Array;
|
||||
import java.sql.Date;
|
||||
|
@ -13,6 +20,7 @@ import java.sql.SQLException;
|
|||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
|
@ -42,14 +50,13 @@ public class MigrateDbEntitiesApplicationTest {
|
|||
|
||||
final Datasource ds = (Datasource) list.get(0);
|
||||
assertValidId(ds.getId());
|
||||
assertValidId(ds.getCollectedfrom().get(0).getKey());
|
||||
assertEquals(ds.getOfficialname().getValue(), getValueAsString("officialname", fields));
|
||||
assertEquals(ds.getEnglishname().getValue(), getValueAsString("englishname", fields));
|
||||
assertEquals(ds.getContactemail().getValue(), getValueAsString("contactemail", fields));
|
||||
assertEquals(ds.getWebsiteurl().getValue(), getValueAsString("websiteurl", fields));
|
||||
assertEquals(
|
||||
ds.getNamespaceprefix().getValue(), getValueAsString("namespaceprefix", fields));
|
||||
assertEquals(
|
||||
ds.getCollectedfrom().get(0).getKey(), getValueAsString("collectedfromid", fields));
|
||||
assertEquals(
|
||||
ds.getCollectedfrom().get(0).getValue(),
|
||||
getValueAsString("collectedfromname", fields));
|
||||
|
@ -65,10 +72,9 @@ public class MigrateDbEntitiesApplicationTest {
|
|||
|
||||
final Project p = (Project) list.get(0);
|
||||
assertValidId(p.getId());
|
||||
assertValidId(p.getCollectedfrom().get(0).getKey());
|
||||
assertEquals(p.getAcronym().getValue(), getValueAsString("acronym", fields));
|
||||
assertEquals(p.getTitle().getValue(), getValueAsString("title", fields));
|
||||
assertEquals(
|
||||
p.getCollectedfrom().get(0).getKey(), getValueAsString("collectedfromid", fields));
|
||||
assertEquals(
|
||||
p.getCollectedfrom().get(0).getValue(),
|
||||
getValueAsString("collectedfromname", fields));
|
||||
|
@ -86,6 +92,7 @@ public class MigrateDbEntitiesApplicationTest {
|
|||
|
||||
final Organization o = (Organization) list.get(0);
|
||||
assertValidId(o.getId());
|
||||
assertValidId(o.getCollectedfrom().get(0).getKey());
|
||||
assertEquals(o.getLegalshortname().getValue(), getValueAsString("legalshortname", fields));
|
||||
assertEquals(o.getLegalname().getValue(), getValueAsString("legalname", fields));
|
||||
assertEquals(o.getWebsiteurl().getValue(), getValueAsString("websiteurl", fields));
|
||||
|
@ -98,8 +105,6 @@ public class MigrateDbEntitiesApplicationTest {
|
|||
assertEquals(
|
||||
o.getCountry().getSchemename(),
|
||||
getValueAsString("country", fields).split("@@@")[3]);
|
||||
assertEquals(
|
||||
o.getCollectedfrom().get(0).getKey(), getValueAsString("collectedfromid", fields));
|
||||
assertEquals(
|
||||
o.getCollectedfrom().get(0).getValue(),
|
||||
getValueAsString("collectedfromname", fields));
|
||||
|
@ -137,6 +142,8 @@ public class MigrateDbEntitiesApplicationTest {
|
|||
assertValidId(r2.getSource());
|
||||
assertEquals(r1.getSource(), r2.getTarget());
|
||||
assertEquals(r2.getSource(), r1.getTarget());
|
||||
assertValidId(r1.getCollectedfrom().get(0).getKey());
|
||||
assertValidId(r2.getCollectedfrom().get(0).getKey());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -146,7 +153,12 @@ public class MigrateDbEntitiesApplicationTest {
|
|||
final List<Oaf> list = app.processClaims(rs);
|
||||
|
||||
assertEquals(1, list.size());
|
||||
assertTrue(list.get(0) instanceof Result);
|
||||
final Result r = (Result) list.get(0);
|
||||
|
||||
verifyMocks(fields);
|
||||
|
||||
assertValidId(r.getCollectedfrom().get(0).getKey());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -157,6 +169,33 @@ public class MigrateDbEntitiesApplicationTest {
|
|||
|
||||
assertEquals(2, list.size());
|
||||
verifyMocks(fields);
|
||||
|
||||
assertTrue(list.get(0) instanceof Relation);
|
||||
assertTrue(list.get(1) instanceof Relation);
|
||||
|
||||
final Relation r1 = (Relation) list.get(0);
|
||||
final Relation r2 = (Relation) list.get(1);
|
||||
|
||||
assertValidId(r1.getSource());
|
||||
assertValidId(r1.getTarget());
|
||||
assertValidId(r2.getSource());
|
||||
assertValidId(r2.getTarget());
|
||||
assertNotNull(r1.getDataInfo());
|
||||
assertNotNull(r2.getDataInfo());
|
||||
assertNotNull(r1.getDataInfo().getTrust());
|
||||
assertNotNull(r2.getDataInfo().getTrust());
|
||||
assertEquals(r1.getSource(), r2.getTarget());
|
||||
assertEquals(r2.getSource(), r1.getTarget());
|
||||
assertTrue(StringUtils.isNotBlank(r1.getRelClass()));
|
||||
assertTrue(StringUtils.isNotBlank(r2.getRelClass()));
|
||||
assertTrue(StringUtils.isNotBlank(r1.getRelType()));
|
||||
assertTrue(StringUtils.isNotBlank(r2.getRelType()));
|
||||
|
||||
assertValidId(r1.getCollectedfrom().get(0).getKey());
|
||||
assertValidId(r2.getCollectedfrom().get(0).getKey());
|
||||
|
||||
// System.out.println(new ObjectMapper().writeValueAsString(r1));
|
||||
// System.out.println(new ObjectMapper().writeValueAsString(r2));
|
||||
}
|
||||
|
||||
private List<TypedField> prepareMocks(final String jsonFile) throws IOException, SQLException {
|
||||
|
|
|
@ -87,6 +87,7 @@
|
|||
<oaf:language>und</oaf:language>
|
||||
<oaf:concept id="https://zenodo.org/communities/epfl"/>
|
||||
<oaf:hostedBy id="re3data_____::r3d100010468" name="Zenodo"/>
|
||||
<oaf:projectid>corda_______::226852</oaf:projectid>
|
||||
<oaf:collectedFrom id="re3data_____::r3d100010468" name="Zenodo"/>
|
||||
</metadata>
|
||||
<about xmlns:dc="http://purl.org/dc/elements/1.1/"
|
||||
|
|
Binary file not shown.
|
@ -48,7 +48,7 @@ public class Scholix implements Serializable {
|
|||
if (scholixSummary.getDate() != null && scholixSummary.getDate().size() > 0)
|
||||
s.setPublicationDate(scholixSummary.getDate().get(0));
|
||||
s.setLinkprovider(
|
||||
rel.getCollectedFrom().stream()
|
||||
rel.getCollectedfrom().stream()
|
||||
.map(
|
||||
cf ->
|
||||
new ScholixEntityId(
|
||||
|
@ -73,7 +73,7 @@ public class Scholix implements Serializable {
|
|||
if (scholixSummary.getDate() != null && scholixSummary.getDate().size() > 0)
|
||||
s.setPublicationDate(scholixSummary.getDate().get(0));
|
||||
s.setLinkprovider(
|
||||
rel.getCollectedFrom().stream()
|
||||
rel.getCollectedfrom().stream()
|
||||
.map(
|
||||
cf ->
|
||||
new ScholixEntityId(
|
||||
|
|
|
@ -1 +1 @@
|
|||
{"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"relType":"IsReferencedBy","subRelType":null,"relClass":"datacite","source":"50|dedup_______::4f00e4f0e82bb4cbb35261478e55568e","target":"60|97519e00ee2cddfa1f5bcb5220429b8f","collectedFrom":[{"key":"dli_________::europe_pmc__","value":"Europe PMC","dataInfo":null}]}
|
||||
{"dataInfo":{"invisible":false,"inferred":null,"deletedbyinference":false,"trust":"0.9","inferenceprovenance":null,"provenanceaction":null},"lastupdatetimestamp":null,"relType":"IsReferencedBy","subRelType":null,"relClass":"datacite","source":"50|dedup_______::4f00e4f0e82bb4cbb35261478e55568e","target":"60|97519e00ee2cddfa1f5bcb5220429b8f","collectedfrom":[{"key":"dli_________::europe_pmc__","value":"Europe PMC","dataInfo":null}]}
|
|
@ -11,11 +11,13 @@ import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
|
|||
import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
|
||||
import eu.dnetlib.dhp.schema.common.EntityType;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
|
@ -104,16 +106,12 @@ public class CreateRelatedEntitiesJob_phase1 {
|
|||
SparkSession spark,
|
||||
String inputRelationsPath,
|
||||
String inputEntityPath,
|
||||
Class<E> entityClazz,
|
||||
Class<E> clazz,
|
||||
String outputPath) {
|
||||
|
||||
Dataset<Tuple2<String, SortableRelation>> relsByTarget =
|
||||
readPathRelation(spark, inputRelationsPath)
|
||||
.filter(
|
||||
(FilterFunction<SortableRelation>)
|
||||
value ->
|
||||
value.getDataInfo().getDeletedbyinference()
|
||||
== false)
|
||||
.filter("dataInfo.deletedbyinference == false")
|
||||
.map(
|
||||
(MapFunction<SortableRelation, Tuple2<String, SortableRelation>>)
|
||||
r -> new Tuple2<>(r.getTarget(), r),
|
||||
|
@ -122,10 +120,11 @@ public class CreateRelatedEntitiesJob_phase1 {
|
|||
.cache();
|
||||
|
||||
Dataset<Tuple2<String, RelatedEntity>> entities =
|
||||
readPathEntity(spark, inputEntityPath, entityClazz)
|
||||
readPathEntity(spark, inputEntityPath, clazz)
|
||||
.filter("dataInfo.invisible == false")
|
||||
.map(
|
||||
(MapFunction<E, RelatedEntity>)
|
||||
value -> asRelatedEntity(value, entityClazz),
|
||||
value -> asRelatedEntity(value, clazz),
|
||||
Encoders.bean(RelatedEntity.class))
|
||||
.map(
|
||||
(MapFunction<RelatedEntity, Tuple2<String, RelatedEntity>>)
|
||||
|
@ -146,7 +145,7 @@ public class CreateRelatedEntitiesJob_phase1 {
|
|||
Encoders.bean(EntityRelEntity.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.parquet(outputPath + "/" + EntityType.fromClass(entityClazz));
|
||||
.parquet(outputPath + "/" + EntityType.fromClass(clazz));
|
||||
}
|
||||
|
||||
private static <E extends OafEntity> Dataset<E> readPathEntity(
|
||||
|
@ -161,6 +160,81 @@ public class CreateRelatedEntitiesJob_phase1 {
|
|||
Encoders.bean(entityClazz));
|
||||
}
|
||||
|
||||
public static <E extends OafEntity> RelatedEntity asRelatedEntity(E entity, Class<E> clazz) {
|
||||
|
||||
final RelatedEntity re = new RelatedEntity();
|
||||
re.setId(entity.getId());
|
||||
re.setType(EntityType.fromClass(clazz).name());
|
||||
|
||||
re.setPid(entity.getPid());
|
||||
re.setCollectedfrom(entity.getCollectedfrom());
|
||||
|
||||
switch (EntityType.fromClass(clazz)) {
|
||||
case publication:
|
||||
case dataset:
|
||||
case otherresearchproduct:
|
||||
case software:
|
||||
Result result = (Result) entity;
|
||||
|
||||
if (result.getTitle() != null && !result.getTitle().isEmpty()) {
|
||||
re.setTitle(result.getTitle().stream().findFirst().get());
|
||||
}
|
||||
|
||||
re.setDateofacceptance(getValue(result.getDateofacceptance()));
|
||||
re.setPublisher(getValue(result.getPublisher()));
|
||||
re.setResulttype(result.getResulttype());
|
||||
re.setInstances(result.getInstance());
|
||||
|
||||
// TODO still to be mapped
|
||||
// re.setCodeRepositoryUrl(j.read("$.coderepositoryurl"));
|
||||
|
||||
break;
|
||||
case datasource:
|
||||
Datasource d = (Datasource) entity;
|
||||
|
||||
re.setOfficialname(getValue(d.getOfficialname()));
|
||||
re.setWebsiteurl(getValue(d.getWebsiteurl()));
|
||||
re.setDatasourcetype(d.getDatasourcetype());
|
||||
re.setOpenairecompatibility(d.getOpenairecompatibility());
|
||||
|
||||
break;
|
||||
case organization:
|
||||
Organization o = (Organization) entity;
|
||||
|
||||
re.setLegalname(getValue(o.getLegalname()));
|
||||
re.setLegalshortname(getValue(o.getLegalshortname()));
|
||||
re.setCountry(o.getCountry());
|
||||
re.setWebsiteurl(getValue(o.getWebsiteurl()));
|
||||
break;
|
||||
case project:
|
||||
Project p = (Project) entity;
|
||||
|
||||
re.setProjectTitle(getValue(p.getTitle()));
|
||||
re.setCode(getValue(p.getCode()));
|
||||
re.setAcronym(getValue(p.getAcronym()));
|
||||
re.setContracttype(p.getContracttype());
|
||||
|
||||
List<Field<String>> f = p.getFundingtree();
|
||||
if (!f.isEmpty()) {
|
||||
re.setFundingtree(
|
||||
f.stream().map(s -> s.getValue()).collect(Collectors.toList()));
|
||||
}
|
||||
break;
|
||||
}
|
||||
return re;
|
||||
}
|
||||
|
||||
private static String getValue(Field<String> field) {
|
||||
return getFieldValueWithDefault(field, "");
|
||||
}
|
||||
|
||||
private static <T> T getFieldValueWithDefault(Field<T> f, T defaultValue) {
|
||||
return Optional.ofNullable(f)
|
||||
.filter(Objects::nonNull)
|
||||
.map(x -> x.getValue())
|
||||
.orElse(defaultValue);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline
|
||||
* delimited json text file,
|
||||
|
|
|
@ -76,9 +76,6 @@ public class PrepareRelationsJob {
|
|||
String outputPath = parser.get("outputPath");
|
||||
log.info("outputPath: {}", outputPath);
|
||||
|
||||
int numPartitions = Integer.parseInt(parser.get("relPartitions"));
|
||||
log.info("relPartitions: {}", numPartitions);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(
|
||||
|
@ -86,16 +83,14 @@ public class PrepareRelationsJob {
|
|||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
removeOutputDir(spark, outputPath);
|
||||
prepareRelationsFromPaths(spark, inputRelationsPath, outputPath, numPartitions);
|
||||
prepareRelationsFromPaths(spark, inputRelationsPath, outputPath);
|
||||
});
|
||||
}
|
||||
|
||||
private static void prepareRelationsFromPaths(
|
||||
SparkSession spark, String inputRelationsPath, String outputPath, int numPartitions) {
|
||||
SparkSession spark, String inputRelationsPath, String outputPath) {
|
||||
readPathRelation(spark, inputRelationsPath)
|
||||
.filter(
|
||||
(FilterFunction<SortableRelation>)
|
||||
value -> value.getDataInfo().getDeletedbyinference() == false)
|
||||
.filter("dataInfo.deletedbyinference == false")
|
||||
.groupByKey(
|
||||
(MapFunction<SortableRelation, String>) value -> value.getSource(),
|
||||
Encoders.STRING())
|
||||
|
@ -103,7 +98,6 @@ public class PrepareRelationsJob {
|
|||
(FlatMapGroupsFunction<String, SortableRelation, SortableRelation>)
|
||||
(key, values) -> Iterators.limit(values, MAX_RELS),
|
||||
Encoders.bean(SortableRelation.class))
|
||||
.repartition(numPartitions)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.parquet(outputPath);
|
||||
|
|
|
@ -3,14 +3,8 @@ package eu.dnetlib.dhp.oa.provision.utils;
|
|||
import static org.apache.commons.lang3.StringUtils.substringAfter;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
|
||||
import eu.dnetlib.dhp.schema.common.EntityType;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class GraphMappingUtils {
|
||||
|
||||
|
@ -18,81 +12,6 @@ public class GraphMappingUtils {
|
|||
|
||||
public static Set<String> authorPidTypes = Sets.newHashSet("orcid", "magidentifier");
|
||||
|
||||
public static <E extends OafEntity> RelatedEntity asRelatedEntity(E entity, Class<E> clazz) {
|
||||
|
||||
final RelatedEntity re = new RelatedEntity();
|
||||
re.setId(entity.getId());
|
||||
re.setType(EntityType.fromClass(clazz).name());
|
||||
|
||||
re.setPid(entity.getPid());
|
||||
re.setCollectedfrom(entity.getCollectedfrom());
|
||||
|
||||
switch (EntityType.fromClass(clazz)) {
|
||||
case publication:
|
||||
case dataset:
|
||||
case otherresearchproduct:
|
||||
case software:
|
||||
Result result = (Result) entity;
|
||||
|
||||
if (result.getTitle() == null && !result.getTitle().isEmpty()) {
|
||||
re.setTitle(result.getTitle().stream().findFirst().get());
|
||||
}
|
||||
|
||||
re.setDateofacceptance(getValue(result.getDateofacceptance()));
|
||||
re.setPublisher(getValue(result.getPublisher()));
|
||||
re.setResulttype(result.getResulttype());
|
||||
re.setInstances(result.getInstance());
|
||||
|
||||
// TODO still to be mapped
|
||||
// re.setCodeRepositoryUrl(j.read("$.coderepositoryurl"));
|
||||
|
||||
break;
|
||||
case datasource:
|
||||
Datasource d = (Datasource) entity;
|
||||
|
||||
re.setOfficialname(getValue(d.getOfficialname()));
|
||||
re.setWebsiteurl(getValue(d.getWebsiteurl()));
|
||||
re.setDatasourcetype(d.getDatasourcetype());
|
||||
re.setOpenairecompatibility(d.getOpenairecompatibility());
|
||||
|
||||
break;
|
||||
case organization:
|
||||
Organization o = (Organization) entity;
|
||||
|
||||
re.setLegalname(getValue(o.getLegalname()));
|
||||
re.setLegalshortname(getValue(o.getLegalshortname()));
|
||||
re.setCountry(o.getCountry());
|
||||
re.setWebsiteurl(getValue(o.getWebsiteurl()));
|
||||
break;
|
||||
case project:
|
||||
Project p = (Project) entity;
|
||||
|
||||
re.setProjectTitle(getValue(p.getTitle()));
|
||||
re.setCode(getValue(p.getCode()));
|
||||
re.setAcronym(getValue(p.getAcronym()));
|
||||
re.setContracttype(p.getContracttype());
|
||||
|
||||
List<Field<String>> f = p.getFundingtree();
|
||||
if (!f.isEmpty()) {
|
||||
re.setFundingtree(
|
||||
f.stream().map(s -> s.getValue()).collect(Collectors.toList()));
|
||||
}
|
||||
break;
|
||||
}
|
||||
return re;
|
||||
}
|
||||
|
||||
private static String getValue(Field<String> field) {
|
||||
return getFieldValueWithDefault(field, "");
|
||||
}
|
||||
|
||||
private static <T> T getFieldValueWithDefault(Field<T> f, T defaultValue) {
|
||||
return Optional.ofNullable(f)
|
||||
.filter(Objects::nonNull)
|
||||
.map(x -> x.getValue())
|
||||
.orElse(defaultValue);
|
||||
}
|
||||
|
||||
public static String removePrefix(final String s) {
|
||||
if (s.contains("|")) return substringAfter(s, "|");
|
||||
return s;
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
|||
import java.io.Serializable;
|
||||
import java.io.StringReader;
|
||||
import java.io.StringWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -41,6 +42,7 @@ import org.dom4j.io.XMLWriter;
|
|||
|
||||
public class XmlRecordFactory implements Serializable {
|
||||
|
||||
public static final String REL_SUBTYPE_DEDUP = "dedup";
|
||||
private Map<String, LongAccumulator> accumulators;
|
||||
|
||||
private Set<String> specialDatasourceTypes;
|
||||
|
@ -91,7 +93,14 @@ public class XmlRecordFactory implements Serializable {
|
|||
// rels has to be processed before the contexts because they enrich the contextMap with
|
||||
// the
|
||||
// funding info.
|
||||
final List<String> relations = listRelations(je, templateFactory, contexts);
|
||||
final List<String> relations =
|
||||
je.getLinks().stream()
|
||||
.filter(
|
||||
t ->
|
||||
!REL_SUBTYPE_DEDUP.equalsIgnoreCase(
|
||||
t.getRelation().getSubRelType()))
|
||||
.map(link -> mapRelation(link, templateFactory, contexts))
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
|
||||
final String mainType = ModelSupport.getMainType(type);
|
||||
metadata.addAll(buildContexts(mainType, contexts));
|
||||
|
@ -102,7 +111,7 @@ public class XmlRecordFactory implements Serializable {
|
|||
mainType,
|
||||
metadata,
|
||||
relations,
|
||||
listChildren(entity, je.getEntity().getType(), templateFactory),
|
||||
listChildren(entity, je, templateFactory),
|
||||
listExtraInfo(entity));
|
||||
|
||||
return printXML(templateFactory.buildRecord(entity, schemaLocation, body), indent);
|
||||
|
@ -919,171 +928,149 @@ public class XmlRecordFactory implements Serializable {
|
|||
metadata.add(XmlSerializationUtils.mapQualifier("datasourcetypeui", dsType));
|
||||
}
|
||||
|
||||
private Qualifier getBestAccessright(final Result r) {
|
||||
Qualifier bestAccessRight = new Qualifier();
|
||||
bestAccessRight.setClassid("UNKNOWN");
|
||||
bestAccessRight.setClassname("not available");
|
||||
bestAccessRight.setSchemeid("dnet:access_modes");
|
||||
bestAccessRight.setSchemename("dnet:access_modes");
|
||||
private String mapRelation(Tuple2 link, TemplateFactory templateFactory, Set<String> contexts) {
|
||||
final Relation rel = link.getRelation();
|
||||
final RelatedEntity re = link.getRelatedEntity();
|
||||
final String targetType = link.getRelatedEntity().getType();
|
||||
|
||||
final LicenseComparator lc = new LicenseComparator();
|
||||
for (final Instance instance : r.getInstance()) {
|
||||
if (lc.compare(bestAccessRight, instance.getAccessright()) > 0) {
|
||||
bestAccessRight = instance.getAccessright();
|
||||
}
|
||||
final List<String> metadata = Lists.newArrayList();
|
||||
switch (EntityType.valueOf(targetType)) {
|
||||
case publication:
|
||||
case dataset:
|
||||
case otherresearchproduct:
|
||||
case software:
|
||||
if (re.getTitle() != null && isNotBlank(re.getTitle().getValue())) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.mapStructuredProperty("title", re.getTitle()));
|
||||
}
|
||||
if (isNotBlank(re.getDateofacceptance())) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.asXmlElement(
|
||||
"dateofacceptance", re.getDateofacceptance()));
|
||||
}
|
||||
if (isNotBlank(re.getPublisher())) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.asXmlElement("publisher", re.getPublisher()));
|
||||
}
|
||||
if (isNotBlank(re.getCodeRepositoryUrl())) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.asXmlElement(
|
||||
"coderepositoryurl", re.getCodeRepositoryUrl()));
|
||||
}
|
||||
if (re.getResulttype() != null & re.getResulttype().isBlank()) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.mapQualifier("resulttype", re.getResulttype()));
|
||||
}
|
||||
if (re.getCollectedfrom() != null) {
|
||||
metadata.addAll(
|
||||
re.getCollectedfrom().stream()
|
||||
.map(
|
||||
kv ->
|
||||
XmlSerializationUtils.mapKeyValue(
|
||||
"collectedfrom", kv))
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
if (re.getPid() != null) {
|
||||
metadata.addAll(
|
||||
re.getPid().stream()
|
||||
.map(p -> XmlSerializationUtils.mapStructuredProperty("pid", p))
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
break;
|
||||
case datasource:
|
||||
if (isNotBlank(re.getOfficialname())) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.asXmlElement(
|
||||
"officialname", re.getOfficialname()));
|
||||
}
|
||||
if (re.getDatasourcetype() != null & !re.getDatasourcetype().isBlank()) {
|
||||
mapDatasourceType(metadata, re.getDatasourcetype());
|
||||
}
|
||||
if (re.getOpenairecompatibility() != null
|
||||
& !re.getOpenairecompatibility().isBlank()) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.mapQualifier(
|
||||
"openairecompatibility", re.getOpenairecompatibility()));
|
||||
}
|
||||
break;
|
||||
case organization:
|
||||
if (isNotBlank(re.getLegalname())) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.asXmlElement("legalname", re.getLegalname()));
|
||||
}
|
||||
if (isNotBlank(re.getLegalshortname())) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.asXmlElement(
|
||||
"legalshortname", re.getLegalshortname()));
|
||||
}
|
||||
if (re.getCountry() != null & !re.getCountry().isBlank()) {
|
||||
metadata.add(XmlSerializationUtils.mapQualifier("country", re.getCountry()));
|
||||
}
|
||||
break;
|
||||
case project:
|
||||
if (isNotBlank(re.getProjectTitle())) {
|
||||
metadata.add(XmlSerializationUtils.asXmlElement("title", re.getProjectTitle()));
|
||||
}
|
||||
if (isNotBlank(re.getCode())) {
|
||||
metadata.add(XmlSerializationUtils.asXmlElement("code", re.getCode()));
|
||||
}
|
||||
if (isNotBlank(re.getAcronym())) {
|
||||
metadata.add(XmlSerializationUtils.asXmlElement("acronym", re.getAcronym()));
|
||||
}
|
||||
if (re.getContracttype() != null & !re.getContracttype().isBlank()) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.mapQualifier(
|
||||
"contracttype", re.getContracttype()));
|
||||
}
|
||||
if (re.getFundingtree() != null & contexts != null) {
|
||||
metadata.addAll(
|
||||
re.getFundingtree().stream()
|
||||
.peek(ft -> fillContextMap(ft, contexts))
|
||||
.map(ft -> getRelFundingTree(ft))
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid target type: " + targetType);
|
||||
}
|
||||
return bestAccessRight;
|
||||
}
|
||||
final DataInfo info = rel.getDataInfo();
|
||||
final String scheme = ModelSupport.getScheme(re.getType(), targetType);
|
||||
|
||||
private List<String> listRelations(
|
||||
final JoinedEntity je, TemplateFactory templateFactory, final Set<String> contexts) {
|
||||
final List<String> rels = Lists.newArrayList();
|
||||
|
||||
for (final Tuple2 link : je.getLinks()) {
|
||||
|
||||
final Relation rel = link.getRelation();
|
||||
final RelatedEntity re = link.getRelatedEntity();
|
||||
final String targetType = link.getRelatedEntity().getType();
|
||||
|
||||
final List<String> metadata = Lists.newArrayList();
|
||||
switch (EntityType.valueOf(targetType)) {
|
||||
case publication:
|
||||
case dataset:
|
||||
case otherresearchproduct:
|
||||
case software:
|
||||
if (re.getTitle() != null && isNotBlank(re.getTitle().getValue())) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.mapStructuredProperty(
|
||||
"title", re.getTitle()));
|
||||
}
|
||||
if (isNotBlank(re.getDateofacceptance())) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.asXmlElement(
|
||||
"dateofacceptance", re.getDateofacceptance()));
|
||||
}
|
||||
if (isNotBlank(re.getPublisher())) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.asXmlElement("publisher", re.getPublisher()));
|
||||
}
|
||||
if (isNotBlank(re.getCodeRepositoryUrl())) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.asXmlElement(
|
||||
"coderepositoryurl", re.getCodeRepositoryUrl()));
|
||||
}
|
||||
if (re.getResulttype() != null & re.getResulttype().isBlank()) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.mapQualifier(
|
||||
"resulttype", re.getResulttype()));
|
||||
}
|
||||
if (re.getCollectedfrom() != null) {
|
||||
metadata.addAll(
|
||||
re.getCollectedfrom().stream()
|
||||
.map(
|
||||
kv ->
|
||||
XmlSerializationUtils.mapKeyValue(
|
||||
"collectedfrom", kv))
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
if (re.getPid() != null) {
|
||||
metadata.addAll(
|
||||
re.getPid().stream()
|
||||
.map(
|
||||
p ->
|
||||
XmlSerializationUtils.mapStructuredProperty(
|
||||
"pid", p))
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
break;
|
||||
case datasource:
|
||||
if (isNotBlank(re.getOfficialname())) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.asXmlElement(
|
||||
"officialname", re.getOfficialname()));
|
||||
}
|
||||
if (re.getDatasourcetype() != null & !re.getDatasourcetype().isBlank()) {
|
||||
mapDatasourceType(metadata, re.getDatasourcetype());
|
||||
}
|
||||
if (re.getOpenairecompatibility() != null
|
||||
& !re.getOpenairecompatibility().isBlank()) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.mapQualifier(
|
||||
"openairecompatibility", re.getOpenairecompatibility()));
|
||||
}
|
||||
break;
|
||||
case organization:
|
||||
if (isNotBlank(re.getLegalname())) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.asXmlElement("legalname", re.getLegalname()));
|
||||
}
|
||||
if (isNotBlank(re.getLegalshortname())) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.asXmlElement(
|
||||
"legalshortname", re.getLegalshortname()));
|
||||
}
|
||||
if (re.getCountry() != null & !re.getCountry().isBlank()) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.mapQualifier("country", re.getCountry()));
|
||||
}
|
||||
break;
|
||||
case project:
|
||||
if (isNotBlank(re.getProjectTitle())) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.asXmlElement("title", re.getProjectTitle()));
|
||||
}
|
||||
if (isNotBlank(re.getCode())) {
|
||||
metadata.add(XmlSerializationUtils.asXmlElement("code", re.getCode()));
|
||||
}
|
||||
if (isNotBlank(re.getAcronym())) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.asXmlElement("acronym", re.getAcronym()));
|
||||
}
|
||||
if (re.getContracttype() != null & !re.getContracttype().isBlank()) {
|
||||
metadata.add(
|
||||
XmlSerializationUtils.mapQualifier(
|
||||
"contracttype", re.getContracttype()));
|
||||
}
|
||||
if (re.getFundingtree() != null) {
|
||||
metadata.addAll(
|
||||
re.getFundingtree().stream()
|
||||
.peek(ft -> fillContextMap(ft, contexts))
|
||||
.map(ft -> getRelFundingTree(ft))
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid target type: " + targetType);
|
||||
}
|
||||
final DataInfo info = rel.getDataInfo();
|
||||
final String scheme = ModelSupport.getScheme(re.getType(), targetType);
|
||||
|
||||
if (StringUtils.isBlank(scheme)) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("missing scheme for: <%s - %s>", re.getType(), targetType));
|
||||
}
|
||||
|
||||
final String accumulatorName =
|
||||
getRelDescriptor(rel.getRelType(), rel.getSubRelType(), rel.getRelClass());
|
||||
if (accumulators.containsKey(accumulatorName)) {
|
||||
accumulators.get(accumulatorName).add(1);
|
||||
}
|
||||
|
||||
rels.add(
|
||||
templateFactory.getRel(
|
||||
targetType,
|
||||
rel.getTarget(),
|
||||
Sets.newHashSet(metadata),
|
||||
rel.getRelClass(),
|
||||
scheme,
|
||||
info));
|
||||
if (StringUtils.isBlank(scheme)) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("missing scheme for: <%s - %s>", re.getType(), targetType));
|
||||
}
|
||||
return rels;
|
||||
|
||||
final String accumulatorName =
|
||||
getRelDescriptor(rel.getRelType(), rel.getSubRelType(), rel.getRelClass());
|
||||
if (accumulators.containsKey(accumulatorName)) {
|
||||
accumulators.get(accumulatorName).add(1);
|
||||
}
|
||||
|
||||
return templateFactory.getRel(
|
||||
targetType,
|
||||
rel.getTarget(),
|
||||
Sets.newHashSet(metadata),
|
||||
rel.getRelClass(),
|
||||
scheme,
|
||||
info);
|
||||
}
|
||||
|
||||
private List<String> listChildren(
|
||||
final OafEntity entity, String type, TemplateFactory templateFactory) {
|
||||
final OafEntity entity, JoinedEntity je, TemplateFactory templateFactory) {
|
||||
|
||||
final List<String> children = Lists.newArrayList();
|
||||
EntityType entityType = EntityType.valueOf(type);
|
||||
EntityType entityType = EntityType.valueOf(je.getEntity().getType());
|
||||
|
||||
children.addAll(
|
||||
je.getLinks().stream()
|
||||
.filter(
|
||||
link ->
|
||||
REL_SUBTYPE_DEDUP.equalsIgnoreCase(
|
||||
link.getRelation().getSubRelType()))
|
||||
.map(link -> mapRelation(link, templateFactory, null))
|
||||
.collect(Collectors.toCollection(ArrayList::new)));
|
||||
|
||||
if (MainEntityType.result.toString().equals(ModelSupport.getMainType(entityType))) {
|
||||
final List<Instance> instances = ((Result) entity).getInstance();
|
||||
if (instances != null) {
|
||||
|
|
|
@ -98,6 +98,7 @@
|
|||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/relation</arg>
|
||||
|
|
|
@ -262,7 +262,8 @@
|
|||
sparkDriverMemory,sparkExecutorMemory,sparkExecutorCores,
|
||||
oozie.wf.application.path,projectVersion,oozie.use.system.libpath,
|
||||
oozieActionShareLibForSpark1,spark1YarnHistoryServerAddress,spark1EventLogDir,
|
||||
oozieActionShareLibForSpark2,spark2YarnHistoryServerAddress,spark2EventLogDir
|
||||
oozieActionShareLibForSpark2,spark2YarnHistoryServerAddress,spark2EventLogDir,
|
||||
sparkSqlWarehouseDir
|
||||
</include>
|
||||
<includeSystemProperties>true</includeSystemProperties>
|
||||
<includePropertyKeysFromFiles>
|
||||
|
|
Loading…
Reference in New Issue