merge branch with master

This commit is contained in:
Miriam Baglioni 2020-11-16 14:15:39 +01:00
commit 19167c9b9d
54 changed files with 1045 additions and 603 deletions

View File

@ -15,12 +15,12 @@
<snapshotRepository>
<id>dnet45-snapshots</id>
<name>DNet45 Snapshots</name>
<url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots</url>
<url>https://maven.d4science.org/nexus/content/repositories/dnet45-snapshots</url>
<layout>default</layout>
</snapshotRepository>
<repository>
<id>dnet45-releases</id>
<url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases</url>
<url>https://maven.d4science.org/nexus/content/repositories/dnet45-releases</url>
</repository>
</distributionManagement>

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.oa.graph.raw.common;
package eu.dnetlib.dhp.schema.oaf;
import java.util.ArrayList;
import java.util.Arrays;
@ -13,19 +13,43 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.ExtraInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Journal;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
import eu.dnetlib.dhp.schema.oaf.OriginDescription;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.utils.DHPUtils;
public class OafMapperUtils {
public static Oaf merge(final Oaf o1, final Oaf o2) {
if (ModelSupport.isSubClass(o1, OafEntity.class)) {
if (ModelSupport.isSubClass(o1, Result.class)) {
return mergeResults((Result) o1, (Result) o2);
} else if (ModelSupport.isSubClass(o1, Datasource.class)) {
((Datasource) o1).mergeFrom((Datasource) o2);
} else if (ModelSupport.isSubClass(o1, Organization.class)) {
((Organization) o1).mergeFrom((Organization) o2);
} else if (ModelSupport.isSubClass(o1, Project.class)) {
((Project) o1).mergeFrom((Project) o2);
} else {
throw new RuntimeException("invalid OafEntity subtype:" + o1.getClass().getCanonicalName());
}
} else if (ModelSupport.isSubClass(o1, Relation.class)) {
((Relation) o1).mergeFrom((Relation) o2);
} else {
throw new RuntimeException("invalid Oaf type:" + o1.getClass().getCanonicalName());
}
return o1;
}
public static Result mergeResults(Result r1, Result r2) {
if (new ResultTypeComparator().compare(r1, r2) < 0) {
r1.mergeFrom(r2);
return r1;
} else {
r2.mergeFrom(r1);
return r2;
}
}
public static KeyValue keyValue(final String k, final String v) {
final KeyValue kv = new KeyValue();
kv.setKey(k);

View File

@ -0,0 +1,49 @@
package eu.dnetlib.dhp.schema.oaf;
import java.util.Comparator;
import eu.dnetlib.dhp.schema.common.ModelConstants;
public class ResultTypeComparator implements Comparator<Result> {
@Override
public int compare(Result left, Result right) {
if (left == null && right == null)
return 0;
if (left == null)
return 1;
if (right == null)
return -1;
String lClass = left.getResulttype().getClassid();
String rClass = right.getResulttype().getClassid();
if (lClass.equals(rClass))
return 0;
if (lClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
return 1;
if (lClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
return 1;
if (lClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
return 1;
if (lClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
return -1;
if (rClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
return 1;
// Else (but unlikely), lexicographical ordering will do.
return lClass.compareTo(rClass);
}
}

View File

@ -5,6 +5,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@ -15,9 +16,15 @@ import org.apache.commons.codec.binary.Hex;
import com.jayway.jsonpath.JsonPath;
import net.minidev.json.JSONArray;
import scala.collection.JavaConverters;
import scala.collection.Seq;
public class DHPUtils {
public static Seq<String> toSeq(List<String> list) {
return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq();
}
public static String md5(final String s) {
try {
final MessageDigest md = MessageDigest.getInstance("MD5");

View File

@ -1,6 +1,6 @@
package eu.dnetlib.dhp.doiboost
import eu.dnetlib.dhp.schema.oaf.Publication
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation}
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
@ -21,6 +21,13 @@ class QueryTest {
}
def has_ands(r:Relation) :Boolean = {
r.getCollectedfrom!= null && r.getCollectedfrom.asScala.count(k => k.getValue.contains("Australian")) > 0
}
def hasInstanceWithUrl(p:Publication):Boolean = {

View File

@ -3,14 +3,12 @@ package eu.dnetlib.dhp.oa.graph.clean;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.BufferedInputStream;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@ -23,11 +21,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -75,12 +71,12 @@ public class CleanGraphSparkJob {
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
fixGraphTable(spark, vocs, inputPath, entityClazz, outputPath);
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
cleanGraphTable(spark, vocs, inputPath, entityClazz, outputPath);
});
}
private static <T extends Oaf> void fixGraphTable(
private static <T extends Oaf> void cleanGraphTable(
SparkSession spark,
VocabularyGroup vocs,
String inputPath,
@ -106,13 +102,15 @@ public class CleanGraphSparkJob {
return spark
.read()
.textFile(inputEntityPath)
.filter((FilterFunction<String>) s -> isEntityType(s, clazz))
.map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING())
.map(
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
Encoders.bean(clazz));
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
private static <T extends Oaf> boolean isEntityType(final String s, final Class<T> clazz) {
return StringUtils.substringBefore(s, "|").equals(clazz.getName());
}
}

View File

@ -11,7 +11,6 @@ import org.apache.commons.lang3.StringUtils;
import com.clearspring.analytics.util.Lists;
import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
@ -115,7 +114,7 @@ public class CleaningFunctions {
.stream()
.filter(Objects::nonNull)
.filter(sp -> StringUtils.isNotBlank(StringUtils.trim(sp.getValue())))
.filter(sp -> NONE.equalsIgnoreCase(sp.getValue()))
.filter(sp -> !NONE.equalsIgnoreCase(sp.getValue().trim()))
.filter(sp -> Objects.nonNull(sp.getQualifier()))
.filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
.map(sp -> {

View File

@ -0,0 +1,206 @@
package eu.dnetlib.dhp.oa.graph.clean;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
/**
* Groups the graph content by entity identifier to ensure ID uniqueness
*/
public class GroupEntitiesAndRelationsSparkJob {
private static final Logger log = LoggerFactory.getLogger(GroupEntitiesAndRelationsSparkJob.class);
private final static String ID_JPATH = "$.id";
private final static String SOURCE_JPATH = "$.source";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
GroupEntitiesAndRelationsSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String graphInputPath = parser.get("graphInputPath");
log.info("graphInputPath: {}", graphInputPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
groupEntitiesAndRelations(spark, graphInputPath, outputPath);
});
}
private static void groupEntitiesAndRelations(
SparkSession spark,
String inputPath,
String outputPath) {
TypedColumn<Oaf, Oaf> aggregator = new GroupingAggregator().toColumn();
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
spark
.read()
.textFile(toSeq(listPaths(inputPath, sc)))
.map((MapFunction<String, Oaf>) s -> parseOaf(s), Encoders.kryo(Oaf.class))
.filter((FilterFunction<Oaf>) oaf -> StringUtils.isNotBlank(ModelSupport.idFn().apply(oaf)))
.groupByKey((MapFunction<Oaf, String>) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING())
.agg(aggregator)
.map(
(MapFunction<Tuple2<String, Oaf>, String>) t -> t._2().getClass().getName() +
"|" + OBJECT_MAPPER.writeValueAsString(t._2()),
Encoders.STRING())
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.text(outputPath);
}
public static class GroupingAggregator extends Aggregator<Oaf, Oaf, Oaf> {
@Override
public Oaf zero() {
return null;
}
@Override
public Oaf reduce(Oaf b, Oaf a) {
return mergeAndGet(b, a);
}
private Oaf mergeAndGet(Oaf b, Oaf a) {
if (Objects.nonNull(a) && Objects.nonNull(b)) {
return OafMapperUtils.merge(b, a);
}
return Objects.isNull(a) ? b : a;
}
@Override
public Oaf merge(Oaf b, Oaf a) {
return mergeAndGet(b, a);
}
@Override
public Oaf finish(Oaf j) {
return j;
}
@Override
public Encoder<Oaf> bufferEncoder() {
return Encoders.kryo(Oaf.class);
}
@Override
public Encoder<Oaf> outputEncoder() {
return Encoders.kryo(Oaf.class);
}
}
private static Oaf parseOaf(String s) {
DocumentContext dc = JsonPath
.parse(s, Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS));
final String id = dc.read(ID_JPATH);
if (StringUtils.isNotBlank(id)) {
String prefix = StringUtils.substringBefore(id, "|");
switch (prefix) {
case "10":
return parse(s, Datasource.class);
case "20":
return parse(s, Organization.class);
case "40":
return parse(s, Project.class);
case "50":
String resultType = dc.read("$.resulttype.classid");
switch (resultType) {
case "publication":
return parse(s, Publication.class);
case "dataset":
return parse(s, eu.dnetlib.dhp.schema.oaf.Dataset.class);
case "software":
return parse(s, Software.class);
case "other":
return parse(s, OtherResearchProduct.class);
default:
throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType));
}
default:
throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix));
}
} else {
String source = dc.read(SOURCE_JPATH);
if (StringUtils.isNotBlank(source)) {
return parse(s, Relation.class);
} else {
throw new IllegalArgumentException(String.format("invalid oaf: '%s'", s));
}
}
}
private static <T extends Oaf> Oaf parse(String s, Class<T> clazz) {
try {
return OBJECT_MAPPER.readValue(s, clazz);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private static List<String> listPaths(String inputPath, JavaSparkContext sc) {
return HdfsSupport
.listFiles(inputPath, sc.hadoopConfiguration())
.stream()
.collect(Collectors.toList());
}
}

View File

@ -33,9 +33,9 @@ import scala.Tuple2;
* are picked preferring those from the BETA aggregator rather then from PROD. The identity of a relationship is defined
* by eu.dnetlib.dhp.schema.common.ModelSupport#idFn()
*/
public class MergeGraphSparkJob {
public class MergeGraphTableSparkJob {
private static final Logger log = LoggerFactory.getLogger(CleanGraphSparkJob.class);
private static final Logger log = LoggerFactory.getLogger(MergeGraphTableSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

View File

@ -1,8 +1,8 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.*;
import java.util.*;

View File

@ -29,16 +29,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import scala.Tuple2;
@ -78,7 +69,7 @@ public class GenerateEntitiesApplication {
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
removeOutputDir(spark, targetPath);
HdfsSupport.remove(targetPath, spark.sparkContext().hadoopConfiguration());
generateEntities(spark, vocs, sourcePaths, targetPath);
});
}
@ -92,7 +83,7 @@ public class GenerateEntitiesApplication {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
final List<String> existingSourcePaths = Arrays
.stream(sourcePaths.split(","))
.filter(p -> exists(sc, p))
.filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()))
.collect(Collectors.toList());
log.info("Generate entities from files:");
@ -113,7 +104,7 @@ public class GenerateEntitiesApplication {
inputRdd
.mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
.reduceByKey((o1, o2) -> merge(o1, o2))
.reduceByKey((o1, o2) -> OafMapperUtils.merge(o1, o2))
.map(Tuple2::_2)
.map(
oaf -> oaf.getClass().getSimpleName().toLowerCase()
@ -122,17 +113,6 @@ public class GenerateEntitiesApplication {
.saveAsTextFile(targetPath, GzipCodec.class);
}
private static Oaf merge(final Oaf o1, final Oaf o2) {
if (ModelSupport.isSubClass(o1, OafEntity.class)) {
((OafEntity) o1).mergeFrom((OafEntity) o2);
} else if (ModelSupport.isSubClass(o1, Relation.class)) {
((Relation) o1).mergeFrom((Relation) o2);
} else {
throw new RuntimeException("invalid Oaf type:" + o1.getClass().getCanonicalName());
}
return o1;
}
private static List<Oaf> convertToListOaf(
final String id,
final String s,
@ -181,17 +161,4 @@ public class GenerateEntitiesApplication {
}
}
private static boolean exists(final JavaSparkContext context, final String pathToFile) {
try {
final FileSystem hdfs = FileSystem.get(context.hadoopConfiguration());
final Path path = new Path(pathToFile);
return hdfs.exists(path);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
private static void removeOutputDir(final SparkSession spark, final String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}

View File

@ -1,15 +1,6 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.asString;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.dataInfo;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.journal;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listKeyValues;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASOURCE_ORGANIZATION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
@ -32,13 +23,18 @@ import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_RESULT;
import static eu.dnetlib.dhp.schema.common.ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.USER_CLAIM;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.*;
import java.io.Closeable;
import java.io.IOException;
import java.sql.Array;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
@ -174,7 +170,8 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
execute(sqlFile, producer, oaf -> true);
}
public void execute(final String sqlFile, final Function<ResultSet, List<Oaf>> producer,
public void execute(final String sqlFile,
final Function<ResultSet, List<Oaf>> producer,
final Predicate<Oaf> predicate)
throws Exception {
final String sql = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/sql/" + sqlFile));
@ -198,8 +195,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
ds
.setOriginalId(
Arrays
.asList(
(String[]) rs.getArray("identities").getArray())
.asList((String[]) rs.getArray("identities").getArray())
.stream()
.filter(StringUtils::isNotBlank)
.collect(Collectors.toList()));
@ -250,11 +246,8 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
ds
.setJournal(
journal(
rs.getString("officialname"),
rs.getString("issnPrinted"),
rs.getString("issnOnline"),
rs.getString("issnLinking"),
info)); // Journal
rs.getString("officialname"), rs.getString("issnPrinted"), rs.getString("issnOnline"),
rs.getString("issnLinking"), info)); // Journal
ds.setDataInfo(info);
ds.setLastupdatetimestamp(lastUpdateTimestamp);
@ -332,7 +325,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
listKeyValues(
createOpenaireId(10, rs.getString("collectedfromid"), true),
rs.getString("collectedfromname")));
o.setPid(new ArrayList<>());
o.setPid(prepareListOfStructProps(rs.getArray("pid"), info));
o.setDateofcollection(asString(rs.getDate("dateofcollection")));
o.setDateoftransformation(asString(rs.getDate("dateoftransformation")));
o.setExtraInfo(new ArrayList<>()); // Values not present in the DB
@ -526,9 +519,12 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
final Boolean deletedbyinference = rs.getBoolean("deletedbyinference");
final String inferenceprovenance = rs.getString("inferenceprovenance");
final Boolean inferred = rs.getBoolean("inferred");
final String trust = rs.getString("trust");
final double trust = rs.getDouble("trust");
return dataInfo(
deletedbyinference, inferenceprovenance, inferred, false, ENTITYREGISTRY_PROVENANCE_ACTION, trust);
deletedbyinference, inferenceprovenance, inferred, false, ENTITYREGISTRY_PROVENANCE_ACTION,
String.format("%.3f", trust));
}
private Qualifier prepareQualifierSplitting(final String s) {

View File

@ -1,10 +1,10 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.field;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.structuredProperty;
import java.util.ArrayList;
import java.util.List;

View File

@ -1,10 +1,10 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.createOpenaireId;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.field;
import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.structuredProperty;
import java.util.ArrayList;
import java.util.Arrays;

View File

@ -10,6 +10,7 @@ import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.oaf.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
public class Vocabulary implements Serializable {

View File

@ -7,6 +7,7 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.schema.oaf.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;

View File

@ -0,0 +1,3 @@
package eu.dnetlib.dhp.sx.graph
case class IdReplace(newId:String, oldId:String) {}

View File

@ -1,12 +1,15 @@
package eu.dnetlib.dhp.sx.graph
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation}
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result}
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown}
import eu.dnetlib.dhp.sx.ebi.EBIAggregator
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
import org.apache.spark.sql.functions.col
object SparkSplitOafTODLIEntities {
@ -83,14 +86,42 @@ object SparkSplitOafTODLIEntities {
}
def extract_ids(o:Oaf) :(String, String) = {
o match {
case p: DLIPublication =>
val prefix = StringUtils.substringBefore(p.getId, "|")
val original = StringUtils.substringAfter(p.getOriginalObjIdentifier, "::")
(p.getId, s"$prefix|$original")
case p: DLIDataset =>
val prefix = StringUtils.substringBefore(p.getId, "|")
val original = StringUtils.substringAfter(p.getOriginalObjIdentifier, "::")
(p.getId, s"$prefix|$original")
case _ =>null
}
}
def extract_relations(spark:SparkSession, workingPath:String) :Unit = {
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation]
import spark.implicits._
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
val ebi_relation:Dataset[Relation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[Relation].repartition(2000)
OAFDataset
.filter(o => o.isInstanceOf[Result])
.map(extract_ids)(Encoders.tuple(Encoders.STRING, Encoders.STRING))
.filter(r => r != null)
.where("_1 != _2")
.select(col("_1").alias("newId"), col("_2").alias("oldId"))
.distinct()
.map(f => IdReplace(f.getString(0), f.getString(1)))
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/id_replace")
OAFDataset
.filter(s => s != null && s.isInstanceOf[Relation])
.map(s =>s.asInstanceOf[Relation])
@ -100,7 +131,41 @@ object SparkSplitOafTODLIEntities {
.agg(EBIAggregator.getRelationAggregator().toColumn)
.map(p => p._2)
.repartition(4000)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation")
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation_unfixed")
val relations = spark.read.load(s"$workingPath/graph/relation_unfixed").as[Relation]
val ids = spark.read.load(s"$workingPath/graph/id_replace").as[IdReplace]
relations
.map(r => (r.getSource, r))(Encoders.tuple(Encoders.STRING, relEncoder))
.joinWith(ids, col("_1").equalTo(ids("oldId")), "left")
.map(i =>{
val r = i._1._2
if (i._2 != null)
{
val id = i._2.newId
r.setSource(id)
}
r
}).write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/rel_f_source")
val rel_source:Dataset[Relation] = spark.read.load(s"$workingPath/graph/rel_f_source").as[Relation]
rel_source
.map(r => (r.getTarget, r))(Encoders.tuple(Encoders.STRING, relEncoder))
.joinWith(ids, col("_1").equalTo(ids("oldId")), "left")
.map(i =>{
val r:Relation = i._1._2
if (i._2 != null)
{
val id = i._2.newId
r.setTarget(id)
}
r
}).write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation")
}

View File

@ -50,12 +50,36 @@
</property>
</parameters>
<start to="fork_clean_graph"/>
<start to="group_entities"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="group_entities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>group graph entities and relations</name>
<class>eu.dnetlib.dhp.oa.graph.clean.GroupEntitiesAndRelationsSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--graphInputPath</arg><arg>${graphInputPath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/grouped_entities</arg>
</spark>
<ok to="fork_clean_graph"/>
<error to="Kill"/>
</action>
<fork name="fork_clean_graph">
<path start="clean_publication"/>
<path start="clean_dataset"/>
@ -84,7 +108,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/publication</arg>
<arg>--inputPath</arg><arg>${workingDir}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
@ -110,7 +134,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/dataset</arg>
<arg>--inputPath</arg><arg>${workingDir}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
@ -136,7 +160,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/otherresearchproduct</arg>
<arg>--inputPath</arg><arg>${workingDir}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
@ -162,7 +186,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/software</arg>
<arg>--inputPath</arg><arg>${workingDir}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
@ -188,7 +212,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/datasource</arg>
<arg>--inputPath</arg><arg>${workingDir}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
@ -214,7 +238,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/organization</arg>
<arg>--inputPath</arg><arg>${workingDir}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
@ -240,7 +264,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/project</arg>
<arg>--inputPath</arg><arg>${workingDir}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
@ -266,7 +290,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/relation</arg>
<arg>--inputPath</arg><arg>${workingDir}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>

View File

@ -0,0 +1,20 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "gin",
"paramLongName": "graphInputPath",
"paramDescription": "the graph root path",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the output merged graph root path",
"paramRequired": true
}
]

View File

@ -2,11 +2,11 @@
<parameters>
<property>
<name>betaInputGgraphPath</name>
<name>betaInputGraphPath</name>
<description>the beta graph root path</description>
</property>
<property>
<name>prodInputGgraphPath</name>
<name>prodInputGraphPath</name>
<description>the production graph root path</description>
</property>
<property>
@ -76,7 +76,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Merge publications</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -88,8 +88,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/publication</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/publication</arg>
<arg>--betaInputPath</arg><arg>${betaInputGraphPath}/publication</arg>
<arg>--prodInputPath</arg><arg>${prodInputGraphPath}/publication</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--priority</arg><arg>${priority}</arg>
@ -103,7 +103,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Merge datasets</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -115,8 +115,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/dataset</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/dataset</arg>
<arg>--betaInputPath</arg><arg>${betaInputGraphPath}/dataset</arg>
<arg>--prodInputPath</arg><arg>${prodInputGraphPath}/dataset</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--priority</arg><arg>${priority}</arg>
@ -130,7 +130,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Merge otherresearchproducts</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -142,8 +142,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/otherresearchproduct</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/otherresearchproduct</arg>
<arg>--betaInputPath</arg><arg>${betaInputGraphPath}/otherresearchproduct</arg>
<arg>--prodInputPath</arg><arg>${prodInputGraphPath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--priority</arg><arg>${priority}</arg>
@ -157,7 +157,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Merge softwares</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -169,8 +169,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/software</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/software</arg>
<arg>--betaInputPath</arg><arg>${betaInputGraphPath}/software</arg>
<arg>--prodInputPath</arg><arg>${prodInputGraphPath}/software</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--priority</arg><arg>${priority}</arg>
@ -184,7 +184,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Merge datasources</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -196,8 +196,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/datasource</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/datasource</arg>
<arg>--betaInputPath</arg><arg>${betaInputGraphPath}/datasource</arg>
<arg>--prodInputPath</arg><arg>${prodInputGraphPath}/datasource</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--priority</arg><arg>${priority}</arg>
@ -211,7 +211,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Merge organizations</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -223,8 +223,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/organization</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/organization</arg>
<arg>--betaInputPath</arg><arg>${betaInputGraphPath}/organization</arg>
<arg>--prodInputPath</arg><arg>${prodInputGraphPath}/organization</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--priority</arg><arg>${priority}</arg>
@ -238,7 +238,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Merge projects</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -250,8 +250,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/project</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/project</arg>
<arg>--betaInputPath</arg><arg>${betaInputGraphPath}/project</arg>
<arg>--prodInputPath</arg><arg>${prodInputGraphPath}/project</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--priority</arg><arg>${priority}</arg>
@ -265,7 +265,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Merge relations</name>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob</class>
<class>eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -277,8 +277,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--betaInputPath</arg><arg>${betaInputGgraphPath}/relation</arg>
<arg>--prodInputPath</arg><arg>${prodInputGgraphPath}/relation</arg>
<arg>--betaInputPath</arg><arg>${betaInputGraphPath}/relation</arg>
<arg>--prodInputPath</arg><arg>${prodInputGraphPath}/relation</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/relation</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
<arg>--priority</arg><arg>${priority}</arg>

View File

@ -24,12 +24,30 @@ SELECT
d.officialname AS collectedfromname,
o.country || '@@@dnet:countries' AS country,
'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
ARRAY[]::text[] AS pid
array_remove(array_agg(DISTINCT i.pid || '###' || i.issuertype), NULL) AS pid
FROM dsm_organizations o
LEFT OUTER JOIN dsm_datasources d ON (d.id = o.collectedfrom)
LEFT OUTER JOIN dsm_organizationpids p ON (p.organization = o.id)
LEFT OUTER JOIN dsm_identities i ON (i.pid = p.pid)
GROUP BY
o.id,
o.legalshortname,
o.legalname,
o.websiteurl,
o.logourl,
o.ec_legalbody,
o.ec_legalperson,
o.ec_nonprofit,
o.ec_researchorganization,
o.ec_highereducation,
o.ec_internationalorganizationeurinterests,
o.ec_internationalorganization,
o.ec_enterprise,
o.ec_smevalidated,
o.ec_nutscode,
o.dateofcollection,
o.lastupdate,
o.trust,
d.id,
d.officialname,
o.country

View File

@ -15,7 +15,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.Datasource;
public class MergeGraphSparkJobTest {
public class MergeGraphTableSparkJobTest {
private ObjectMapper mapper;
@ -28,7 +28,7 @@ public class MergeGraphSparkJobTest {
public void testMergeDatasources() throws IOException {
assertEquals(
"openaire-cris_1.1",
MergeGraphSparkJob
MergeGraphTableSparkJob
.mergeDatasource(
d("datasource_cris.json"),
d("datasource_UNKNOWN.json"))
@ -36,7 +36,7 @@ public class MergeGraphSparkJobTest {
.getClassid());
assertEquals(
"openaire-cris_1.1",
MergeGraphSparkJob
MergeGraphTableSparkJob
.mergeDatasource(
d("datasource_UNKNOWN.json"),
d("datasource_cris.json"))
@ -44,7 +44,7 @@ public class MergeGraphSparkJobTest {
.getClassid());
assertEquals(
"driver-openaire2.0",
MergeGraphSparkJob
MergeGraphTableSparkJob
.mergeDatasource(
d("datasource_native.json"),
d("datasource_driver-openaire2.0.json"))
@ -52,7 +52,7 @@ public class MergeGraphSparkJobTest {
.getClassid());
assertEquals(
"driver-openaire2.0",
MergeGraphSparkJob
MergeGraphTableSparkJob
.mergeDatasource(
d("datasource_driver-openaire2.0.json"),
d("datasource_native.json"))
@ -60,7 +60,7 @@ public class MergeGraphSparkJobTest {
.getClassid());
assertEquals(
"openaire4.0",
MergeGraphSparkJob
MergeGraphTableSparkJob
.mergeDatasource(
d("datasource_notCompatible.json"),
d("datasource_openaire4.0.json"))
@ -68,7 +68,7 @@ public class MergeGraphSparkJobTest {
.getClassid());
assertEquals(
"notCompatible",
MergeGraphSparkJob
MergeGraphTableSparkJob
.mergeDatasource(
d("datasource_notCompatible.json"),
d("datasource_UNKNOWN.json"))

View File

@ -0,0 +1,99 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.lenient;
import java.io.IOException;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ExtendWith(MockitoExtension.class)
public class GenerateEntitiesApplicationTest {
@Mock
private ISLookUpService isLookUpService;
@Mock
private VocabularyGroup vocs;
@BeforeEach
public void setUp() throws IOException, ISLookUpException {
lenient().when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY)).thenReturn(vocs());
lenient()
.when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARY_SYNONYMS_XQUERY))
.thenReturn(synonyms());
vocs = VocabularyGroup.loadVocsFromIS(isLookUpService);
}
@Test
public void testMergeResult() throws IOException {
Result publication = getResult("oaf_record.xml", Publication.class);
Result dataset = getResult("odf_dataset.xml", Dataset.class);
Result software = getResult("odf_software.xml", Software.class);
Result orp = getResult("oaf_orp.xml", OtherResearchProduct.class);
verifyMerge(publication, dataset, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
verifyMerge(dataset, publication, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
verifyMerge(publication, software, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
verifyMerge(software, publication, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
verifyMerge(publication, orp, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
verifyMerge(orp, publication, Publication.class, ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
verifyMerge(dataset, software, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
verifyMerge(software, dataset, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
verifyMerge(dataset, orp, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
verifyMerge(orp, dataset, Dataset.class, ModelConstants.DATASET_RESULTTYPE_CLASSID);
verifyMerge(software, orp, Software.class, ModelConstants.SOFTWARE_RESULTTYPE_CLASSID);
verifyMerge(orp, software, Software.class, ModelConstants.SOFTWARE_RESULTTYPE_CLASSID);
}
protected <T extends Result> void verifyMerge(Result publication, Result dataset, Class<T> clazz,
String resultType) {
final Result merge = OafMapperUtils.mergeResults(publication, dataset);
assertTrue(clazz.isAssignableFrom(merge.getClass()));
assertEquals(resultType, merge.getResulttype().getClassid());
}
protected <T extends Result> Result getResult(String xmlFileName, Class<T> clazz) throws IOException {
final String xml = IOUtils.toString(getClass().getResourceAsStream(xmlFileName));
return new OdfToOafMapper(vocs, false)
.processMdRecord(xml)
.stream()
.filter(s -> clazz.isAssignableFrom(s.getClass()))
.map(s -> (Result) s)
.findFirst()
.get();
}
private List<String> vocs() throws IOException {
return IOUtils
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt"));
}
private List<String> synonyms() throws IOException {
return IOUtils
.readLines(CleaningFunctionTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt"));
}
}

View File

@ -27,14 +27,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.*;
@ExtendWith(MockitoExtension.class)
public class MigrateDbEntitiesApplicationTest {

View File

@ -12,6 +12,7 @@ import com.fasterxml.jackson.databind.SerializationFeature;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.sx.graph.parser.DatasetScholexplorerParser;
import eu.dnetlib.dhp.sx.graph.parser.PublicationScholexplorerParser;
import eu.dnetlib.scholexplorer.relation.RelationMapper;
public class ScholexplorerParserTest {
@ -37,4 +38,26 @@ public class ScholexplorerParserTest {
}
});
}
@Test
public void testPublicationParser() throws Exception {
String xml = IOUtils.toString(this.getClass().getResourceAsStream("pmf.xml"));
PublicationScholexplorerParser p = new PublicationScholexplorerParser();
List<Oaf> oaves = p.parseObject(xml, RelationMapper.load());
ObjectMapper m = new ObjectMapper();
m.enable(SerializationFeature.INDENT_OUTPUT);
oaves
.forEach(
oaf -> {
try {
System.out.println(m.writeValueAsString(oaf));
System.out.println("----------------------------");
} catch (JsonProcessingException e) {
}
});
}
}

View File

@ -31,8 +31,8 @@
},
{
"field": "trust",
"type": "string",
"value": "0.9"
"type": "double",
"value": 0.9
},
{
"field": "inferenceprovenance",

View File

@ -114,8 +114,8 @@
},
{
"field": "trust",
"type": "string",
"value": "0.9"
"type": "double",
"value": 0.9
},
{
"field": "inferenceprovenance",

View File

@ -0,0 +1,84 @@
<?xml version="1.0" encoding="UTF-8"?>
<record xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:dr="http://www.driver-repository.eu/namespace/dr"
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
xmlns:oaf="http://namespace.openaire.eu/oaf"
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<header xmlns="http://namespace.openaire.eu/">
<dri:objIdentifier>pensoft_____::00ea4a1cd53806a97d62ea6bf268f2a2</dri:objIdentifier>
<dri:recordIdentifier>10.3897/oneeco.2.e13718</dri:recordIdentifier>
<dri:dateOfCollection/>
<dri:mdFormat/>
<dri:mdFormatInterpretation/>
<dri:repositoryId/>
<dr:objectIdentifier/>
<dr:dateOfCollection>2020-03-23T00:20:51.392Z</dr:dateOfCollection>
<dr:dateOfTransformation>2020-03-23T00:26:59.078Z</dr:dateOfTransformation>
<oaf:datasourceprefix>pensoft_____</oaf:datasourceprefix>
</header>
<metadata xmlns="http://namespace.openaire.eu/">
<dc:title>Ecosystem Service capacity is higher in areas of multiple designation types</dc:title>
<dc:creator>Nikolaidou,Charitini</dc:creator>
<dc:creator nameIdentifier="0000-0001-6651-1178" nameIdentifierScheme="ORCID">Votsi,Nefta</dc:creator>
<dc:creator>Sgardelis,Steanos</dc:creator>
<dc:creator>Halley,John</dc:creator>
<dc:creator>Pantis,John</dc:creator>
<dc:creator>Tsiafouli,Maria</dc:creator>
<dc:date>2017</dc:date>
<dc:description>The implementation of the Ecosystem Service (ES) concept into practice might be a challenging task as it has to take into account previous “traditional” policies and approaches that have evaluated nature and biodiversity differently. Among them the Habitat (92/43/EC) and Bird Directives (79/409/EC), the Water Framework Directive (2000/60/EC), and the Noise Directive (2002/49/EC) have led to the evaluation/designation of areas in Europe with different criteria. In this study our goal was to understand how the ES capacity of an area is related to its designation and if areas with multiple designations have higher capacity in providing ES. We selected four catchments in Greece with a great variety of characteristics covering over 25% of the national territory. Inside the catchments we assessed the ES capacity (following the methodology of Burkhard et al. 2009) of areas designated as Natura 2000 sites, Quiet areas and Wetlands or Water bodies and found those areas that have multiple designations. Data were analyzed by GLM to reveal differences regarding the ES capacity among the different types of areas. We also investigated by PCA synergies and trade-offs among different kinds of ES and tested for correlations among landscape properties, such as elevation, aspect and slope and the ES potential. Our results show that areas with different types or multiple designations have a different capacity in providing ES. Areas of one designation type (Protected or Quiet Areas) had in general intermediate scores in most ES but scores were higher compared to areas with no designation, which displayed stronger capacity in provisioning services. Among Protected Areas and Quiet Areas the latter scored better in general. Areas that combined both designation types (Protected and Quiet Areas) showed the highest capacity in 13 out of 29 ES, that were mostly linked with natural and forest ecosystems. We found significant synergies among most regulating, supporting and cultural ES which in turn display trade-offs with provisioning services. The different ES are spatially related and display strong correlation with landscape properties, such as elevation and slope. We suggest that the designation status of an area can be used as an alternative tool for environmental policy, indicating the capacity for ES provision. Multiple designations of areas can be used as proxies for locating ES “hotspots”. This integration of “traditional” evaluation and designation and the “newer” ES concept forms a time- and cost-effective way to be adopted by stakeholders and policy-makers in order to start complying with new standards and demands for nature conservation and environmental management.</dc:description>
<dc:format>text/html</dc:format>
<dc:identifier>https://doi.org/10.3897/oneeco.2.e13718</dc:identifier>
<dc:identifier>https://oneecosystem.pensoft.net/article/13718/</dc:identifier>
<dc:language>eng</dc:language>
<dc:publisher>Pensoft Publishers</dc:publisher>
<dc:relation>info:eu-repo/semantics/altIdentifier/eissn/2367-8194</dc:relation>
<dc:relation>info:eu-repo/grantAgreement/EC/FP7/226852</dc:relation>
<dc:source>One Ecosystem 2: e13718</dc:source>
<dc:source>One Ecosystem 2: e13718</dc:source>
<dc:source>One Ecosystem 2: e13718</dc:source>
<dc:subject>Ecosystem Services hotspots</dc:subject>
<dc:subject>Natura 2000</dc:subject>
<dc:subject>Quiet Protected Areas</dc:subject>
<dc:subject>Biodiversity</dc:subject>
<dc:subject>Agriculture</dc:subject>
<dc:subject>Elevation</dc:subject>
<dc:subject>Slope</dc:subject>
<dc:subject>Ecosystem Service trade-offs and synergies</dc:subject>
<dc:subject> cultural services</dc:subject>
<dc:subject>provisioning services</dc:subject>
<dc:subject>regulating services</dc:subject>
<dc:subject>supporting services</dc:subject>
<dc:type>Research Article</dc:type>
<!--<dr:CobjCategory type="publication">0001</dr:CobjCategory>-->
<dr:CobjCategory>0020</dr:CobjCategory>
<oaf:dateAccepted>2017-01-01</oaf:dateAccepted>
<oaf:projectid>corda_______::226852</oaf:projectid>
<oaf:accessrights>OPEN</oaf:accessrights>
<oaf:hostedBy id="openaire____::issn226852" name="One Ecosystem"/>
<oaf:collectedFrom
id="openaire____::45e3c7b69bcee6cc5fa945c9e183deb9" name="Pensoft"/>
<oaf:identifier identifierType="doi">10.3897/oneeco.2.e13718</oaf:identifier>
<oaf:fulltext>https://oneecosystem.pensoft.net/article/13718/</oaf:fulltext>
<oaf:journal eissn="2367-8194" issn="">One Ecosystem</oaf:journal>
<oaf:refereed>0001</oaf:refereed>
</metadata>
<about xmlns:oai="http://www.openarchives.org/OAI/2.0/">
<provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
<originDescription altered="true" harvestDate="2020-03-23T00:20:51.392Z">
<baseURL>http%3A%2F%2Fzookeys.pensoft.net%2Foai.php</baseURL>
<identifier>10.3897/oneeco.2.e13718</identifier>
<datestamp>2017-09-08</datestamp>
<metadataNamespace>http://www.openarchives.org/OAI/2.0/oai_dc/</metadataNamespace>
</originDescription>
</provenance>
<oaf:datainfo>
<oaf:inferred>false</oaf:inferred>
<oaf:deletedbyinference>false</oaf:deletedbyinference>
<oaf:trust>0.9</oaf:trust>
<oaf:inferenceprovenance/>
<oaf:provenanceaction classid="sysimport:crosswalk:repository"
classname="sysimport:crosswalk:repository"
schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
</oaf:datainfo>
</about>
</record>

View File

@ -96,8 +96,8 @@
},
{
"field": "trust",
"type": "string",
"value": "0.9"
"type": "double",
"value": 0.9
},
{
"field": "inferenceprovenance",

View File

@ -41,8 +41,8 @@
},
{
"field": "trust",
"type": "string",
"value": "0.9"
"type": "double",
"value": 0.9
},
{
"field": "inferenceprovenance",

View File

@ -86,8 +86,8 @@
},
{
"field": "trust",
"type": "string",
"value": "0.9"
"type": "double",
"value": 0.9
},
{
"field": "inferenceprovenance",

View File

@ -1,51 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<oai:record xmlns:oai="http://www.openarchives.org/OAI/2.0/"
xmlns:oaf="http://namespace.openaire.eu/oaf"
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
xmlns:dc="http://purl.org/dc/elements/1.1/">
<oai:header>
<dri:repositoryId>aaadf8b3-01a8-4cc2-9964-63cfb19df3b4_UmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZXMvUmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZVR5cGU=</dri:repositoryId>
<dri:recordIdentifier>oai:pangaea.de:doi:10.1594/PANGAEA.432865</dri:recordIdentifier>
<dri:datasourceprefix>r3d100010134</dri:datasourceprefix>
<dri:objIdentifier>r3d100010134::00002f60593fd1f758fb838fafb46795</dri:objIdentifier>
<dri:dateOfCollection>2020-02-18T03:05:02.534Z</dri:dateOfCollection>
<oaf:datasourceprefix/>
<identifier>oai:pangaea.de:doi:10.1594/PANGAEA.432865</identifier>
<setSpec>citable topicOceans</setSpec>
xmlns="http://namespace.openaire.eu/">
<oai:header xmlns="">
<dri:objIdentifier xmlns:dri="http://www.driver-repository.eu/namespace/dri">r3d100010464::0002882a9d38c4f4612e7666ad768ccd</dri:objIdentifier>
<dri:recordIdentifier xmlns:dri="http://www.driver-repository.eu/namespace/dri">https://research.jcu.edu.au/researchdata/published/detail/9079e05370d830eb8d416c77c0b761ce::url</dri:recordIdentifier>
<dri:dateOfCollection xmlns:dri="http://www.driver-repository.eu/namespace/dri">2020-11-02T16:14:07.831Z</dri:dateOfCollection>
<dri:repositoryId xmlns:dri="http://www.driver-repository.eu/namespace/dri">ands_UmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZXMvUmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZVR5cGU=</dri:repositoryId>
<dri:datasourceprefix xmlns:dri="http://www.driver-repository.eu/namespace/dri">r3d100010464</dri:datasourceprefix>
</oai:header>
<oai:metadata>
<resource xmlns="http://datacite.org/schema/kernel-3">
<identifier identifierType="doi">10.1594/pangaea.432865</identifier>
<titles xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<title>Daily sea level from coastal tide gauge station Woods_Hole in 1978 (Research quality database)</title>
<metadata xmlns="">
<resource xmlns="http://datacite.org/schema/kernel-3"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://datacite.org/schema/kernel-3 http://schema.datacite.org/meta/kernel-3/metadata.xsd">
<identifier xmlns="" identifierType="url">https://research.jcu.edu.au/researchdata/published/detail/9079e05370d830eb8d416c77c0b761ce</identifier>
<titles xmlns="">
<title>Vertebrate monitoring in the Australian Wet Tropics rainforest at CU6A1 (145.30367623, -16.57767628, 600.0m above MSL) collected by Reptile Surveys</title>
</titles>
<publisher xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">PANGAEA - Data Publisher for Earth &amp; Environmental Science</publisher>
<publicationYear xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">2006</publicationYear>
<dates xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<date dateType="Collected">1978-01-01T12:00:00/1978-12-31T12:00:00</date>
<publisher xmlns="">James Cook University</publisher>
<dates xmlns="">
<date dateType="Collected">2013-05-07</date>
</dates>
<creators xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<creator>
<creatorName>WOCE Sea Level, WSL</creatorName>
</creator>
</creators>
<subjects xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<subject subjectScheme="Parameter">DATE/TIME</subject>
<subject subjectScheme="Parameter">Sea level</subject>
<subject subjectScheme="Method">Tide gauge station</subject>
<subject subjectScheme="Campaign">SeaLevel</subject>
<subject subjectScheme="Project">World Ocean Circulation Experiment (WOCE)</subject>
</subjects>
<resourceType resourceTypeGeneral="Dataset"/>
<relatedIdentifiers>
<relatedIdentifier relatedIdentifierType="URL" relationType="isDocumentedBy"
inverseRelationType="documents">http://store.pangaea.de/Projects/WOCE/SeaLevel_rqds/Woods_Hole.txt</relatedIdentifier>
<creators xmlns=""/>
<resourceType xmlns="" resourceTypeGeneral="Dataset">Dataset</resourceType>
<relatedIdentifiers xmlns="">
<relatedIdentifier entityType="publication" inverseRelationType="related"
relatedIdentifierType="dnet"
relationType="IsRelatedTo">r3d100010464::57793c5aa995172db237d9da17353f8b</relatedIdentifier>
</relatedIdentifiers>
</resource>
</oai:metadata>
<oaf:about>
</metadata>
<oaf:about xmlns:oaf="http://namespace.dnet.eu/oaf" xmlns="">
<oaf:datainfo>
<oaf:collectedFrom completionStatus="complete" id="dli_________::r3d100010134" name="Pangaea"/>
<oaf:collectedFrom completionStatus="complete" id="dli_________::r3d100010464"
name="Australian National Data Service"/>
<oaf:completionStatus>complete</oaf:completionStatus>
<oaf:provisionMode>collected</oaf:provisionMode>
</oaf:datainfo>

View File

@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<oai:record xmlns:oai="http://www.openarchives.org/OAI/2.0/"
xmlns="http://namespace.openaire.eu/">
<oai:header xmlns="">
<dri:objIdentifier xmlns:dri="http://www.driver-repository.eu/namespace/dri">r3d100010464::57793c5aa995172db237d9da17353f8b</dri:objIdentifier>
<dri:recordIdentifier xmlns:dri="http://www.driver-repository.eu/namespace/dri">10.1111/j.1365-2486.2005.00995.x::doi</dri:recordIdentifier>
<dri:dateOfCollection xmlns:dri="http://www.driver-repository.eu/namespace/dri">2020-11-02T16:14:07.831Z</dri:dateOfCollection>
<dri:repositoryId xmlns:dri="http://www.driver-repository.eu/namespace/dri">ands_UmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZXMvUmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZVR5cGU=</dri:repositoryId>
<dri:datasourceprefix xmlns:dri="http://www.driver-repository.eu/namespace/dri">r3d100010464</dri:datasourceprefix>
</oai:header>
<metadata xmlns="">
<oaf:pid xmlns:oaf="http://namespace.dnet.eu/oaf" type="doi">10.1111/j.1365-2486.2005.00995.x</oaf:pid>
<dc:identifier xmlns:dc="http://purl.org/dc/elements/1.1/">10.1111/j.1365-2486.2005.00995.x</dc:identifier>
<dc:title xmlns:dc="http://purl.org/dc/elements/1.1/">Potential decoupling of trends in distribution area and population size of species with climate change.</dc:title>
<dc:type xmlns:dc="http://purl.org/dc/elements/1.1/">publication</dc:type>
</metadata>
<oaf:about xmlns:oaf="http://namespace.dnet.eu/oaf" xmlns="">
<oaf:datainfo>
<oaf:collectedFrom completionStatus="complete" id="dli_________::r3d100010464"
name="Australian National Data Service"/>
<oaf:completionStatus>complete</oaf:completionStatus>
<oaf:provisionMode>collected</oaf:provisionMode>
</oaf:datainfo>
</oaf:about>
</oai:record>

View File

@ -97,12 +97,17 @@ public class Scholix implements Serializable {
}
private List<ScholixEntityId> mergeScholixEntityId(final List<ScholixEntityId> a, final List<ScholixEntityId> b) {
final List<ScholixEntityId> m = new ArrayList<>(a);
final List<ScholixEntityId> m = a != null ? new ArrayList<>(a) : new ArrayList<>();
if (b != null)
b.forEach(s -> {
int tt = (int) m.stream().filter(t -> t.getName().equalsIgnoreCase(s.getName())).count();
if (tt == 0) {
m.add(s);
if (s != null) {
int tt = (int) m
.stream()
.filter(t -> t != null && t.getName() != null && t.getName().equalsIgnoreCase(s.getName()))
.count();
if (tt == 0) {
m.add(s);
}
}
});
return m;
@ -110,7 +115,7 @@ public class Scholix implements Serializable {
private List<ScholixIdentifier> mergeScholixIdnetifier(final List<ScholixIdentifier> a,
final List<ScholixIdentifier> b) {
final List<ScholixIdentifier> m = new ArrayList<>(a);
final List<ScholixIdentifier> m = a != null ? new ArrayList<>(a) : new ArrayList<>();
if (b != null)
b.forEach(s -> {
int tt = (int) m.stream().filter(t -> t.getIdentifier().equalsIgnoreCase(s.getIdentifier())).count();
@ -123,7 +128,7 @@ public class Scholix implements Serializable {
private List<ScholixCollectedFrom> mergeScholixCollectedFrom(final List<ScholixCollectedFrom> a,
final List<ScholixCollectedFrom> b) {
final List<ScholixCollectedFrom> m = new ArrayList<>(a);
final List<ScholixCollectedFrom> m = a != null ? new ArrayList<>(a) : new ArrayList<>();
if (b != null)
b.forEach(s -> {
int tt = (int) m
@ -139,14 +144,15 @@ public class Scholix implements Serializable {
private ScholixRelationship mergeRelationships(final ScholixRelationship a, final ScholixRelationship b) {
ScholixRelationship result = new ScholixRelationship();
result.setName(StringUtils.isEmpty(a.getName()) ? b.getName() : a.getName());
result.setInverse(StringUtils.isEmpty(a.getInverse()) ? b.getInverse() : a.getInverse());
result.setSchema(StringUtils.isEmpty(a.getSchema()) ? b.getSchema() : a.getSchema());
result.setName(a == null || StringUtils.isEmpty(a.getName()) ? b.getName() : a.getName());
result.setInverse(a == null || StringUtils.isEmpty(a.getInverse()) ? b.getInverse() : a.getInverse());
result.setSchema(a == null || StringUtils.isEmpty(a.getSchema()) ? b.getSchema() : a.getSchema());
return result;
}
private ScholixResource mergeResource(final ScholixResource a, final ScholixResource b) {
if (a == null)
return b;
final ScholixResource result = new ScholixResource();
result.setCollectedFrom(mergeScholixCollectedFrom(a.getCollectedFrom(), b.getCollectedFrom()));
result.setCreator(mergeScholixEntityId(a.getCreator(), b.getCreator()));

View File

@ -7,4 +7,8 @@
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -1,9 +1,17 @@
<workflow-app name="Index graph to ElasticSearch" xmlns="uri:oozie:workflow:0.5">
<workflow-app name="Materialize and Index graph to ElasticSearch" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>workingDirPath</name>
<description>the source path</description>
</property>
<property>
<name>index</name>
<description>the index name</description>
</property>
<property>
<name>esCluster</name>
<description>the Index cluster</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
@ -12,39 +20,43 @@
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>index</name>
<description>index name</description>
</property>
<property>
<name>indexHost</name>
<description>index host name</description>
</property>
</parameters>
<start to="indexSummary"/>
<start to="DropAndCreateIndex"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="DropAndCreateIndex">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.provision.DropAndCreateESIndex</main-class>
<arg>-i</arg><arg>${index}</arg>
<arg>-c</arg><arg>${esCluster}</arg>
</java>
<ok to="indexSummary"/>
<error to="Kill"/>
</action>
<action name="indexSummary">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>index Summary</name>
<name>index summary</name>
<class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="32" </spark-opts>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8" </spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingDirPath}/summary</arg>
<arg>--sourcePath</arg><arg>${workingDirPath}/summary_json</arg>
<arg>--index</arg><arg>${index}_object</arg>
<arg>--esHost</arg><arg>${indexHost}</arg>
<arg>--idPath</arg><arg>id</arg>
<arg>--type</arg><arg>summary</arg>
<arg>--cluster</arg><arg>${esCluster}</arg>
</spark>
<ok to="indexScholix"/>
<error to="Kill"/>
@ -63,9 +75,8 @@
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingDirPath}/scholix_json</arg>
<arg>--index</arg><arg>${index}_scholix</arg>
<arg>--esHost</arg><arg>${indexHost}</arg>
<arg>--idPath</arg><arg>identifier</arg>
<arg>--type</arg><arg>scholix</arg>
<arg>--cluster</arg><arg>${esCluster}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>

View File

@ -112,59 +112,5 @@
<error to="Kill"/>
</action>
<action name="DropAndCreateIndex">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.provision.DropAndCreateESIndex</main-class>
<arg>-i</arg><arg>${index}</arg>
<arg>-c</arg><arg>${esCluster}</arg>
</java>
<ok to="indexSummary"/>
<error to="Kill"/>
</action>
<action name="indexSummary">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>index summary</name>
<class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8" </spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingDirPath}/summary_json</arg>
<arg>--index</arg><arg>${index}_object</arg>
<arg>--idPath</arg><arg>id</arg>
<arg>--cluster</arg><arg>${esCluster}</arg>
</spark>
<ok to="indexScholix"/>
<error to="Kill"/>
</action>
<action name="indexScholix">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>index scholix</name>
<class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8" </spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingDirPath}/scholix_json</arg>
<arg>--index</arg><arg>${index}_scholix</arg>
<arg>--idPath</arg><arg>identifier</arg>
<arg>--cluster</arg><arg>${esCluster}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,21 @@
Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The
operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, and
all the possible relationships (similarity links produced by the Dedup process are excluded).
The operation is implemented by sequentially joining one entity type at time (E) with the relationships (R), and
again by E, finally grouped by E.id;
The workflow is organized in different parts aimed to to reduce the complexity of the operation
1) PrepareRelationsJob: only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference ==
false), each entity can be linked at most to 100 other objects
2) CreateRelatedEntitiesJob: (phase 1): prepare tuples [relation - target entity] (R - T): for each entity type
E_i map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information join (R.target =
T_i.id) save the tuples (R_i, T_i) (phase 2): create the union of all the entity types E, hash by id read the tuples
(R, T), hash by R.source join E.id = (R, T).source, where E becomes the Source Entity S save the tuples (S, R, T)
3) AdjacencyListBuilderJob: given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the
result as JoinedEntity
4) XmlConverterJob: convert the JoinedEntities as XML records

View File

@ -1,109 +0,0 @@
package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.*;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Seq;
/**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The
* operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, and
* all the possible relationships (similarity links produced by the Dedup process are excluded).
* <p>
* The operation is implemented by sequentially joining one entity type at time (E) with the relationships (R), and
* again by E, finally grouped by E.id;
* <p>
* The workflow is organized in different parts aimed to to reduce the complexity of the operation 1)
* PrepareRelationsJob: only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference ==
* false), each entity can be linked at most to 100 other objects
* <p>
* 2) JoinRelationEntityByTargetJob: (phase 1): prepare tuples [relation - target entity] (R - T): for each entity type
* E_i map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information join (R.target =
* T_i.id) save the tuples (R_i, T_i) (phase 2): create the union of all the entity types E, hash by id read the tuples
* (R, T), hash by R.source join E.id = (R, T).source, where E becomes the Source Entity S save the tuples (S, R, T)
* <p>
* 3) AdjacencyListBuilderJob: given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the
* result as JoinedEntity
* <p>
* 4) XmlConverterJob: convert the JoinedEntities as XML records
*/
public class AdjacencyListBuilderJob {
private static final Logger log = LoggerFactory.getLogger(AdjacencyListBuilderJob.class);
public static final int MAX_LINKS = 100;
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
AdjacencyListBuilderJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json")));
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ProvisionModelSupport.getModelClasses());
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
createAdjacencyListsKryo(spark, inputPath, outputPath);
});
}
private static void createAdjacencyListsKryo(
SparkSession spark, String inputPath, String outputPath) {
log.info("Reading joined entities from: {}", inputPath);
final List<String> paths = HdfsSupport
.listFiles(inputPath, spark.sparkContext().hadoopConfiguration());
log.info("Found paths: {}", String.join(",", paths));
}
private static Seq<String> toSeq(List<String> list) {
return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq();
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}

View File

@ -31,26 +31,9 @@ import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
/**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The
* operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, and
* all the possible relationships (similarity links produced by the Dedup process are excluded).
* <p>
* The operation is implemented by sequentially joining one entity type at time (E) with the relationships (R), and
* again by E, finally grouped by E.id;
* <p>
* The workflow is organized in different parts aimed to to reduce the complexity of the operation 1)
* PrepareRelationsJob: only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference ==
* false), each entity can be linked at most to 100 other objects
* <p>
* 2) JoinRelationEntityByTargetJob: (phase 1): prepare tuples [relation - target entity] (R - T): for each entity type
* E_i map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information join (R.target =
* T_i.id) save the tuples (R_i, T_i) (phase 2): create the union of all the entity types E, hash by id read the tuples
* (R, T), hash by R.source join E.id = (R, T).source, where E becomes the Source Entity S save the tuples (S, R, T)
* <p>
* 3) AdjacencyListBuilderJob: given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the
* result as JoinedEntity
* <p>
* 4) XmlConverterJob: convert the JoinedEntities as XML records
* CreateRelatedEntitiesJob: (phase 1): prepare tuples [relation - target entity] (R - T): for each entity type
* E_i map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information join
* (R.target = T_i.id) save the tuples (R_i, T_i)
*/
public class CreateRelatedEntitiesJob_phase1 {
@ -109,7 +92,6 @@ public class CreateRelatedEntitiesJob_phase1 {
String outputPath) {
Dataset<Tuple2<String, Relation>> relsByTarget = readPathRelation(spark, inputRelationsPath)
.filter("dataInfo.deletedbyinference == false")
.map(
(MapFunction<Relation, Tuple2<String, Relation>>) r -> new Tuple2<>(r.getTarget(),
r),

View File

@ -34,26 +34,8 @@ import scala.collection.JavaConverters;
import scala.collection.Seq;
/**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The
* operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, and
* all the possible relationships (similarity links produced by the Dedup process are excluded).
* <p>
* The operation is implemented by sequentially joining one entity type at time (E) with the relationships (R), and
* again by E, finally grouped by E.id;
* <p>
* The workflow is organized in different parts aimed to to reduce the complexity of the operation 1)
* PrepareRelationsJob: only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference ==
* false), each entity can be linked at most to 100 other objects
* <p>
* 2) JoinRelationEntityByTargetJob: (phase 1): prepare tuples [relation - target entity] (R - T): for each entity type
* E_i map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information join (R.target =
* T_i.id) save the tuples (R_i, T_i) (phase 2): create the union of all the entity types E, hash by id read the tuples
* CreateRelatedEntitiesJob (phase 2): create the union of all the entity types E, hash by id read the tuples
* (R, T), hash by R.source join E.id = (R, T).source, where E becomes the Source Entity S save the tuples (S, R, T)
* <p>
* 3) AdjacencyListBuilderJob: given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the
* result as JoinedEntity
* <p>
* 4) XmlConverterJob: convert the JoinedEntities as XML records
*/
public class CreateRelatedEntitiesJob_phase2 {
@ -123,7 +105,7 @@ public class CreateRelatedEntitiesJob_phase2 {
TypedColumn<JoinedEntity, JoinedEntity> aggregator = new AdjacencyListAggregator().toColumn();
entities
.joinWith(relatedEntities, entities.col("_1").equalTo(relatedEntities.col("_1")), "left_outer")
.joinWith(relatedEntities, entities.col("_1").equalTo(relatedEntities.col("_1")), "left")
.map((MapFunction<Tuple2<Tuple2<String, E>, Tuple2<String, RelatedEntityWrapper>>, JoinedEntity>) value -> {
JoinedEntity je = new JoinedEntity(value._1()._2());
Optional
@ -132,7 +114,6 @@ public class CreateRelatedEntitiesJob_phase2 {
.ifPresent(r -> je.getLinks().add(r));
return je;
}, Encoders.kryo(JoinedEntity.class))
.filter(filterEmptyEntityFn())
.groupByKey(
(MapFunction<JoinedEntity, String>) value -> value.getEntity().getId(),
Encoders.STRING())
@ -140,7 +121,6 @@ public class CreateRelatedEntitiesJob_phase2 {
.map(
(MapFunction<Tuple2<String, JoinedEntity>, JoinedEntity>) value -> value._2(),
Encoders.kryo(JoinedEntity.class))
.filter(filterEmptyEntityFn())
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);

View File

@ -3,8 +3,10 @@ package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.*;
import java.util.function.Supplier;
import java.util.HashSet;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
@ -15,8 +17,10 @@ import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -24,7 +28,6 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
@ -36,26 +39,8 @@ import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;
/**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The
* operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, and
* all the possible relationships (similarity links produced by the Dedup process are excluded).
* <p>
* The operation is implemented by sequentially joining one entity type at time (E) with the relationships (R), and
* again by E, finally grouped by E.id;
* <p>
* The workflow is organized in different parts aimed to to reduce the complexity of the operation 1)
* PrepareRelationsJob: only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference ==
* false), each entity can be linked at most to 100 other objects
* <p>
* 2) JoinRelationEntityByTargetJob: (phase 1): prepare tuples [relation - target entity] (R - T): for each entity type
* E_i map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information join (R.target =
* T_i.id) save the tuples (R_i, T_i) (phase 2): create the union of all the entity types E, hash by id read the tuples
* (R, T), hash by R.source join E.id = (R, T).source, where E becomes the Source Entity S save the tuples (S, R, T)
* <p>
* 3) AdjacencyListBuilderJob: given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the
* result as JoinedEntity
* <p>
* 4) XmlConverterJob: convert the JoinedEntities as XML records
* PrepareRelationsJob prunes the relationships: only consider relationships that are not virtually deleted
* ($.dataInfo.deletedbyinference == false), each entity can be linked at most to 100 other objects
*/
public class PrepareRelationsJob {

View File

@ -2,12 +2,11 @@
package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
@ -28,39 +27,19 @@ import com.google.common.collect.Maps;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.*;
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Seq;
/**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The
* operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, and
* all the possible relationships (similarity links produced by the Dedup process are excluded).
* <p>
* The workflow is organized in different parts aimed to to reduce the complexity of the operation 1)
* PrepareRelationsJob: only consider relationships that are not virtually deleted ($.dataInfo.deletedbyinference ==
* false), each entity can be linked at most to 100 other objects
* <p>
* 2) JoinRelationEntityByTargetJob: (phase 1): prepare tuples [relation - target entity] (R - T): for each entity type
* E_i map E_i as RelatedEntity T_i to simplify the model and extracting only the necessary information join (R.target =
* T_i.id) save the tuples (R_i, T_i) (phase 2): create the union of all the entity types E, hash by id read the tuples
* (R, T), hash by R.source join E.id = (R, T).source, where E becomes the Source Entity S save the tuples (S, R, T)
* <p>
* 3) AdjacencyListBuilderJob: given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the
* result as JoinedEntity
* <p>
* 4) XmlConverterJob: convert the JoinedEntities as XML records
* XmlConverterJob converts the JoinedEntities as XML records
*/
public class XmlConverterJob {
private static final Logger log = LoggerFactory.getLogger(XmlConverterJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd";
public static void main(String[] args) throws Exception {
@ -145,10 +124,6 @@ public class XmlConverterJob {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
private static Seq<String> toSeq(List<String> list) {
return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq();
}
private static Map<String, LongAccumulator> prepareAccumulators(SparkContext sc) {
Map<String, LongAccumulator> accumulators = Maps.newHashMap();
accumulators

View File

@ -0,0 +1,32 @@
-------------------------------------------
--- Extra tables, mostly used by indicators
create table ${stats_db_name}.result_projectcount as
select r.id, count(distinct p.id) as count
from ${stats_db_name}.result r
left outer join ${stats_db_name}.result_projects rp on rp.id=r.id
left outer join ${stats_db_name}.project p on p.id=rp.project
group by r.id;
create table ${stats_db_name}.result_fundercount as
select r.id, count(distinct p.funder) as count
from ${stats_db_name}.result r
left outer join ${stats_db_name}.result_projects rp on rp.id=r.id
left outer join ${stats_db_name}.project p on p.id=rp.project
group by r.id;
create table ${stats_db_name}.project_resultcount as
with rcount as (
select p.id as pid, count(distinct r.id) as `count`, r.type as type
from ${stats_db_name}.project p
left outer join ${stats_db_name}.result_projects rp on rp.project=p.id
left outer join ${stats_db_name}.result r on r.id=rp.id
group by r.type, p.id )
select rcount.pid, sum(case when rcount.type='publication' then rcount.count else 0 end) as publications,
sum(case when rcount.type='dataset' then rcount.count else 0 end) as datasets,
sum(case when rcount.type='software' then rcount.count else 0 end) as software,
sum(case when rcount.type='other' then rcount.count else 0 end) as other
from rcount
group by rcount.pid;
create view ${stats_db_name}.rndexpenditure as select * from stats_ext.rndexpediture

View File

@ -5,8 +5,12 @@
------------------------------------------------------
-- Dropping old views
DROP VIEW IF EXISTS ${stats_db_shadow_name}.category;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.concept;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.context;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.country;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.countrygdp;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.creation_date;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.dataset;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.dataset_citations;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.dataset_classifications;
@ -16,6 +20,7 @@ DROP VIEW IF EXISTS ${stats_db_shadow_name}.dataset_languages;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.dataset_licenses;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.dataset_oids;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.dataset_pids;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.dataset_refereed;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.dataset_sources;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.dataset_topics;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.datasource;
@ -23,11 +28,15 @@ DROP VIEW IF EXISTS ${stats_db_shadow_name}.datasource_languages;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.datasource_oids;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.datasource_organizations;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.datasource_results;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.datasource_sources;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.funder;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.fundref;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.numbers_country;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.organization;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.organization_datasources;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.organization_pids;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.organization_projects;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.organization_sources;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.otherresearchproduct;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.otherresearchproduct_citations;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.otherresearchproduct_classifications;
@ -37,12 +46,15 @@ DROP VIEW IF EXISTS ${stats_db_shadow_name}.otherresearchproduct_languages;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.otherresearchproduct_licenses;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.otherresearchproduct_oids;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.otherresearchproduct_pids;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.otherresearchproduct_refereed;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.otherresearchproduct_sources;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.otherresearchproduct_topics;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.project;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.project_oids;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.project_organizations;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.project_results;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.project_resultcount;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.project_results_publication;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.publication;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.publication_citations;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.publication_classifications;
@ -52,19 +64,28 @@ DROP VIEW IF EXISTS ${stats_db_shadow_name}.publication_languages;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.publication_licenses;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.publication_oids;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.publication_pids;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.publication_refereed;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.publication_sources;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.publication_topics;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.result;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.result_affiliated_country;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.result_citations;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.result_classifications;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.result_concepts;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.result_datasources;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.result_deposited_country;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.result_fundercount;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.result_gold;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.result_greenoa;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.result_languages;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.result_licenses;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.result_oids;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.result_organization;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.result_peerreviewed;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.result_pids;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.result_projectcount;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.result_projects;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.result_refereed;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.result_sources;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.result_topics;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.rndexpediture;
@ -78,6 +99,7 @@ DROP VIEW IF EXISTS ${stats_db_shadow_name}.software_languages;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.software_licenses;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.software_oids;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.software_pids;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.software_refereed;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.software_sources;
DROP VIEW IF EXISTS ${stats_db_shadow_name}.software_topics;
@ -86,8 +108,12 @@ DROP VIEW IF EXISTS ${stats_db_shadow_name}.software_topics;
CREATE database IF NOT EXISTS ${stats_db_shadow_name};
-- Creating new views
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.category AS SELECT * FROM ${stats_db_name}.category;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.concept AS SELECT * FROM ${stats_db_name}.concept;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.context AS SELECT * FROM ${stats_db_name}.context;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.country AS SELECT * FROM ${stats_db_name}.country;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.countrygdp AS SELECT * FROM ${stats_db_name}.countrygdp;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.creation_date AS SELECT * FROM ${stats_db_name}.creation_date;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.dataset AS SELECT * FROM ${stats_db_name}.dataset;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.dataset_citations AS SELECT * FROM ${stats_db_name}.dataset_citations;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.dataset_classifications AS SELECT * FROM ${stats_db_name}.dataset_classifications;
@ -97,6 +123,7 @@ CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.dataset_languages AS SELECT *
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.dataset_licenses AS SELECT * FROM ${stats_db_name}.dataset_licenses;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.dataset_oids AS SELECT * FROM ${stats_db_name}.dataset_oids;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.dataset_pids AS SELECT * FROM ${stats_db_name}.dataset_pids;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.dataset_refereed AS SELECT * FROM ${stats_db_name}.dataset_refereed;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.dataset_sources AS SELECT * FROM ${stats_db_name}.dataset_sources;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.dataset_topics AS SELECT * FROM ${stats_db_name}.dataset_topics;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.datasource AS SELECT * FROM ${stats_db_name}.datasource;
@ -104,11 +131,15 @@ CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.datasource_languages AS SELECT
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.datasource_oids AS SELECT * FROM ${stats_db_name}.datasource_oids;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.datasource_organizations AS SELECT * FROM ${stats_db_name}.datasource_organizations;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.datasource_results AS SELECT * FROM ${stats_db_name}.datasource_results;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.datasource_sources AS SELECT * FROM ${stats_db_name}.datasource_sources;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.funder AS SELECT * FROM ${stats_db_name}.funder;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.fundref AS SELECT * FROM ${stats_db_name}.fundref;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.numbers_country AS SELECT * FROM ${stats_db_name}.numbers_country;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.organization AS SELECT * FROM ${stats_db_name}.organization;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.organization_datasources AS SELECT * FROM ${stats_db_name}.organization_datasources;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.organization_pids AS SELECT * FROM ${stats_db_name}.organization_pids;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.organization_projects AS SELECT * FROM ${stats_db_name}.organization_projects;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.organization_sources AS SELECT * FROM ${stats_db_name}.organization_sources;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.otherresearchproduct AS SELECT * FROM ${stats_db_name}.otherresearchproduct;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.otherresearchproduct_citations AS SELECT * FROM ${stats_db_name}.otherresearchproduct_citations;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.otherresearchproduct_classifications AS SELECT * FROM ${stats_db_name}.otherresearchproduct_classifications;
@ -118,12 +149,15 @@ CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.otherresearchproduct_languages
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.otherresearchproduct_licenses AS SELECT * FROM ${stats_db_name}.otherresearchproduct_licenses;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.otherresearchproduct_oids AS SELECT * FROM ${stats_db_name}.otherresearchproduct_oids;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.otherresearchproduct_pids AS SELECT * FROM ${stats_db_name}.otherresearchproduct_pids;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.otherresearchproduct_refereed AS SELECT * FROM ${stats_db_name}.otherresearchproduct_refereed;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.otherresearchproduct_sources AS SELECT * FROM ${stats_db_name}.otherresearchproduct_sources;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.otherresearchproduct_topics AS SELECT * FROM ${stats_db_name}.otherresearchproduct_topics;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.project AS SELECT * FROM ${stats_db_name}.project;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.project_oids AS SELECT * FROM ${stats_db_name}.project_oids;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.project_organizations AS SELECT * FROM ${stats_db_name}.project_organizations;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.project_results AS SELECT * FROM ${stats_db_name}.project_results;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.project_resultcount AS SELECT * FROM ${stats_db_name}.project_resultcount;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.project_results_publication AS SELECT * FROM ${stats_db_name}.project_results_publication;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.publication AS SELECT * FROM ${stats_db_name}.publication;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.publication_citations AS SELECT * FROM ${stats_db_name}.publication_citations;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.publication_classifications AS SELECT * FROM ${stats_db_name}.publication_classifications;
@ -133,19 +167,28 @@ CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.publication_languages AS SELEC
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.publication_licenses AS SELECT * FROM ${stats_db_name}.publication_licenses;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.publication_oids AS SELECT * FROM ${stats_db_name}.publication_oids;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.publication_pids AS SELECT * FROM ${stats_db_name}.publication_pids;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.publication_refereed AS SELECT * FROM ${stats_db_name}.publication_refereed;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.publication_sources AS SELECT * FROM ${stats_db_name}.publication_sources;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.publication_topics AS SELECT * FROM ${stats_db_name}.publication_topics;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.result AS SELECT * FROM ${stats_db_name}.result;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.result_affiliated_country AS SELECT * FROM ${stats_db_name}.result_affiliated_country;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.result_citations AS SELECT * FROM ${stats_db_name}.result_citations;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.result_classifications AS SELECT * FROM ${stats_db_name}.result_classifications;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.result_concepts AS SELECT * FROM ${stats_db_name}.result_concepts;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.result_datasources AS SELECT * FROM ${stats_db_name}.result_datasources;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.result_deposited_country AS SELECT * FROM ${stats_db_name}.result_deposited_country;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.result_fundercount AS SELECT * FROM ${stats_db_name}.result_fundercount;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.result_gold AS SELECT * FROM ${stats_db_name}.result_gold;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.result_greenoa AS SELECT * FROM ${stats_db_name}.result_greenoa;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.result_languages AS SELECT * FROM ${stats_db_name}.result_languages;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.result_licenses AS SELECT * FROM ${stats_db_name}.result_licenses;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.result_oids AS SELECT * FROM ${stats_db_name}.result_oids;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.result_organization AS SELECT * FROM ${stats_db_name}.result_organization;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.result_peerreviewed AS SELECT * FROM ${stats_db_name}.result_peerreviewed;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.result_pids AS SELECT * FROM ${stats_db_name}.result_pids;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.result_projectcount AS SELECT * FROM ${stats_db_name}.result_projectcount;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.result_projects AS SELECT * FROM ${stats_db_name}.result_projects;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.result_refereed AS SELECT * FROM ${stats_db_name}.result_refereed;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.result_sources AS SELECT * FROM ${stats_db_name}.result_sources;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.result_topics AS SELECT * FROM ${stats_db_name}.result_topics;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.rndexpediture AS SELECT * FROM ${stats_db_name}.rndexpediture;
@ -159,5 +202,6 @@ CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.software_languages AS SELECT *
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.software_licenses AS SELECT * FROM ${stats_db_name}.software_licenses;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.software_oids AS SELECT * FROM ${stats_db_name}.software_oids;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.software_pids AS SELECT * FROM ${stats_db_name}.software_pids;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.software_refereed AS SELECT * FROM ${stats_db_name}.software_refereed;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.software_sources AS SELECT * FROM ${stats_db_name}.software_sources;
CREATE VIEW IF NOT EXISTS ${stats_db_shadow_name}.software_topics AS SELECT * FROM ${stats_db_name}.software_topics;

View File

@ -5,77 +5,4 @@
------------------------------------------------------
------------------------------------------------------
COMPUTE STATS country;
COMPUTE STATS countrygdp;
COMPUTE STATS dataset;
COMPUTE STATS dataset_citations;
COMPUTE STATS dataset_classifications;
COMPUTE STATS dataset_concepts;
COMPUTE STATS dataset_datasources;
COMPUTE STATS dataset_languages;
COMPUTE STATS dataset_oids;
COMPUTE STATS dataset_pids;
COMPUTE STATS dataset_sources;
COMPUTE STATS dataset_topics;
COMPUTE STATS datasource;
COMPUTE STATS datasource_languages;
COMPUTE STATS datasource_oids;
COMPUTE STATS datasource_organizations;
COMPUTE STATS datasource_results;
COMPUTE STATS fundref;
COMPUTE STATS numbers_country;
COMPUTE STATS organization;
COMPUTE STATS organization_datasources;
COMPUTE STATS organization_projects;
COMPUTE STATS otherresearchproduct;
COMPUTE STATS otherresearchproduct_citations;
COMPUTE STATS otherresearchproduct_classifications;
COMPUTE STATS otherresearchproduct_concepts;
COMPUTE STATS otherresearchproduct_datasources;
COMPUTE STATS otherresearchproduct_languages;
COMPUTE STATS otherresearchproduct_licenses;
COMPUTE STATS otherresearchproduct_oids;
COMPUTE STATS otherresearchproduct_pids;
COMPUTE STATS otherresearchproduct_sources;
COMPUTE STATS otherresearchproduct_topics;
COMPUTE STATS project;
COMPUTE STATS project_oids;
COMPUTE STATS project_organizations;
COMPUTE STATS project_results;
COMPUTE STATS publication;
COMPUTE STATS publication_citations;
COMPUTE STATS publication_classifications;
COMPUTE STATS publication_concepts;
COMPUTE STATS publication_datasources;
COMPUTE STATS publication_languages;
COMPUTE STATS publication_licenses;
COMPUTE STATS publication_oids;
COMPUTE STATS publication_pids;
COMPUTE STATS publication_sources;
COMPUTE STATS publication_topics;
COMPUTE STATS result;
COMPUTE STATS result_citations;
COMPUTE STATS result_classifications;
COMPUTE STATS result_concepts;
COMPUTE STATS result_datasources;
COMPUTE STATS result_languages;
COMPUTE STATS result_licenses;
COMPUTE STATS result_oids;
COMPUTE STATS result_organization;
COMPUTE STATS result_pids;
COMPUTE STATS result_projects;
COMPUTE STATS result_sources;
COMPUTE STATS result_topics;
COMPUTE STATS rndexpediture;
COMPUTE STATS roarmap;
COMPUTE STATS software;
COMPUTE STATS software_citations;
COMPUTE STATS software_classifications;
COMPUTE STATS software_concepts;
COMPUTE STATS software_datasources;
COMPUTE STATS software_languages;
COMPUTE STATS software_licenses;
COMPUTE STATS software_oids;
COMPUTE STATS software_pids;
COMPUTE STATS software_sources;
COMPUTE STATS software_topics;
INVALIDATE METADATA ${stats_db_name};

View File

@ -0,0 +1,8 @@
------------------------------------------------------
------------------------------------------------------
-- Impala table statistics - Needed to make the tables
-- visible for impala
------------------------------------------------------
------------------------------------------------------
INVALIDATE METADATA ${stats_db_name};

View File

@ -17,19 +17,28 @@ case when size(p.description) > 0 then true else false end as abstract,
from ${openaire_db_name}.publication p
where p.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.publication_classifications AS SELECT substr(p.id, 4) as id, instancetype.classname as type from ${openaire_db_name}.publication p LATERAL VIEW explode(p.instance.instancetype) instances as instancetype;
CREATE TABLE ${stats_db_name}.publication_classifications AS SELECT substr(p.id, 4) as id, instancetype.classname as type from ${openaire_db_name}.publication p LATERAL VIEW explode(p.instance.instancetype) instances as instancetype where p.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.publication_concepts AS SELECT substr(p.id, 4) as id, contexts.context.id as concept from ${openaire_db_name}.publication p LATERAL VIEW explode(p.context) contexts as context;
CREATE TABLE ${stats_db_name}.publication_concepts AS SELECT substr(p.id, 4) as id, contexts.context.id as concept from ${openaire_db_name}.publication p LATERAL VIEW explode(p.context) contexts as context where p.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.publication_datasources as SELECT p.id, case when d.id is null then 'other' else p.datasource end as datasource FROM (SELECT substr(p.id, 4) as id, substr(instances.instance.hostedby.key, 4) as datasource from ${openaire_db_name}.publication p lateral view explode(p.instance) instances as instance) p LEFT OUTER JOIN (SELECT substr(d.id, 4) id from ${openaire_db_name}.datasource d WHERE d.datainfo.deletedbyinference=false) d on p.datasource = d.id;
CREATE TABLE ${stats_db_name}.publication_datasources as
SELECT p.id, case when d.id is null then 'other' else p.datasource end as datasource
FROM (
SELECT substr(p.id, 4) as id, substr(instances.instance.hostedby.key, 4) as datasource
from ${openaire_db_name}.publication p lateral view explode(p.instance) instances as instance
where p.datainfo.deletedbyinference=false ) p
LEFT OUTER JOIN (
SELECT substr(d.id, 4) id
from ${openaire_db_name}.datasource d
WHERE d.datainfo.deletedbyinference=false ) d on p.datasource = d.id;
CREATE TABLE ${stats_db_name}.publication_languages AS select substr(p.id, 4) as id, p.language.classname as language FROM ${openaire_db_name}.publication p;
CREATE TABLE ${stats_db_name}.publication_languages AS select substr(p.id, 4) as id, p.language.classname as language FROM ${openaire_db_name}.publication p where p.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.publication_oids AS SELECT substr(p.id, 4) AS id, oids.ids AS oid FROM ${openaire_db_name}.publication p LATERAL VIEW explode(p.originalid) oids AS ids;
CREATE TABLE ${stats_db_name}.publication_oids AS SELECT substr(p.id, 4) AS id, oids.ids AS oid FROM ${openaire_db_name}.publication p LATERAL VIEW explode(p.originalid) oids AS ids where p.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.publication_pids AS SELECT substr(p.id, 4) AS id, ppid.qualifier.classname AS type, ppid.value as pid FROM ${openaire_db_name}.publication p LATERAL VIEW explode(p.pid) pids AS ppid;
CREATE TABLE ${stats_db_name}.publication_pids AS SELECT substr(p.id, 4) AS id, ppid.qualifier.classname AS type, ppid.value as pid FROM ${openaire_db_name}.publication p LATERAL VIEW explode(p.pid) pids AS ppid where p.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.publication_topics as select substr(p.id, 4) AS id, subjects.subject.qualifier.classname AS TYPE, subjects.subject.value AS topic FROM ${openaire_db_name}.publication p LATERAL VIEW explode(p.subject) subjects AS subject;
CREATE TABLE ${stats_db_name}.publication_topics as select substr(p.id, 4) AS id, subjects.subject.qualifier.classname AS TYPE, subjects.subject.value AS topic FROM ${openaire_db_name}.publication p LATERAL VIEW explode(p.subject) subjects AS subject where p.datainfo.deletedbyinference=false;
-- Publication_citations
CREATE TABLE ${stats_db_name}.publication_citations AS SELECT substr(p.id, 4) AS id, xpath_string(citation.value, "//citation/id[@type='openaire']/@value") AS result FROM ${openaire_db_name}.publication p lateral view explode(p.extrainfo) citations AS citation WHERE xpath_string(citation.value, "//citation/id[@type='openaire']/@value") !="";
CREATE TABLE ${stats_db_name}.publication_citations AS SELECT substr(p.id, 4) AS id, xpath_string(citation.value, "//citation/id[@type='openaire']/@value") AS result FROM ${openaire_db_name}.publication p lateral view explode(p.extrainfo) citations AS citation WHERE xpath_string(citation.value, "//citation/id[@type='openaire']/@value") !="" and p.datainfo.deletedbyinference=false;

View File

@ -17,20 +17,20 @@ FROM ${openaire_db_name}.dataset d
WHERE d.datainfo.deletedbyinference=FALSE;
-- Dataset_citations
CREATE TABLE ${stats_db_name}.dataset_citations AS SELECT substr(d.id, 4) AS id, xpath_string(citation.value, "//citation/id[@type='openaire']/@value") AS result FROM ${openaire_db_name}.dataset d LATERAL VIEW explode(d.extrainfo) citations AS citation WHERE xpath_string(citation.value, "//citation/id[@type='openaire']/@value") !="";
CREATE TABLE ${stats_db_name}.dataset_citations AS SELECT substr(d.id, 4) AS id, xpath_string(citation.value, "//citation/id[@type='openaire']/@value") AS result FROM ${openaire_db_name}.dataset d LATERAL VIEW explode(d.extrainfo) citations AS citation WHERE xpath_string(citation.value, "//citation/id[@type='openaire']/@value") !="" and d.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.dataset_classifications AS SELECT substr(p.id, 4) AS id, instancetype.classname AS type FROM ${openaire_db_name}.dataset p LATERAL VIEW explode(p.instance.instancetype) instances AS instancetype;
CREATE TABLE ${stats_db_name}.dataset_classifications AS SELECT substr(p.id, 4) AS id, instancetype.classname AS type FROM ${openaire_db_name}.dataset p LATERAL VIEW explode(p.instance.instancetype) instances AS instancetype where p.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.dataset_concepts AS SELECT substr(p.id, 4) as id, contexts.context.id as concept from ${openaire_db_name}.dataset p LATERAL VIEW explode(p.context) contexts as context;
CREATE TABLE ${stats_db_name}.dataset_concepts AS SELECT substr(p.id, 4) as id, contexts.context.id as concept from ${openaire_db_name}.dataset p LATERAL VIEW explode(p.context) contexts as context where p.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.dataset_datasources AS SELECT p.id, case when d.id IS NULL THEN 'other' ELSE p.datasource END AS datasource FROM (SELECT substr(p.id, 4) as id, substr(instances.instance.hostedby.key, 4) AS datasource
FROM ${openaire_db_name}.dataset p LATERAL VIEW explode(p.instance) instances AS instance) p LEFT OUTER JOIN
FROM ${openaire_db_name}.dataset p LATERAL VIEW explode(p.instance) instances AS instance where p.datainfo.deletedbyinference=false) p LEFT OUTER JOIN
(SELECT substr(d.id, 4) id FROM ${openaire_db_name}.datasource d WHERE d.datainfo.deletedbyinference=false) d ON p.datasource = d.id;
CREATE TABLE ${stats_db_name}.dataset_languages AS SELECT substr(p.id, 4) AS id, p.language.classname AS language FROM ${openaire_db_name}.dataset p;
CREATE TABLE ${stats_db_name}.dataset_languages AS SELECT substr(p.id, 4) AS id, p.language.classname AS language FROM ${openaire_db_name}.dataset p where p.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.dataset_oids AS SELECT substr(p.id, 4) AS id, oids.ids AS oid FROM ${openaire_db_name}.dataset p LATERAL VIEW explode(p.originalid) oids AS ids;
CREATE TABLE ${stats_db_name}.dataset_oids AS SELECT substr(p.id, 4) AS id, oids.ids AS oid FROM ${openaire_db_name}.dataset p LATERAL VIEW explode(p.originalid) oids AS ids where p.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.dataset_pids AS SELECT substr(p.id, 4) AS id, ppid.qualifier.classname AS type, ppid.value AS pid FROM ${openaire_db_name}.dataset p LATERAL VIEW explode(p.pid) pids AS ppid;
CREATE TABLE ${stats_db_name}.dataset_pids AS SELECT substr(p.id, 4) AS id, ppid.qualifier.classname AS type, ppid.value AS pid FROM ${openaire_db_name}.dataset p LATERAL VIEW explode(p.pid) pids AS ppid where p.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.dataset_topics AS SELECT substr(p.id, 4) AS id, subjects.subject.qualifier.classname AS type, subjects.subject.value AS topic FROM ${openaire_db_name}.dataset p LATERAL VIEW explode(p.subject) subjects AS subject;
CREATE TABLE ${stats_db_name}.dataset_topics AS SELECT substr(p.id, 4) AS id, subjects.subject.qualifier.classname AS type, subjects.subject.value AS topic FROM ${openaire_db_name}.dataset p LATERAL VIEW explode(p.subject) subjects AS subject where p.datainfo.deletedbyinference=false;

View File

@ -17,20 +17,20 @@ from ${openaire_db_name}.software s
where s.datainfo.deletedbyinference=false;
-- Software_citations
CREATE TABLE ${stats_db_name}.software_citations AS SELECT substr(s.id, 4) as id, xpath_string(citation.value, "//citation/id[@type='openaire']/@value") AS RESULT FROM ${openaire_db_name}.software s LATERAL VIEW explode(s.extrainfo) citations as citation where xpath_string(citation.value, "//citation/id[@type='openaire']/@value") !="";
CREATE TABLE ${stats_db_name}.software_citations AS SELECT substr(s.id, 4) as id, xpath_string(citation.value, "//citation/id[@type='openaire']/@value") AS RESULT FROM ${openaire_db_name}.software s LATERAL VIEW explode(s.extrainfo) citations as citation where xpath_string(citation.value, "//citation/id[@type='openaire']/@value") !="" and s.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.software_classifications AS SELECT substr(p.id, 4) AS id, instancetype.classname AS type FROM ${openaire_db_name}.software p LATERAL VIEW explode(p.instance.instancetype) instances AS instancetype;
CREATE TABLE ${stats_db_name}.software_classifications AS SELECT substr(p.id, 4) AS id, instancetype.classname AS type FROM ${openaire_db_name}.software p LATERAL VIEW explode(p.instance.instancetype) instances AS instancetype where p.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.software_concepts AS SELECT substr(p.id, 4) AS id, contexts.context.id AS concept FROM ${openaire_db_name}.software p LATERAL VIEW explode(p.context) contexts AS context;
CREATE TABLE ${stats_db_name}.software_concepts AS SELECT substr(p.id, 4) AS id, contexts.context.id AS concept FROM ${openaire_db_name}.software p LATERAL VIEW explode(p.context) contexts AS context where p.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.software_datasources AS SELECT p.id, CASE WHEN d.id IS NULL THEN 'other' ELSE p.datasource end as datasource FROM (SELECT substr(p.id, 4) AS id, substr(instances.instance.hostedby.key, 4) AS datasource
FROM ${openaire_db_name}.software p LATERAL VIEW explode(p.instance) instances AS instance) p LEFT OUTER JOIN
FROM ${openaire_db_name}.software p LATERAL VIEW explode(p.instance) instances AS instance where p.datainfo.deletedbyinference=false) p LEFT OUTER JOIN
(SELECT substr(d.id, 4) id FROM ${openaire_db_name}.datasource d WHERE d.datainfo.deletedbyinference=false) d ON p.datasource = d.id;
CREATE TABLE ${stats_db_name}.software_languages AS select substr(p.id, 4) AS id, p.language.classname AS language FROM ${openaire_db_name}.software p;
CREATE TABLE ${stats_db_name}.software_languages AS select substr(p.id, 4) AS id, p.language.classname AS language FROM ${openaire_db_name}.software p where p.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.software_oids AS SELECT substr(p.id, 4) AS id, oids.ids AS oid FROM ${openaire_db_name}.software p LATERAL VIEW explode(p.originalid) oids AS ids;
CREATE TABLE ${stats_db_name}.software_oids AS SELECT substr(p.id, 4) AS id, oids.ids AS oid FROM ${openaire_db_name}.software p LATERAL VIEW explode(p.originalid) oids AS ids where p.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.software_pids AS SELECT substr(p.id, 4) AS id, ppid.qualifier.classname AS type, ppid.value AS pid FROM ${openaire_db_name}.software p LATERAL VIEW explode(p.pid) pids AS ppid;
CREATE TABLE ${stats_db_name}.software_pids AS SELECT substr(p.id, 4) AS id, ppid.qualifier.classname AS type, ppid.value AS pid FROM ${openaire_db_name}.software p LATERAL VIEW explode(p.pid) pids AS ppid where p.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.software_topics AS SELECT substr(p.id, 4) AS id, subjects.subject.qualifier.classname AS type, subjects.subject.value AS topic FROM ${openaire_db_name}.software p LATERAL VIEW explode(p.subject) subjects AS subject;
CREATE TABLE ${stats_db_name}.software_topics AS SELECT substr(p.id, 4) AS id, subjects.subject.qualifier.classname AS type, subjects.subject.value AS topic FROM ${openaire_db_name}.software p LATERAL VIEW explode(p.subject) subjects AS subject where p.datainfo.deletedbyinference=false;

View File

@ -17,21 +17,20 @@ FROM ${openaire_db_name}.otherresearchproduct o
WHERE o.datainfo.deletedbyinference=FALSE;
-- Otherresearchproduct_citations
CREATE TABLE ${stats_db_name}.otherresearchproduct_citations AS SELECT substr(o.id, 4) AS id, xpath_string(citation.value, "//citation/id[@type='openaire']/@value") AS RESULT FROM ${openaire_db_name}.otherresearchproduct o LATERAL VIEW explode(o.extrainfo) citations AS citation WHERE xpath_string(citation.value, "//citation/id[@type='openaire']/@value") !="";
CREATE TABLE ${stats_db_name}.otherresearchproduct_citations AS SELECT substr(o.id, 4) AS id, xpath_string(citation.value, "//citation/id[@type='openaire']/@value") AS RESULT FROM ${openaire_db_name}.otherresearchproduct o LATERAL VIEW explode(o.extrainfo) citations AS citation WHERE xpath_string(citation.value, "//citation/id[@type='openaire']/@value") !="" and o.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.otherresearchproduct_classifications AS SELECT substr(p.id, 4) AS id, instancetype.classname AS type FROM ${openaire_db_name}.otherresearchproduct p LATERAL VIEW explode(p.instance.instancetype) instances AS instancetype;
CREATE TABLE ${stats_db_name}.otherresearchproduct_concepts AS SELECT substr(p.id, 4) AS id, contexts.context.id AS concept FROM ${openaire_db_name}.otherresearchproduct p LATERAL VIEW explode(p.context) contexts AS context;
CREATE TABLE ${stats_db_name}.otherresearchproduct_classifications AS SELECT substr(p.id, 4) AS id, instancetype.classname AS type FROM ${openaire_db_name}.otherresearchproduct p LATERAL VIEW explode(p.instance.instancetype) instances AS instancetype where p.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.otherresearchproduct_concepts AS SELECT substr(p.id, 4) AS id, contexts.context.id AS concept FROM ${openaire_db_name}.otherresearchproduct p LATERAL VIEW explode(p.context) contexts AS context where p.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.otherresearchproduct_datasources AS SELECT p.id, CASE WHEN d.id IS NULL THEN 'other' ELSE p.datasource END AS datasource FROM (SELECT substr(p.id, 4) AS id, substr(instances.instance.hostedby.key, 4) AS datasource
from ${openaire_db_name}.otherresearchproduct p lateral view explode(p.instance) instances as instance) p LEFT OUTER JOIN
from ${openaire_db_name}.otherresearchproduct p lateral view explode(p.instance) instances as instance where p.datainfo.deletedbyinference=false) p LEFT OUTER JOIN
(SELECT substr(d.id, 4) id from ${openaire_db_name}.datasource d WHERE d.datainfo.deletedbyinference=false) d on p.datasource = d.id;
CREATE TABLE ${stats_db_name}.otherresearchproduct_languages AS SELECT substr(p.id, 4) AS id, p.language.classname AS language FROM ${openaire_db_name}.otherresearchproduct p;
CREATE TABLE ${stats_db_name}.otherresearchproduct_languages AS SELECT substr(p.id, 4) AS id, p.language.classname AS language FROM ${openaire_db_name}.otherresearchproduct p where p.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.otherresearchproduct_oids AS SELECT substr(p.id, 4) AS id, oids.ids AS oid FROM ${openaire_db_name}.otherresearchproduct p LATERAL VIEW explode(p.originalid) oids AS ids;
CREATE TABLE ${stats_db_name}.otherresearchproduct_oids AS SELECT substr(p.id, 4) AS id, oids.ids AS oid FROM ${openaire_db_name}.otherresearchproduct p LATERAL VIEW explode(p.originalid) oids AS ids where p.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.otherresearchproduct_pids AS SELECT substr(p.id, 4) AS id, ppid.qualifier.classname AS type, ppid.value AS pid FROM ${openaire_db_name}.otherresearchproduct p LATERAL VIEW explode(p.pid) pids AS ppid;
CREATE TABLE ${stats_db_name}.otherresearchproduct_pids AS SELECT substr(p.id, 4) AS id, ppid.qualifier.classname AS type, ppid.value AS pid FROM ${openaire_db_name}.otherresearchproduct p LATERAL VIEW explode(p.pid) pids AS ppid where p.datainfo.deletedbyinference=false;
CREATE TABLE ${stats_db_name}.otherresearchproduct_topics AS SELECT substr(p.id, 4) AS id, subjects.subject.qualifier.classname AS type, subjects.subject.value AS topic FROM ${openaire_db_name}.otherresearchproduct p LATERAL VIEW explode(p.subject) subjects AS subject;
CREATE TABLE ${stats_db_name}.otherresearchproduct_topics AS SELECT substr(p.id, 4) AS id, subjects.subject.qualifier.classname AS type, subjects.subject.value AS topic FROM ${openaire_db_name}.otherresearchproduct p LATERAL VIEW explode(p.subject) subjects AS subject where p.datainfo.deletedbyinference=false;

View File

@ -46,7 +46,7 @@
</configuration>
</global>
<start to="Step1"/>
<start to="Step18"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -237,6 +237,17 @@
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
</hive2>
<ok to="Step16_6"/>
<error to="Kill"/>
</action>
<action name="Step16_6">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<jdbc-url>${hive_jdbc_url}</jdbc-url>
<script>scripts/step16_6.sql</script>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
</hive2>
<ok to="Step17"/>
<error to="Kill"/>
</action>
@ -259,12 +270,26 @@
<exec>impala-shell.sh</exec>
<argument>${stats_db_name}</argument>
<argument>step18.sql</argument>
<argument>/user/${wf:user()}/oa/graph/stats/oozie_app/scripts/step18.sql</argument>
<argument>${wf:appPath()}/scripts/step18.sql</argument>
<file>impala-shell.sh</file>
</shell>
<ok to="Step19"/>
<error to="Kill"/>
</action>
<action name="Step19">
<shell xmlns="uri:oozie:shell-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<exec>impala-shell.sh</exec>
<argument>${stats_db_shadow_name}</argument>
<argument>step19.sql</argument>
<argument>${wf:appPath()}/scripts/step19.sql</argument>
<file>impala-shell.sh</file>
</shell>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -50,7 +50,7 @@
<repository>
<id>dnet45-releases</id>
<name>D-Net 45 releases</name>
<url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases</url>
<url>https://maven.d4science.org/nexus/content/repositories/dnet45-releases</url>
<layout>default</layout>
<snapshots>
<enabled>false</enabled>
@ -639,12 +639,12 @@
<snapshotRepository>
<id>dnet45-snapshots</id>
<name>DNet45 Snapshots</name>
<url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots</url>
<url>https://maven.d4science.org/nexus/content/repositories/dnet45-snapshots</url>
<layout>default</layout>
</snapshotRepository>
<repository>
<id>dnet45-releases</id>
<url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases</url>
<url>https://maven.d4science.org/nexus/content/repositories/dnet45-releases</url>
</repository>
</distributionManagement>
<reporting>