diff --git a/dhp-build/dhp-code-style/pom.xml b/dhp-build/dhp-code-style/pom.xml index e60e8076ef..77aa2aedbb 100644 --- a/dhp-build/dhp-code-style/pom.xml +++ b/dhp-build/dhp-code-style/pom.xml @@ -15,12 +15,12 @@ dnet45-snapshots DNet45 Snapshots - http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots + https://maven.d4science.org/nexus/content/repositories/dnet45-snapshots default dnet45-releases - http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases + https://maven.d4science.org/nexus/content/repositories/dnet45-releases diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml index 4ee706169d..3f4f11243a 100644 --- a/dhp-common/pom.xml +++ b/dhp-common/pom.xml @@ -104,11 +104,6 @@ dnet-pace-core - - eu.dnetlib.dhp - dhp-schemas - ${project.version} - diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/CleaningFunctions.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/CleaningFunctions.java index 12fbcc490f..390af6a97d 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/CleaningFunctions.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/CleaningFunctions.java @@ -1,9 +1,7 @@ -package eu.dnetlib.dhp.schema.oaf; +package eu.dnetlib.dhp.oa.graph.clean; -import java.util.LinkedHashMap; -import java.util.Objects; -import java.util.Optional; +import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; @@ -12,12 +10,19 @@ import org.apache.commons.lang3.StringUtils; import com.clearspring.analytics.util.Lists; import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.*; public class CleaningFunctions { public static final String DOI_URL_PREFIX_REGEX = "(^http(s?):\\/\\/)(((dx\\.)?doi\\.org)|(handle\\.test\\.datacite\\.org))\\/"; public static final String ORCID_PREFIX_REGEX = "^http(s?):\\/\\/orcid\\.org\\/"; - public static final String NONE = "none"; + + public static final Set PID_BLACKLIST = new HashSet<>(); + + static { + PID_BLACKLIST.add("none"); + PID_BLACKLIST.add("na"); + } public static T fixVocabularyNames(T value) { if (value instanceof Datasource) { @@ -71,7 +76,7 @@ public class CleaningFunctions { return value; } - public static T fixDefaults(T value) { + protected static T fixDefaults(T value) { if (value instanceof Datasource) { // nothing to clean here } else if (value instanceof Project) { @@ -114,7 +119,7 @@ public class CleaningFunctions { .stream() .filter(Objects::nonNull) .filter(sp -> StringUtils.isNotBlank(StringUtils.trim(sp.getValue()))) - .filter(sp -> NONE.equalsIgnoreCase(sp.getValue())) + .filter(sp -> !PID_BLACKLIST.contains(sp.getValue().trim().toLowerCase())) .filter(sp -> Objects.nonNull(sp.getQualifier())) .filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid())) .map(CleaningFunctions::normalizePidValue) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ModelHardLimits.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ModelHardLimits.java index 16fdc3760d..147bd31b27 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ModelHardLimits.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/ModelHardLimits.java @@ -3,6 +3,10 @@ package eu.dnetlib.dhp.schema.oaf; public class ModelHardLimits { + public static final String LAYOUT = "index"; + public static final String INTERPRETATION = "openaire"; + public static final String SEPARATOR = "-"; + public static final int MAX_EXTERNAL_ENTITIES = 50; public static final int MAX_AUTHORS = 200; public static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000; @@ -11,4 +15,8 @@ public class ModelHardLimits { public static final int MAX_ABSTRACT_LENGTH = 150000; public static final int MAX_INSTANCES = 10; + public static String getCollectionName(String format) { + return format + SEPARATOR + LAYOUT + SEPARATOR + INTERPRETATION; + } + } diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java index f079c55afd..301afaccef 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/OafMapperUtils.java @@ -2,7 +2,6 @@ package eu.dnetlib.dhp.schema.oaf; import static eu.dnetlib.dhp.schema.common.ModelConstants.*; -import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_ACCESS_MODES; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -13,10 +12,43 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import eu.dnetlib.dhp.schema.common.LicenseComparator; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.utils.DHPUtils; public class OafMapperUtils { + public static Oaf merge(final Oaf o1, final Oaf o2) { + if (ModelSupport.isSubClass(o1, OafEntity.class)) { + if (ModelSupport.isSubClass(o1, Result.class)) { + + return mergeResults((Result) o1, (Result) o2); + } else if (ModelSupport.isSubClass(o1, Datasource.class)) { + ((Datasource) o1).mergeFrom((Datasource) o2); + } else if (ModelSupport.isSubClass(o1, Organization.class)) { + ((Organization) o1).mergeFrom((Organization) o2); + } else if (ModelSupport.isSubClass(o1, Project.class)) { + ((Project) o1).mergeFrom((Project) o2); + } else { + throw new RuntimeException("invalid OafEntity subtype:" + o1.getClass().getCanonicalName()); + } + } else if (ModelSupport.isSubClass(o1, Relation.class)) { + ((Relation) o1).mergeFrom((Relation) o2); + } else { + throw new RuntimeException("invalid Oaf type:" + o1.getClass().getCanonicalName()); + } + return o1; + } + + public static Result mergeResults(Result r1, Result r2) { + if (new ResultTypeComparator().compare(r1, r2) < 0) { + r1.mergeFrom(r2); + return r1; + } else { + r2.mergeFrom(r1); + return r2; + } + } + public static KeyValue keyValue(final String k, final String v) { final KeyValue kv = new KeyValue(); kv.setKey(k); diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactory.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactory.java index 319cda0bf1..28e4accca7 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactory.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactory.java @@ -3,12 +3,10 @@ package eu.dnetlib.dhp.schema.oaf.utils; import java.io.Serializable; import java.util.Objects; -import java.util.Optional; -import java.util.regex.Pattern; import org.apache.commons.lang.StringUtils; -import eu.dnetlib.dhp.schema.oaf.CleaningFunctions; +import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctions; import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.utils.DHPUtils; diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java index dfbaf3a6ca..8872174a5c 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/utils/DHPUtils.java @@ -5,6 +5,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; +import java.util.List; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -15,9 +16,15 @@ import org.apache.commons.codec.binary.Hex; import com.jayway.jsonpath.JsonPath; import net.minidev.json.JSONArray; +import scala.collection.JavaConverters; +import scala.collection.Seq; public class DHPUtils { + public static Seq toSeq(List list) { + return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq(); + } + public static String md5(final String s) { try { final MessageDigest md = MessageDigest.getInstance("MD5"); diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml index f629c2101e..879c0d3490 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java index a15c2ee623..8231dd77e0 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java @@ -6,7 +6,9 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.Optional; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -20,7 +22,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; -import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -68,12 +71,12 @@ public class CleanGraphSparkJob { conf, isSparkSessionManaged, spark -> { - removeOutputDir(spark, outputPath); - fixGraphTable(spark, vocs, inputPath, entityClazz, outputPath); + HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); + cleanGraphTable(spark, vocs, inputPath, entityClazz, outputPath); }); } - private static void fixGraphTable( + private static void cleanGraphTable( SparkSession spark, VocabularyGroup vocs, String inputPath, @@ -99,13 +102,15 @@ public class CleanGraphSparkJob { return spark .read() .textFile(inputEntityPath) + .filter((FilterFunction) s -> isEntityType(s, clazz)) + .map((MapFunction) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING()) .map( (MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); } - private static void removeOutputDir(SparkSession spark, String path) { - HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + private static boolean isEntityType(final String s, final Class clazz) { + return StringUtils.substringBefore(s, "|").equals(clazz.getName()); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/GroupEntitiesAndRelationsSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/GroupEntitiesAndRelationsSparkJob.java new file mode 100644 index 0000000000..9c80528e34 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/GroupEntitiesAndRelationsSparkJob.java @@ -0,0 +1,206 @@ + +package eu.dnetlib.dhp.oa.graph.clean; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static eu.dnetlib.dhp.utils.DHPUtils.toSeq; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; +import org.apache.spark.sql.expressions.Aggregator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.Option; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.*; +import scala.Tuple2; + +/** + * Groups the graph content by entity identifier to ensure ID uniqueness + */ +public class GroupEntitiesAndRelationsSparkJob { + + private static final Logger log = LoggerFactory.getLogger(GroupEntitiesAndRelationsSparkJob.class); + + private final static String ID_JPATH = "$.id"; + + private final static String SOURCE_JPATH = "$.source"; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + GroupEntitiesAndRelationsSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String graphInputPath = parser.get("graphInputPath"); + log.info("graphInputPath: {}", graphInputPath); + + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); + groupEntitiesAndRelations(spark, graphInputPath, outputPath); + }); + } + + private static void groupEntitiesAndRelations( + SparkSession spark, + String inputPath, + String outputPath) { + + TypedColumn aggregator = new GroupingAggregator().toColumn(); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + spark + .read() + .textFile(toSeq(listPaths(inputPath, sc))) + .map((MapFunction) s -> parseOaf(s), Encoders.kryo(Oaf.class)) + .filter((FilterFunction) oaf -> StringUtils.isNotBlank(ModelSupport.idFn().apply(oaf))) + .groupByKey((MapFunction) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING()) + .agg(aggregator) + .map( + (MapFunction, String>) t -> t._2().getClass().getName() + + "|" + OBJECT_MAPPER.writeValueAsString(t._2()), + Encoders.STRING()) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .text(outputPath); + } + + public static class GroupingAggregator extends Aggregator { + + @Override + public Oaf zero() { + return null; + } + + @Override + public Oaf reduce(Oaf b, Oaf a) { + return mergeAndGet(b, a); + } + + private Oaf mergeAndGet(Oaf b, Oaf a) { + if (Objects.nonNull(a) && Objects.nonNull(b)) { + return OafMapperUtils.merge(b, a); + } + return Objects.isNull(a) ? b : a; + } + + @Override + public Oaf merge(Oaf b, Oaf a) { + return mergeAndGet(b, a); + } + + @Override + public Oaf finish(Oaf j) { + return j; + } + + @Override + public Encoder bufferEncoder() { + return Encoders.kryo(Oaf.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.kryo(Oaf.class); + } + + } + + private static Oaf parseOaf(String s) { + + DocumentContext dc = JsonPath + .parse(s, Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS)); + final String id = dc.read(ID_JPATH); + if (StringUtils.isNotBlank(id)) { + + String prefix = StringUtils.substringBefore(id, "|"); + switch (prefix) { + case "10": + return parse(s, Datasource.class); + case "20": + return parse(s, Organization.class); + case "40": + return parse(s, Project.class); + case "50": + String resultType = dc.read("$.resulttype.classid"); + switch (resultType) { + case "publication": + return parse(s, Publication.class); + case "dataset": + return parse(s, eu.dnetlib.dhp.schema.oaf.Dataset.class); + case "software": + return parse(s, Software.class); + case "other": + return parse(s, OtherResearchProduct.class); + default: + throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType)); + } + default: + throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix)); + } + } else { + String source = dc.read(SOURCE_JPATH); + if (StringUtils.isNotBlank(source)) { + return parse(s, Relation.class); + } else { + throw new IllegalArgumentException(String.format("invalid oaf: '%s'", s)); + } + } + } + + private static Oaf parse(String s, Class clazz) { + try { + return OBJECT_MAPPER.readValue(s, clazz); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static List listPaths(String inputPath, JavaSparkContext sc) { + return HdfsSupport + .listFiles(inputPath, sc.hadoopConfiguration()) + .stream() + .collect(Collectors.toList()); + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java similarity index 98% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java index 037683604a..e53f4ca30d 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java @@ -33,9 +33,9 @@ import scala.Tuple2; * are picked preferring those from the BETA aggregator rather then from PROD. The identity of a relationship is defined * by eu.dnetlib.dhp.schema.common.ModelSupport#idFn() */ -public class MergeGraphSparkJob { +public class MergeGraphTableSparkJob { - private static final Logger log = LoggerFactory.getLogger(CleanGraphSparkJob.class); + private static final Logger log = LoggerFactory.getLogger(MergeGraphTableSparkJob.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java index 0512258573..9db56198fa 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java @@ -258,8 +258,8 @@ public abstract class AbstractMdRecordToOafMapper { r.setCollectedfrom(Arrays.asList(collectedFrom)); r.setPid(prepareResultPids(doc, info)); - r.setDateofcollection(doc.valueOf("//dr:dateOfCollection")); - r.setDateoftransformation(doc.valueOf("//dr:dateOfTransformation")); + r.setDateofcollection(doc.valueOf("//dr:dateOfCollection|//dri:dateOfCollection")); + r.setDateoftransformation(doc.valueOf("//dr:dateOfTransformation|//dri:dateOfTransformation")); r.setExtraInfo(new ArrayList<>()); // NOT PRESENT IN MDSTORES r.setOaiprovenance(prepareOAIprovenance(doc)); r.setAuthor(prepareAuthors(doc, info)); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index b235c3f544..cfd190670f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -4,9 +4,11 @@ package eu.dnetlib.dhp.oa.graph.raw; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.IOException; -import java.util.*; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -18,7 +20,6 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; -import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +69,7 @@ public class GenerateEntitiesApplication { final SparkConf conf = new SparkConf(); runWithSparkSession(conf, isSparkSessionManaged, spark -> { - removeOutputDir(spark, targetPath); + HdfsSupport.remove(targetPath, spark.sparkContext().hadoopConfiguration()); generateEntities(spark, vocs, sourcePaths, targetPath); }); } @@ -82,7 +83,7 @@ public class GenerateEntitiesApplication { final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final List existingSourcePaths = Arrays .stream(sourcePaths.split(",")) - .filter(p -> exists(sc, p)) + .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration())) .collect(Collectors.toList()); log.info("Generate entities from files:"); @@ -103,7 +104,7 @@ public class GenerateEntitiesApplication { inputRdd .mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf)) - .reduceByKey((o1, o2) -> merge(o1, o2)) + .reduceByKey((o1, o2) -> OafMapperUtils.merge(o1, o2)) .map(Tuple2::_2) .map( oaf -> oaf.getClass().getSimpleName().toLowerCase() @@ -112,38 +113,6 @@ public class GenerateEntitiesApplication { .saveAsTextFile(targetPath, GzipCodec.class); } - private static Oaf merge(final Oaf o1, final Oaf o2) { - if (ModelSupport.isSubClass(o1, OafEntity.class)) { - if (ModelSupport.isSubClass(o1, Result.class)) { - - return mergeResults((Result) o1, (Result) o2); - } else if (ModelSupport.isSubClass(o1, Datasource.class)) { - ((Datasource) o1).mergeFrom((Datasource) o2); - } else if (ModelSupport.isSubClass(o1, Organization.class)) { - ((Organization) o1).mergeFrom((Organization) o2); - } else if (ModelSupport.isSubClass(o1, Project.class)) { - ((Project) o1).mergeFrom((Project) o2); - } else { - throw new RuntimeException("invalid OafEntity subtype:" + o1.getClass().getCanonicalName()); - } - } else if (ModelSupport.isSubClass(o1, Relation.class)) { - ((Relation) o1).mergeFrom((Relation) o2); - } else { - throw new RuntimeException("invalid Oaf type:" + o1.getClass().getCanonicalName()); - } - return o1; - } - - protected static Result mergeResults(Result r1, Result r2) { - if (new ResultTypeComparator().compare(r1, r2) < 0) { - r1.mergeFrom(r2); - return r1; - } else { - r2.mergeFrom(r1); - return r2; - } - } - private static List convertToListOaf( final String id, final String s, @@ -192,17 +161,4 @@ public class GenerateEntitiesApplication { } } - private static boolean exists(final JavaSparkContext context, final String pathToFile) { - try { - final FileSystem hdfs = FileSystem.get(context.hadoopConfiguration()); - final Path path = new Path(pathToFile); - return hdfs.exists(path); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - private static void removeOutputDir(final SparkSession spark, final String path) { - HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); - } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java index c76ccb0cf9..b6210013cb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.oa.graph.raw; +import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE; import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASOURCE_ORGANIZATION; import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS; import static eu.dnetlib.dhp.schema.common.ModelConstants.ENTITYREGISTRY_PROVENANCE_ACTION; @@ -9,25 +10,20 @@ import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PARTICIPANT; import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PRODUCED_BY; import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PROVIDED_BY; import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_RELATED_TO; +import static eu.dnetlib.dhp.schema.common.ModelConstants.ORP_DEFAULT_RESULTTYPE; import static eu.dnetlib.dhp.schema.common.ModelConstants.OUTCOME; import static eu.dnetlib.dhp.schema.common.ModelConstants.PARTICIPATION; import static eu.dnetlib.dhp.schema.common.ModelConstants.PRODUCES; import static eu.dnetlib.dhp.schema.common.ModelConstants.PROJECT_ORGANIZATION; import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVIDES; import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVISION; +import static eu.dnetlib.dhp.schema.common.ModelConstants.PUBLICATION_DEFAULT_RESULTTYPE; import static eu.dnetlib.dhp.schema.common.ModelConstants.RELATIONSHIP; import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT; import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_RESULT; +import static eu.dnetlib.dhp.schema.common.ModelConstants.SOFTWARE_DEFAULT_RESULTTYPE; import static eu.dnetlib.dhp.schema.common.ModelConstants.USER_CLAIM; -import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.asString; -import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.createOpenaireId; -import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.dataInfo; -import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.field; -import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.journal; -import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.listFields; -import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.listKeyValues; -import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.qualifier; -import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.structuredProperty; +import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.*; import java.io.Closeable; import java.io.IOException; @@ -442,26 +438,22 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i createOpenaireId(10, "infrastruct_::openaire", true), "OpenAIRE"); try { - final String targetType = rs.getString(TARGET_TYPE); if (rs.getString(SOURCE_TYPE).equals("context")) { final Result r; - switch (targetType) { - case "dataset": - r = new Dataset(); - break; - case "software": - r = new Software(); - break; - case "other": - r = new OtherResearchProduct(); - break; - case "publication": - default: - r = new Publication(); - break; + if (rs.getString(TARGET_TYPE).equals("dataset")) { + r = new Dataset(); + r.setResulttype(DATASET_DEFAULT_RESULTTYPE); + } else if (rs.getString(TARGET_TYPE).equals("software")) { + r = new Software(); + r.setResulttype(SOFTWARE_DEFAULT_RESULTTYPE); + } else if (rs.getString(TARGET_TYPE).equals("other")) { + r = new OtherResearchProduct(); + r.setResulttype(ORP_DEFAULT_RESULTTYPE); + } else { + r = new Publication(); + r.setResulttype(PUBLICATION_DEFAULT_RESULTTYPE); } - r.setId(createOpenaireId(50, rs.getString("target_id"), false)); r.setLastupdatetimestamp(lastUpdateTimestamp); r.setContext(prepareContext(rs.getString("source_id"), info)); @@ -471,7 +463,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i return Arrays.asList(r); } else { final String sourceId = createOpenaireId(rs.getString(SOURCE_TYPE), rs.getString("source_id"), false); - final String targetId = createOpenaireId(targetType, rs.getString("target_id"), false); + final String targetId = createOpenaireId(rs.getString(TARGET_TYPE), rs.getString("target_id"), false); final Relation r1 = new Relation(); final Relation r2 = new Relation(); @@ -527,9 +519,12 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i final Boolean deletedbyinference = rs.getBoolean("deletedbyinference"); final String inferenceprovenance = rs.getString("inferenceprovenance"); final Boolean inferred = rs.getBoolean("inferred"); - final String trust = rs.getString("trust"); + + final double trust = rs.getDouble("trust"); + return dataInfo( - deletedbyinference, inferenceprovenance, inferred, false, ENTITYREGISTRY_PROVENANCE_ACTION, trust); + deletedbyinference, inferenceprovenance, inferred, false, ENTITYREGISTRY_PROVENANCE_ACTION, + String.format("%.3f", trust)); } private Qualifier prepareQualifierSplitting(final String s) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java index a1798e4fba..af1a9aec6a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java @@ -2,9 +2,7 @@ package eu.dnetlib.dhp.oa.graph.raw; import static eu.dnetlib.dhp.schema.common.ModelConstants.*; -import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.createOpenaireId; -import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.field; -import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.structuredProperty; +import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.*; import java.util.ArrayList; import java.util.List; @@ -18,9 +16,9 @@ import org.dom4j.Node; import com.google.common.collect.Lists; import eu.dnetlib.dhp.common.PacePerson; +import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctions; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; import eu.dnetlib.dhp.schema.oaf.*; -import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; public class OafToOafMapper extends AbstractMdRecordToOafMapper { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java index b7ee2d546b..25ff4ae884 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java @@ -2,15 +2,9 @@ package eu.dnetlib.dhp.oa.graph.raw; import static eu.dnetlib.dhp.schema.common.ModelConstants.*; -import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.createOpenaireId; -import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.field; -import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.structuredProperty; +import static eu.dnetlib.dhp.schema.oaf.OafMapperUtils.*; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -18,9 +12,9 @@ import org.dom4j.Document; import org.dom4j.Node; import eu.dnetlib.dhp.common.PacePerson; +import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctions; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; import eu.dnetlib.dhp.schema.oaf.*; -import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; public class OdfToOafMapper extends AbstractMdRecordToOafMapper { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml index 7329df29a4..992d8c40ee 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml @@ -50,12 +50,36 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + yarn + cluster + group graph entities and relations + eu.dnetlib.dhp.oa.graph.clean.GroupEntitiesAndRelationsSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --graphInputPath${graphInputPath} + --outputPath${workingDir}/grouped_entities + + + + + @@ -84,7 +108,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --inputPath${graphInputPath}/publication + --inputPath${workingDir}/grouped_entities --outputPath${graphOutputPath}/publication --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication --isLookupUrl${isLookupUrl} @@ -110,7 +134,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --inputPath${graphInputPath}/dataset + --inputPath${workingDir}/grouped_entities --outputPath${graphOutputPath}/dataset --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset --isLookupUrl${isLookupUrl} @@ -136,7 +160,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --inputPath${graphInputPath}/otherresearchproduct + --inputPath${workingDir}/grouped_entities --outputPath${graphOutputPath}/otherresearchproduct --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct --isLookupUrl${isLookupUrl} @@ -162,7 +186,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --inputPath${graphInputPath}/software + --inputPath${workingDir}/grouped_entities --outputPath${graphOutputPath}/software --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software --isLookupUrl${isLookupUrl} @@ -188,7 +212,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --inputPath${graphInputPath}/datasource + --inputPath${workingDir}/grouped_entities --outputPath${graphOutputPath}/datasource --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource --isLookupUrl${isLookupUrl} @@ -214,7 +238,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --inputPath${graphInputPath}/organization + --inputPath${workingDir}/grouped_entities --outputPath${graphOutputPath}/organization --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization --isLookupUrl${isLookupUrl} @@ -240,7 +264,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --inputPath${graphInputPath}/project + --inputPath${workingDir}/grouped_entities --outputPath${graphOutputPath}/project --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project --isLookupUrl${isLookupUrl} @@ -266,7 +290,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --inputPath${graphInputPath}/relation + --inputPath${workingDir}/grouped_entities --outputPath${graphOutputPath}/relation --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Relation --isLookupUrl${isLookupUrl} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json new file mode 100644 index 0000000000..e65acb3c43 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group_graph_entities_parameters.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "gin", + "paramLongName": "graphInputPath", + "paramDescription": "the graph root path", + "paramRequired": true + }, + { + "paramName": "out", + "paramLongName": "outputPath", + "paramDescription": "the output merged graph root path", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml index 07a125fb64..86fb510420 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml @@ -2,11 +2,11 @@ - betaInputGgraphPath + betaInputGraphPath the beta graph root path - prodInputGgraphPath + prodInputGraphPath the production graph root path @@ -76,7 +76,7 @@ yarn cluster Merge publications - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -88,8 +88,8 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/publication - --prodInputPath${prodInputGgraphPath}/publication + --betaInputPath${betaInputGraphPath}/publication + --prodInputPath${prodInputGraphPath}/publication --outputPath${graphOutputPath}/publication --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication --priority${priority} @@ -103,7 +103,7 @@ yarn cluster Merge datasets - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -115,8 +115,8 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/dataset - --prodInputPath${prodInputGgraphPath}/dataset + --betaInputPath${betaInputGraphPath}/dataset + --prodInputPath${prodInputGraphPath}/dataset --outputPath${graphOutputPath}/dataset --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset --priority${priority} @@ -130,7 +130,7 @@ yarn cluster Merge otherresearchproducts - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -142,8 +142,8 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/otherresearchproduct - --prodInputPath${prodInputGgraphPath}/otherresearchproduct + --betaInputPath${betaInputGraphPath}/otherresearchproduct + --prodInputPath${prodInputGraphPath}/otherresearchproduct --outputPath${graphOutputPath}/otherresearchproduct --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct --priority${priority} @@ -157,7 +157,7 @@ yarn cluster Merge softwares - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -169,8 +169,8 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/software - --prodInputPath${prodInputGgraphPath}/software + --betaInputPath${betaInputGraphPath}/software + --prodInputPath${prodInputGraphPath}/software --outputPath${graphOutputPath}/software --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software --priority${priority} @@ -184,7 +184,7 @@ yarn cluster Merge datasources - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -196,8 +196,8 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/datasource - --prodInputPath${prodInputGgraphPath}/datasource + --betaInputPath${betaInputGraphPath}/datasource + --prodInputPath${prodInputGraphPath}/datasource --outputPath${graphOutputPath}/datasource --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource --priority${priority} @@ -211,7 +211,7 @@ yarn cluster Merge organizations - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -223,8 +223,8 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/organization - --prodInputPath${prodInputGgraphPath}/organization + --betaInputPath${betaInputGraphPath}/organization + --prodInputPath${prodInputGraphPath}/organization --outputPath${graphOutputPath}/organization --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization --priority${priority} @@ -238,7 +238,7 @@ yarn cluster Merge projects - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -250,8 +250,8 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/project - --prodInputPath${prodInputGgraphPath}/project + --betaInputPath${betaInputGraphPath}/project + --prodInputPath${prodInputGraphPath}/project --outputPath${graphOutputPath}/project --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project --priority${priority} @@ -265,7 +265,7 @@ yarn cluster Merge relations - eu.dnetlib.dhp.oa.graph.merge.MergeGraphSparkJob + eu.dnetlib.dhp.oa.graph.merge.MergeGraphTableSparkJob dhp-graph-mapper-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -277,8 +277,8 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - --betaInputPath${betaInputGgraphPath}/relation - --prodInputPath${prodInputGgraphPath}/relation + --betaInputPath${betaInputGraphPath}/relation + --prodInputPath${prodInputGraphPath}/relation --outputPath${graphOutputPath}/relation --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Relation --priority${priority} diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java index 5ad8f2ac7b..cb34b0cb3c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java @@ -19,7 +19,10 @@ import org.mockito.junit.jupiter.MockitoExtension; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; -import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Qualifier; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJobTest.java similarity index 90% rename from dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java rename to dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJobTest.java index 28e8e5abce..0089811cf8 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphSparkJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJobTest.java @@ -15,7 +15,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.oaf.Datasource; -public class MergeGraphSparkJobTest { +public class MergeGraphTableSparkJobTest { private ObjectMapper mapper; @@ -28,7 +28,7 @@ public class MergeGraphSparkJobTest { public void testMergeDatasources() throws IOException { assertEquals( "openaire-cris_1.1", - MergeGraphSparkJob + MergeGraphTableSparkJob .mergeDatasource( d("datasource_cris.json"), d("datasource_UNKNOWN.json")) @@ -36,7 +36,7 @@ public class MergeGraphSparkJobTest { .getClassid()); assertEquals( "openaire-cris_1.1", - MergeGraphSparkJob + MergeGraphTableSparkJob .mergeDatasource( d("datasource_UNKNOWN.json"), d("datasource_cris.json")) @@ -44,7 +44,7 @@ public class MergeGraphSparkJobTest { .getClassid()); assertEquals( "driver-openaire2.0", - MergeGraphSparkJob + MergeGraphTableSparkJob .mergeDatasource( d("datasource_native.json"), d("datasource_driver-openaire2.0.json")) @@ -52,7 +52,7 @@ public class MergeGraphSparkJobTest { .getClassid()); assertEquals( "driver-openaire2.0", - MergeGraphSparkJob + MergeGraphTableSparkJob .mergeDatasource( d("datasource_driver-openaire2.0.json"), d("datasource_native.json")) @@ -60,7 +60,7 @@ public class MergeGraphSparkJobTest { .getClassid()); assertEquals( "openaire4.0", - MergeGraphSparkJob + MergeGraphTableSparkJob .mergeDatasource( d("datasource_notCompatible.json"), d("datasource_openaire4.0.json")) @@ -68,7 +68,7 @@ public class MergeGraphSparkJobTest { .getClassid()); assertEquals( "notCompatible", - MergeGraphSparkJob + MergeGraphTableSparkJob .mergeDatasource( d("datasource_notCompatible.json"), d("datasource_UNKNOWN.json")) diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java index a028738ea2..705f1dddbb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java @@ -70,7 +70,7 @@ public class GenerateEntitiesApplicationTest { protected void verifyMerge(Result publication, Result dataset, Class clazz, String resultType) { - final Result merge = GenerateEntitiesApplication.mergeResults(publication, dataset); + final Result merge = OafMapperUtils.mergeResults(publication, dataset); assertTrue(clazz.isAssignableFrom(merge.getClass())); assertEquals(resultType, merge.getResulttype().getClassid()); } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java index 1f5936ac30..4e4a21fa9f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java @@ -72,6 +72,8 @@ public class MappersTest { assertTrue(StringUtils.isNotBlank(p.getTitle().get(0).getValue())); assertFalse(p.getDataInfo().getInvisible()); assertTrue(p.getSource().size() == 1); + assertTrue(StringUtils.isNotBlank(p.getDateofcollection())); + assertTrue(StringUtils.isNotBlank(p.getDateoftransformation())); assertTrue(p.getAuthor().size() > 0); final Optional author = p @@ -317,7 +319,7 @@ public class MappersTest { @Test void testODFRecord() throws IOException { final String xml = IOUtils.toString(getClass().getResourceAsStream("odf_record.xml")); - List list = new OdfToOafMapper(vocs, false).processMdRecord(xml); + final List list = new OdfToOafMapper(vocs, false).processMdRecord(xml); System.out.println("***************"); System.out.println(new ObjectMapper().writeValueAsString(list)); System.out.println("***************"); @@ -328,6 +330,22 @@ public class MappersTest { assertTrue(StringUtils.isNotBlank(p.getTitle().get(0).getValue())); } + @Test + void testTextGrid() throws IOException { + final String xml = IOUtils.toString(getClass().getResourceAsStream("textgrid.xml")); + final List list = new OdfToOafMapper(vocs, false).processMdRecord(xml); + + System.out.println("***************"); + System.out.println(new ObjectMapper().writeValueAsString(list)); + System.out.println("***************"); + + final Dataset p = (Dataset) list.get(0); + assertValidId(p.getId()); + assertValidId(p.getCollectedfrom().get(0).getKey()); + assertTrue(StringUtils.isNotBlank(p.getTitle().get(0).getValue())); + System.out.println(p.getTitle().get(0).getValue()); + } + private void assertValidId(final String id) { assertEquals(49, id.length()); assertEquals('|', id.charAt(2)); diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java index 3c0e2ce8e2..9cf75f2080 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java @@ -28,13 +28,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; -import eu.dnetlib.dhp.schema.oaf.Datasource; -import eu.dnetlib.dhp.schema.oaf.Oaf; -import eu.dnetlib.dhp.schema.oaf.OafMapperUtils; -import eu.dnetlib.dhp.schema.oaf.Organization; -import eu.dnetlib.dhp.schema.oaf.Project; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.*; @ExtendWith(MockitoExtension.class) public class MigrateDbEntitiesApplicationTest { diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasourceorganization_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasourceorganization_resultset_entry.json index 2baf7c8f17..06b0d483bc 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasourceorganization_resultset_entry.json +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasourceorganization_resultset_entry.json @@ -31,8 +31,8 @@ }, { "field": "trust", - "type": "string", - "value": "0.9" + "type": "double", + "value": 0.9 }, { "field": "inferenceprovenance", diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasources_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasources_resultset_entry.json index 8f8aed3a0e..befa722e1d 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasources_resultset_entry.json +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/datasources_resultset_entry.json @@ -114,8 +114,8 @@ }, { "field": "trust", - "type": "string", - "value": "0.9" + "type": "double", + "value": 0.9 }, { "field": "inferenceprovenance", diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_record.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_record.xml index 3b2658bcf3..2c6c98ebb3 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_record.xml +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_record.xml @@ -7,13 +7,12 @@
pensoft_____::00ea4a1cd53806a97d62ea6bf268f2a2 10.3897/oneeco.2.e13718 - - 2020-03-23T00:20:51.392Z - 2020-03-23T00:26:59.078Z + 2020-03-23T00:20:51.392Z + 2020-03-23T00:26:59.078Z pensoft_____
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/organizations_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/organizations_resultset_entry.json index 38657a1e1f..811a9079fd 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/organizations_resultset_entry.json +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/organizations_resultset_entry.json @@ -96,8 +96,8 @@ }, { "field": "trust", - "type": "string", - "value": "0.9" + "type": "double", + "value": 0.9 }, { "field": "inferenceprovenance", diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projectorganization_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projectorganization_resultset_entry.json index 4311086e74..a3305926df 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projectorganization_resultset_entry.json +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projectorganization_resultset_entry.json @@ -41,8 +41,8 @@ }, { "field": "trust", - "type": "string", - "value": "0.9" + "type": "double", + "value": 0.9 }, { "field": "inferenceprovenance", diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json index a25215ca36..818bf3e58a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json @@ -86,8 +86,8 @@ }, { "field": "trust", - "type": "string", - "value": "0.9" + "type": "double", + "value": 0.9 }, { "field": "inferenceprovenance", diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/textgrid.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/textgrid.xml new file mode 100644 index 0000000000..d6970ab3ee --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/textgrid.xml @@ -0,0 +1,113 @@ + + + + r3f52792889d::000051aa1f61d77d2c0b340091f8024e + textgrid:q9cv.0 + 2020-11-17T09:34:11.128+01:00 + r3f52792889d + textgrid:q9cv.0 + 2012-01-21T13:35:20Z + 2020-11-17T09:46:21.551+01:00 + + + + hdl:11858/00-1734-0000-0003-7664-F + + + Hoffmann von Fallersleben, August Heinrich + 118552589 + + + + Mailied + August Heinrich Hoffmann von Fallersleben: Unpolitische Lieder von Hoffmann von Fallersleben, 1. + 2. Theil, 1. Theil, Hamburg: Hoffmann und Campe, 1841. + + TextGrid + 2012 + + + tvitt@textgrid.de + + + Digitale Bibliothek + TGPR-372fe6dc-57f2-6cd4-01b5-2c4bbefcfd3c + + + + 2012-01-21T13:35:20Z + 2012-01-21T13:35:20Z + 2012-01-21T13:35:20Z + + + + textgrid:q9cv.0 + http://hdl.handle.net/hdl:11858/00-1734-0000-0003-7664-F + + + hdl:11858/00-1734-0000-0003-7666-B + + + 527 Bytes + + + text/tg.edition+tg.aggregation+xml + + 0 + + Der annotierte Datenbestand der Digitalen Bibliothek inklusive + Metadaten sowie davon einzeln zugängliche Teile sind eine Abwandlung + des Datenbestandes von www.editura.de durch TextGrid und werden + unter der Lizenz Creative Commons Namensnennung 3.0 Deutschland + Lizenz (by-Nennung TextGrid) veröffentlicht. Die Lizenz bezieht sich + nicht auf die der Annotation zu Grunde liegenden allgemeinfreien + Texte (Siehe auch Punkt 2 der Lizenzbestimmungen). + + + + + + + + Hamburg + + + + hdl:11858/00-1734-0000-0003-7664-F + 0021 + 0002 + 2012-01-01 + OPEN + http://creativecommons.org/licenses/by/3.0/de/legalcode + und + + + + + + + https%3A%2F%2Fdev.textgridlab.org%2F1.0%2Ftgoaipmh%2Foai + textgrid:q9cv.0 + 2012-01-21T13:35:20Z + http://schema.datacite.org/oai/oai-1.0/ + + + + false + false + 0.9 + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/pom.xml b/dhp-workflows/dhp-graph-provision/pom.xml index 8194d4d016..1547056b94 100644 --- a/dhp-workflows/dhp-graph-provision/pom.xml +++ b/dhp-workflows/dhp-graph-provision/pom.xml @@ -22,6 +22,12 @@ com.jayway.jsonpath json-path + + + org.slf4j + slf4j-api + + dom4j @@ -82,9 +88,6 @@ org.codehaus.woodstox * - - - com.github.ben-manes.caffeine * @@ -109,11 +112,10 @@ org.apache.hadoop * - - - - - + + org.apache.zookeeper + zookeeper + diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java new file mode 100644 index 0000000000..28c1111d67 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java @@ -0,0 +1,14 @@ + +package eu.dnetlib.dhp.oa.provision; + +public class ProvisionConstants { + + public static final String LAYOUT = "index"; + public static final String INTERPRETATION = "openaire"; + public static final String SEPARATOR = "-"; + + public static String getCollectionName(String format) { + return format + SEPARATOR + LAYOUT + SEPARATOR + INTERPRETATION; + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java index 8c8947298c..5fe452efef 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java @@ -14,11 +14,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient; import eu.dnetlib.dhp.oa.provision.utils.ZkServers; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -public class SolrAdminApplication extends SolrApplication implements Closeable { +public class SolrAdminApplication implements Closeable { private static final Logger log = LoggerFactory.getLogger(SolrAdminApplication.class); @@ -54,12 +55,12 @@ public class SolrAdminApplication extends SolrApplication implements Closeable { .orElse(false); log.info("commit: {}", commit); - final ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl); + final ISLookupClient isLookup = new ISLookupClient(ISLookupClientFactory.getLookUpService(isLookupUrl)); - final String zkHost = getZkHost(isLookup); + final String zkHost = isLookup.getZkHost(); log.info("zkHost: {}", zkHost); - final String collection = format + SEPARATOR + LAYOUT + SEPARATOR + INTERPRETATION; + final String collection = ProvisionConstants.getCollectionName(format); log.info("collection: {}", collection); try (SolrAdminApplication app = new SolrAdminApplication(zkHost)) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrApplication.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrApplication.java deleted file mode 100644 index a824c6c2c8..0000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrApplication.java +++ /dev/null @@ -1,40 +0,0 @@ - -package eu.dnetlib.dhp.oa.provision; - -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; - -public abstract class SolrApplication { - - private static final Logger log = LoggerFactory.getLogger(SolrApplication.class); - - protected static final String LAYOUT = "index"; - protected static final String INTERPRETATION = "openaire"; - protected static final String SEPARATOR = "-"; - protected static final String DATE_FORMAT = "yyyy-MM-dd'T'hh:mm:ss'Z'"; - - /** - * Method retrieves from the information system the zookeeper quorum of the Solr server - * - * @param isLookup - * @return the zookeeper quorum of the Solr server - * @throws ISLookUpException - */ - protected static String getZkHost(ISLookUpService isLookup) throws ISLookUpException { - return doLookup( - isLookup, - "for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()"); - } - - protected static String doLookup(ISLookUpService isLookup, String xquery) throws ISLookUpException { - log.info(String.format("running xquery: %s", xquery)); - final String res = isLookup.getResourceProfileByQuery(xquery); - log.info(String.format("got response (100 chars): %s", StringUtils.left(res, 100) + " ...")); - return res; - } - -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java index d8eba31b62..b44ed7446f 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java @@ -2,12 +2,11 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static eu.dnetlib.dhp.utils.DHPUtils.toSeq; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; @@ -28,13 +27,11 @@ import com.google.common.collect.Maps; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.provision.model.*; +import eu.dnetlib.dhp.oa.provision.model.JoinedEntity; +import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; -import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; -import scala.collection.JavaConverters; -import scala.collection.Seq; /** * XmlConverterJob converts the JoinedEntities as XML records @@ -43,8 +40,6 @@ public class XmlConverterJob { private static final Logger log = LoggerFactory.getLogger(XmlConverterJob.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd"; public static void main(String[] args) throws Exception { @@ -129,10 +124,6 @@ public class XmlConverterJob { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } - private static Seq toSeq(List list) { - return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq(); - } - private static Map prepareAccumulators(SparkContext sc) { Map accumulators = Maps.newHashMap(); accumulators diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java index 5b55961621..48538c059f 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java @@ -20,27 +20,42 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.io.Text; import org.apache.solr.common.SolrInputDocument; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.lucidworks.spark.util.SolrSupport; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument; +import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient; import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -public class XmlIndexingJob extends SolrApplication { +public class XmlIndexingJob { private static final Logger log = LoggerFactory.getLogger(XmlIndexingJob.class); private static final Integer DEFAULT_BATCH_SIZE = 1000; + protected static final String DATE_FORMAT = "yyyy-MM-dd'T'hh:mm:ss'Z'"; + + private String inputPath; + + private String format; + + private int batchSize; + + private String outputPath; + + private SparkSession spark; + public static void main(String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -60,27 +75,53 @@ public class XmlIndexingJob extends SolrApplication { final String inputPath = parser.get("inputPath"); log.info("inputPath: {}", inputPath); - final String isLookupUrl = parser.get("isLookupUrl"); - log.info("isLookupUrl: {}", isLookupUrl); - final String format = parser.get("format"); log.info("format: {}", format); + final String outputPath = Optional + .ofNullable(parser.get("outputPath")) + .orElse(null); + log.info("outputPath: {}", outputPath); + final Integer batchSize = parser.getObjectMap().containsKey("batchSize") ? Integer.valueOf(parser.get("batchSize")) : DEFAULT_BATCH_SIZE; log.info("batchSize: {}", batchSize); - final ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl); - final String fields = getLayoutSource(isLookup, format); + final SparkConf conf = new SparkConf(); + conf.registerKryoClasses(new Class[] { + SerializableSolrInputDocument.class + }); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + final String isLookupUrl = parser.get("isLookupUrl"); + log.info("isLookupUrl: {}", isLookupUrl); + final ISLookupClient isLookup = new ISLookupClient(ISLookupClientFactory.getLookUpService(isLookupUrl)); + new XmlIndexingJob(spark, inputPath, format, batchSize, outputPath).run(isLookup); + }); + } + + public XmlIndexingJob(SparkSession spark, String inputPath, String format, Integer batchSize, String outputPath) { + this.spark = spark; + this.inputPath = inputPath; + this.format = format; + this.batchSize = batchSize; + this.outputPath = outputPath; + } + + public void run(ISLookupClient isLookup) throws ISLookUpException, TransformerException { + final String fields = isLookup.getLayoutSource(format); log.info("fields: {}", fields); - final String xslt = getLayoutTransformer(isLookup); + final String xslt = isLookup.getLayoutTransformer(); - final String dsId = getDsId(format, isLookup); + final String dsId = isLookup.getDsId(format); log.info("dsId: {}", dsId); - final String zkHost = getZkHost(isLookup); + final String zkHost = isLookup.getZkHost(); log.info("zkHost: {}", zkHost); final String version = getRecordDatestamp(); @@ -88,24 +129,26 @@ public class XmlIndexingJob extends SolrApplication { final String indexRecordXslt = getLayoutTransformer(format, fields, xslt); log.info("indexRecordTransformer {}", indexRecordXslt); - final SparkConf conf = new SparkConf(); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + JavaRDD docs = sc + .sequenceFile(inputPath, Text.class, Text.class) + .map(t -> t._2().toString()) + .map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s)) + .map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s)); - RDD docs = sc - .sequenceFile(inputPath, Text.class, Text.class) - .map(t -> t._2().toString()) - .map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s)) - .map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s)) - .rdd(); - - final String collection = format + SEPARATOR + LAYOUT + SEPARATOR + INTERPRETATION; - SolrSupport.indexDocs(zkHost, collection, batchSize, docs); - }); + if (StringUtils.isNotBlank(outputPath)) { + spark + .createDataset( + docs.map(s -> new SerializableSolrInputDocument(s)).rdd(), + Encoders.kryo(SerializableSolrInputDocument.class)) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); + } else { + final String collection = ProvisionConstants.getCollectionName(format); + SolrSupport.indexDocs(zkHost, collection, batchSize, docs.rdd()); + } } protected static String toIndexRecord(Transformer tr, final String record) { @@ -151,56 +194,4 @@ public class XmlIndexingJob extends SolrApplication { return new SimpleDateFormat(DATE_FORMAT).format(new Date()); } - /** - * Method retrieves from the information system the list of fields associated to the given MDFormat name - * - * @param isLookup the ISLookup service stub - * @param format the Metadata format name - * @return the string representation of the list of fields to be indexed - * @throws ISLookUpDocumentNotFoundException - * @throws ISLookUpException - */ - private static String getLayoutSource(final ISLookUpService isLookup, final String format) - throws ISLookUpDocumentNotFoundException, ISLookUpException { - return doLookup( - isLookup, - String - .format( - "collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='%s']//LAYOUT[@name='%s']", - format, LAYOUT)); - } - - /** - * Method retrieves from the information system the openaireLayoutToRecordStylesheet - * - * @param isLookup the ISLookup service stub - * @return the string representation of the XSLT contained in the transformation rule profile - * @throws ISLookUpDocumentNotFoundException - * @throws ISLookUpException - */ - private static String getLayoutTransformer(ISLookUpService isLookup) throws ISLookUpException { - return doLookup( - isLookup, - "collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType')" - + "//RESOURCE_PROFILE[./BODY/CONFIGURATION/SCRIPT/TITLE/text() = 'openaireLayoutToRecordStylesheet']//CODE/node()"); - } - - /** - * Method retrieves from the information system the IndexDS profile ID associated to the given MDFormat name - * - * @param format - * @param isLookup - * @return the IndexDS identifier - * @throws ISLookUpException - */ - private static String getDsId(String format, ISLookUpService isLookup) throws ISLookUpException { - return doLookup( - isLookup, - String - .format( - "collection('/db/DRIVER/IndexDSResources/IndexDSResourceType')" - + "//RESOURCE_PROFILE[./BODY/CONFIGURATION/METADATA_FORMAT/text() = '%s']//RESOURCE_IDENTIFIER/@value/string()", - format)); - } - } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SerializableSolrInputDocument.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SerializableSolrInputDocument.java new file mode 100644 index 0000000000..bbda1522e0 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SerializableSolrInputDocument.java @@ -0,0 +1,23 @@ + +package eu.dnetlib.dhp.oa.provision.model; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.SolrInputField; + +/** + * Wrapper class needed to make the SolrInputDocument compatible with the Kryo serialization mechanism. + */ +public class SerializableSolrInputDocument extends SolrInputDocument { + + public SerializableSolrInputDocument() { + super(new HashMap<>()); + } + + public SerializableSolrInputDocument(Map fields) { + super(fields); + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/ISLookupClient.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/ISLookupClient.java new file mode 100644 index 0000000000..29a51cb29d --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/ISLookupClient.java @@ -0,0 +1,95 @@ + +package eu.dnetlib.dhp.oa.provision.utils; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.oa.provision.ProvisionConstants; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; + +public class ISLookupClient { + + private static final Logger log = LoggerFactory.getLogger(ISLookupClient.class); + + private ISLookUpService isLookup; + + public ISLookupClient(ISLookUpService isLookup) { + this.isLookup = isLookup; + } + + /** + * Method retrieves from the information system the list of fields associated to the given MDFormat name + * + * @param format the Metadata format name + * @return the string representation of the list of fields to be indexed + * @throws ISLookUpDocumentNotFoundException + * @throws ISLookUpException + */ + public String getLayoutSource(final String format) + throws ISLookUpDocumentNotFoundException, ISLookUpException { + return doLookup( + String + .format( + "collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='%s']//LAYOUT[@name='%s']", + format, ProvisionConstants.LAYOUT)); + } + + /** + * Method retrieves from the information system the openaireLayoutToRecordStylesheet + * + * @return the string representation of the XSLT contained in the transformation rule profile + * @throws ISLookUpDocumentNotFoundException + * @throws ISLookUpException + */ + public String getLayoutTransformer() throws ISLookUpException { + return doLookup( + "collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType')" + + "//RESOURCE_PROFILE[./BODY/CONFIGURATION/SCRIPT/TITLE/text() = 'openaireLayoutToRecordStylesheet']//CODE/node()"); + } + + /** + * Method retrieves from the information system the IndexDS profile ID associated to the given MDFormat name + * + * @param format + * @return the IndexDS identifier + * @throws ISLookUpException + */ + public String getDsId(String format) throws ISLookUpException { + return doLookup( + String + .format( + "collection('/db/DRIVER/IndexDSResources/IndexDSResourceType')" + + "//RESOURCE_PROFILE[./BODY/CONFIGURATION/METADATA_FORMAT/text() = '%s']//RESOURCE_IDENTIFIER/@value/string()", + format)); + } + + /** + * Method retrieves from the information system the zookeeper quorum of the Solr server + * + * @return the zookeeper quorum of the Solr server + * @throws ISLookUpException + */ + public String getZkHost() throws ISLookUpException { + return doLookup( + "for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()"); + } + + private String doLookup(String xquery) throws ISLookUpException { + log.info(String.format("running xquery: %s", xquery)); + final String res = getIsLookup().getResourceProfileByQuery(xquery); + log.info(String.format("got response (100 chars): %s", StringUtils.left(res, 100) + " ...")); + return res; + } + + public ISLookUpService getIsLookup() { + return isLookup; + } + + public void setIsLookup(ISLookUpService isLookup) { + this.isLookup = isLookup; + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/StreamingInputDocumentFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/StreamingInputDocumentFactory.java index 3e8abbd9f1..f16ee260fe 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/StreamingInputDocumentFactory.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/StreamingInputDocumentFactory.java @@ -46,11 +46,6 @@ public class StreamingInputDocumentFactory { private static final String INDEX_RECORD_ID = INDEX_FIELD_PREFIX + "indexrecordidentifier"; - private static final String outFormat = "yyyy-MM-dd'T'hh:mm:ss'Z'"; - - private static final List dateFormats = Arrays - .asList("yyyy-MM-dd'T'hh:mm:ss", "yyyy-MM-dd", "dd-MM-yyyy", "dd/MM/yyyy", "yyyy"); - private static final String DEFAULTDNETRESULT = "dnetResult"; private static final String TARGETFIELDS = "targetFields"; @@ -125,13 +120,12 @@ public class StreamingInputDocumentFactory { } if (!indexDocument.containsKey(INDEX_RECORD_ID)) { - indexDocument.clear(); - System.err.println("missing indexrecord id:\n" + inputDocument); + throw new IllegalStateException("cannot extract record ID from: " + inputDocument); } return indexDocument; } catch (XMLStreamException e) { - return new SolrInputDocument(); + throw new IllegalStateException(e); } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json index 3396020e07..3169648fb9 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json @@ -22,5 +22,11 @@ "paramLongName": "batchSize", "paramDescription": "size of the batch of documents sent to solr", "paramRequired": false + }, + { + "paramName": "o", + "paramLongName": "outputPath", + "paramDescription": "path on hdfs activating an alternative output for the SolrInputDocuments", + "paramRequired": false } ] diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index e2b74b9aa5..ee636b68e0 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -638,6 +638,7 @@ --isLookupUrl${isLookupUrl} --format${format} --batchSize${batchSize} + --outputPath${outputPath} diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplicationTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplicationTest.java index cbd7b2de29..33def91b39 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplicationTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplicationTest.java @@ -1,107 +1,18 @@ package eu.dnetlib.dhp.oa.provision; -import java.io.File; -import java.nio.file.Path; - -import org.apache.solr.client.solrj.SolrResponse; -import org.apache.solr.client.solrj.embedded.JettyConfig; -import org.apache.solr.client.solrj.impl.CloudSolrClient; -import org.apache.solr.client.solrj.impl.XMLResponseParser; -import org.apache.solr.client.solrj.request.CollectionAdminRequest; -import org.apache.solr.client.solrj.request.ConfigSetAdminRequest; -import org.apache.solr.client.solrj.request.QueryRequest; -import org.apache.solr.client.solrj.request.RequestWriter; -import org.apache.solr.client.solrj.response.CollectionAdminResponse; -import org.apache.solr.client.solrj.response.ConfigSetAdminResponse; import org.apache.solr.client.solrj.response.SolrPingResponse; import org.apache.solr.client.solrj.response.UpdateResponse; -import org.apache.solr.cloud.MiniSolrCloudCluster; -import org.apache.solr.common.params.CollectionParams; -import org.apache.solr.common.params.CoreAdminParams; -import org.apache.solr.common.params.ModifiableSolrParams; -import org.apache.solr.common.util.NamedList; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import junit.framework.Assert; -public class SolrAdminApplicationTest { - - private static final Logger log = LoggerFactory.getLogger(SolrAdminApplicationTest.class); - public static final String DEFAULT_COLLECTION = "testCollection"; - public static final String CONFIG_NAME = "testConfig"; - - private static MiniSolrCloudCluster miniCluster; - private static CloudSolrClient cloudSolrClient; - - @TempDir - public static Path tempDir; - - @BeforeAll - public static void setup() throws Exception { - - // random unassigned HTTP port - final int jettyPort = 0; - - final JettyConfig jettyConfig = JettyConfig.builder().setPort(jettyPort).build(); - - // create a MiniSolrCloudCluster instance - miniCluster = new MiniSolrCloudCluster(2, tempDir, jettyConfig); - - // Upload Solr configuration directory to ZooKeeper - String solrZKConfigDir = "src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig"; - File configDir = new File(solrZKConfigDir); - - miniCluster.uploadConfigSet(configDir.toPath(), CONFIG_NAME); - - // override settings in the solrconfig include - System.setProperty("solr.tests.maxBufferedDocs", "100000"); - System.setProperty("solr.tests.maxIndexingThreads", "-1"); - System.setProperty("solr.tests.ramBufferSizeMB", "100"); - - // use non-test classes so RandomizedRunner isn't necessary - System.setProperty("solr.tests.mergeScheduler", "org.apache.lucene.index.ConcurrentMergeScheduler"); - System.setProperty("solr.directoryFactory", "solr.RAMDirectoryFactory"); - - cloudSolrClient = miniCluster.getSolrClient(); - cloudSolrClient.setRequestWriter(new RequestWriter()); - cloudSolrClient.setParser(new XMLResponseParser()); - cloudSolrClient.setDefaultCollection(DEFAULT_COLLECTION); - cloudSolrClient.connect(); - - log.info(new ConfigSetAdminRequest.List().process(cloudSolrClient).toString()); - log.info(CollectionAdminRequest.ClusterStatus.getClusterStatus().process(cloudSolrClient).toString()); - - createCollection(cloudSolrClient, DEFAULT_COLLECTION, 2, 1, CONFIG_NAME); - } - - @AfterAll - public static void shutDown() throws Exception { - miniCluster.shutdown(); - } - - protected static NamedList createCollection(CloudSolrClient client, String name, int numShards, - int replicationFactor, String configName) throws Exception { - ModifiableSolrParams modParams = new ModifiableSolrParams(); - modParams.set(CoreAdminParams.ACTION, CollectionParams.CollectionAction.CREATE.name()); - modParams.set("name", name); - modParams.set("numShards", numShards); - modParams.set("replicationFactor", replicationFactor); - modParams.set("collection.configName", configName); - QueryRequest request = new QueryRequest(modParams); - request.setPath("/admin/collections"); - return client.request(request); - } +public class SolrAdminApplicationTest extends SolrTest { @Test public void testPing() throws Exception { - SolrPingResponse pingResponse = cloudSolrClient.ping(); + SolrPingResponse pingResponse = miniCluster.getSolrClient().ping(); log.info("pingResponse: '{}'", pingResponse.getStatus()); Assert.assertTrue(pingResponse.getStatus() == 0); } diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrTest.java new file mode 100644 index 0000000000..186cb964a0 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrTest.java @@ -0,0 +1,109 @@ + +package eu.dnetlib.dhp.oa.provision; + +import java.io.File; +import java.nio.file.Path; + +import org.apache.commons.io.FileUtils; +import org.apache.solr.client.solrj.embedded.JettyConfig; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.ConfigSetAdminRequest; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.cloud.MiniSolrCloudCluster; +import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.common.params.CoreAdminParams; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.NamedList; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class SolrTest { + + protected static final Logger log = LoggerFactory.getLogger(SolrTest.class); + + protected static final String FORMAT = "test"; + protected static final String DEFAULT_COLLECTION = FORMAT + "-index-openaire"; + protected static final String CONFIG_NAME = "testConfig"; + + protected static MiniSolrCloudCluster miniCluster; + + @TempDir + public static Path workingDir; + + @BeforeAll + public static void setup() throws Exception { + + // random unassigned HTTP port + final int jettyPort = 0; + final JettyConfig jettyConfig = JettyConfig.builder().setPort(jettyPort).build(); + + log.info(String.format("working directory: %s", workingDir.toString())); + System.setProperty("solr.log.dir", workingDir.resolve("logs").toString()); + + // create a MiniSolrCloudCluster instance + miniCluster = new MiniSolrCloudCluster(2, workingDir.resolve("solr"), jettyConfig); + + // Upload Solr configuration directory to ZooKeeper + String solrZKConfigDir = "src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig"; + File configDir = new File(solrZKConfigDir); + + miniCluster.uploadConfigSet(configDir.toPath(), CONFIG_NAME); + + // override settings in the solrconfig include + System.setProperty("solr.tests.maxBufferedDocs", "100000"); + System.setProperty("solr.tests.maxIndexingThreads", "-1"); + System.setProperty("solr.tests.ramBufferSizeMB", "100"); + + // use non-test classes so RandomizedRunner isn't necessary + System.setProperty("solr.tests.mergeScheduler", "org.apache.lucene.index.ConcurrentMergeScheduler"); + System.setProperty("solr.directoryFactory", "solr.RAMDirectoryFactory"); + System.setProperty("solr.lock.type", "single"); + + log.info(new ConfigSetAdminRequest.List().process(miniCluster.getSolrClient()).toString()); + log + .info( + CollectionAdminRequest.ClusterStatus + .getClusterStatus() + .process(miniCluster.getSolrClient()) + .toString()); + + NamedList res = createCollection( + miniCluster.getSolrClient(), DEFAULT_COLLECTION, 4, 2, 20, CONFIG_NAME); + res.forEach(o -> log.info(o.toString())); + + miniCluster.getSolrClient().setDefaultCollection(DEFAULT_COLLECTION); + + log + .info( + CollectionAdminRequest.ClusterStatus + .getClusterStatus() + .process(miniCluster.getSolrClient()) + .toString()); + + } + + @AfterAll + public static void shutDown() throws Exception { + miniCluster.shutdown(); + FileUtils.deleteDirectory(workingDir.toFile()); + } + + protected static NamedList createCollection(CloudSolrClient client, String name, int numShards, + int replicationFactor, int maxShardsPerNode, String configName) throws Exception { + ModifiableSolrParams modParams = new ModifiableSolrParams(); + modParams.set(CoreAdminParams.ACTION, CollectionParams.CollectionAction.CREATE.name()); + modParams.set("name", name); + modParams.set("numShards", numShards); + modParams.set("replicationFactor", replicationFactor); + modParams.set("collection.configName", configName); + modParams.set("maxShardsPerNode", maxShardsPerNode); + QueryRequest request = new QueryRequest(modParams); + request.setPath("/admin/collections"); + return client.request(request); + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java new file mode 100644 index 0000000000..78d8930b69 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java @@ -0,0 +1,147 @@ + +package eu.dnetlib.dhp.oa.provision; + +import java.io.IOException; +import java.io.StringReader; +import java.net.URI; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrInputField; +import org.apache.solr.common.params.CommonParams; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.dom4j.io.SAXReader; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument; +import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; + +@ExtendWith(MockitoExtension.class) +public class XmlIndexingJobTest extends SolrTest { + + protected static SparkSession spark; + + private static final Integer batchSize = 100; + + @Mock + private ISLookUpService isLookUpService; + + @Mock + private ISLookupClient isLookupClient; + + @BeforeEach + public void prepareMocks() throws ISLookUpException, IOException { + isLookupClient.setIsLookup(isLookUpService); + + int solrPort = URI.create("http://" + miniCluster.getZkClient().getZkServerAddress()).getPort(); + + Mockito + .when(isLookupClient.getDsId(Mockito.anyString())) + .thenReturn("313f0381-23b6-466f-a0b8-c72a9679ac4b_SW5kZXhEU1Jlc291cmNlcy9JbmRleERTUmVzb3VyY2VUeXBl"); + Mockito.when(isLookupClient.getZkHost()).thenReturn(String.format("127.0.0.1:%s/solr", solrPort)); + Mockito + .when(isLookupClient.getLayoutSource(Mockito.anyString())) + .thenReturn(IOUtils.toString(getClass().getResourceAsStream("fields.xml"))); + Mockito + .when(isLookupClient.getLayoutTransformer()) + .thenReturn(IOUtils.toString(getClass().getResourceAsStream("layoutToRecordTransformer.xsl"))); + } + + @BeforeAll + public static void before() { + + SparkConf conf = new SparkConf(); + conf.setAppName(XmlIndexingJobTest.class.getSimpleName()); + conf.registerKryoClasses(new Class[] { + SerializableSolrInputDocument.class + }); + + conf.setMaster("local[1]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.resolve("spark").toString()); + + spark = SparkSession + .builder() + .appName(XmlIndexingJobTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void tearDown() { + spark.stop(); + } + + @Test + public void testXmlIndexingJob_onSolr() throws Exception { + + String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml"; + + long nRecord = JavaSparkContext + .fromSparkContext(spark.sparkContext()) + .sequenceFile(inputPath, Text.class, Text.class) + .count(); + + new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, null).run(isLookupClient); + + Assertions.assertEquals(0, miniCluster.getSolrClient().commit().getStatus()); + + QueryResponse rsp = miniCluster.getSolrClient().query(new SolrQuery().add(CommonParams.Q, "*:*")); + + Assertions + .assertEquals( + nRecord, rsp.getResults().getNumFound(), + "the number of indexed records should be equal to the number of input records"); + } + + @Test + public void testXmlIndexingJob_saveOnHDFS() throws Exception { + final String ID_XPATH = "//header/*[local-name()='objIdentifier']"; + + String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml"; + + final JavaPairRDD xmlRecords = JavaSparkContext + .fromSparkContext(spark.sparkContext()) + .sequenceFile(inputPath, Text.class, Text.class); + long nRecord = xmlRecords.count(); + long xmlIdUnique = xmlRecords + .map(t -> t._2().toString()) + .map(s -> new SAXReader().read(new StringReader(s)).valueOf(ID_XPATH)) + .distinct() + .count(); + Assertions.assertEquals(nRecord, xmlIdUnique, "IDs should be unique among input records"); + + final String outputPath = workingDir.resolve("outputPath").toAbsolutePath().toString(); + new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, outputPath).run(isLookupClient); + + final Dataset solrDocs = spark + .read() + .load(outputPath) + .as(Encoders.kryo(SerializableSolrInputDocument.class)); + long docIdUnique = solrDocs.map((MapFunction) doc -> { + final SolrInputField id = doc.getField("__indexrecordidentifier"); + return id.getFirstValue().toString(); + }, Encoders.STRING()) + .distinct() + .count(); + Assertions.assertEquals(xmlIdUnique, docIdUnique, "IDs should be unique among the output records"); + + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/fields.xml b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/fields.xml index f74da5d071..1f5cf7b815 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/fields.xml +++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/fields.xml @@ -105,7 +105,7 @@ - + @@ -123,7 +123,8 @@ - + + diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig/elevate.xml b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig/elevate.xml new file mode 100644 index 0000000000..668332b28a --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig/elevate.xml @@ -0,0 +1,31 @@ +Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig/managed-schema b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig/managed-schema index b50c5586c4..977e0b2d72 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig/managed-schema +++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig/managed-schema @@ -1,1003 +1,404 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - id - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + __indexrecordidentifier + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig/solrconfig.xml b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig/solrconfig.xml index 562b1cb555..79f3c61044 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig/solrconfig.xml +++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig/solrconfig.xml @@ -83,6 +83,7 @@ + @@ -204,7 +206,7 @@ More details on the nuances of each LockFactory... http://wiki.apache.org/lucene-java/AvailableLockFactories --> - ${solr.lock.type:single} + ${solr.lock.type:native} + + + @@ -366,14 +391,22 @@ Query section - these settings control query time things like caches ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ --> + 1024 + - + + explicit + AND 10 - - + + default - _text_ + __all solr.DirectSolrSpellChecker internal @@ -986,6 +1044,7 @@ string + elevate.xml @@ -1116,81 +1175,70 @@ - - - - [^\w-\.] - _ - - - - - - - yyyy-MM-dd'T'HH:mm:ss.SSSZ - yyyy-MM-dd'T'HH:mm:ss,SSSZ - yyyy-MM-dd'T'HH:mm:ss.SSS - yyyy-MM-dd'T'HH:mm:ss,SSS - yyyy-MM-dd'T'HH:mm:ssZ - yyyy-MM-dd'T'HH:mm:ss - yyyy-MM-dd'T'HH:mmZ - yyyy-MM-dd'T'HH:mm - yyyy-MM-dd HH:mm:ss.SSSZ - yyyy-MM-dd HH:mm:ss,SSSZ - yyyy-MM-dd HH:mm:ss.SSS - yyyy-MM-dd HH:mm:ss,SSS - yyyy-MM-dd HH:mm:ssZ - yyyy-MM-dd HH:mm:ss - yyyy-MM-dd HH:mmZ - yyyy-MM-dd HH:mm - yyyy-MM-dd - - - - - java.lang.String - text_general - - *_str - 256 + + + + + + [^\w-\.] + _ + + + + + + + yyyy-MM-dd'T'HH:mm:ss.SSSZ + yyyy-MM-dd'T'HH:mm:ss,SSSZ + yyyy-MM-dd'T'HH:mm:ss.SSS + yyyy-MM-dd'T'HH:mm:ss,SSS + yyyy-MM-dd'T'HH:mm:ssZ + yyyy-MM-dd'T'HH:mm:ss + yyyy-MM-dd'T'HH:mmZ + yyyy-MM-dd'T'HH:mm + yyyy-MM-dd HH:mm:ss.SSSZ + yyyy-MM-dd HH:mm:ss,SSSZ + yyyy-MM-dd HH:mm:ss.SSS + yyyy-MM-dd HH:mm:ss,SSS + yyyy-MM-dd HH:mm:ssZ + yyyy-MM-dd HH:mm:ss + yyyy-MM-dd HH:mmZ + yyyy-MM-dd HH:mm + yyyy-MM-dd + + + + strings + + java.lang.Boolean + booleans - - true - - - java.lang.Boolean - booleans - - - java.util.Date - pdates - - - java.lang.Long - java.lang.Integer - plongs - - - java.lang.Number - pdoubles - - + + java.util.Date + tdates + + + java.lang.Long + java.lang.Integer + tlongs + + + java.lang.Number + tdoubles + + - - @@ -1313,7 +1361,7 @@ - + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/xml/part-00000 b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/xml/part-00000 new file mode 100644 index 0000000000..ff4095a11b Binary files /dev/null and b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/xml/part-00000 differ diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml index 20eec37dc3..d6cc14e255 100644 --- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/workflow.xml @@ -46,7 +46,7 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] diff --git a/pom.xml b/pom.xml index 03c69108d5..d0757f145a 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ dnet45-releases D-Net 45 releases - http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases + https://maven.d4science.org/nexus/content/repositories/dnet45-releases default false @@ -70,6 +70,26 @@ false + + dnet45-releases-old + http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases + + false + + + false + + + + dnet45-snapshots-old + http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots + + false + + + false + + @@ -639,12 +659,12 @@ dnet45-snapshots DNet45 Snapshots - http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots + https://maven.d4science.org/nexus/content/repositories/dnet45-snapshots default dnet45-releases - http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-releases + https://maven.d4science.org/nexus/content/repositories/dnet45-releases