diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/MergeAndGet.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/MergeAndGet.java
index 6de631173..13bf889d6 100644
--- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/MergeAndGet.java
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/MergeAndGet.java
@@ -1,14 +1,14 @@
 
 package eu.dnetlib.dhp.actionmanager.promote;
 
+import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
+
+import java.util.function.BiFunction;
+
 import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
 import eu.dnetlib.dhp.schema.oaf.Oaf;
 import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
-import java.util.function.BiFunction;
-
-import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
-
 /** OAF model merging support. */
 public class MergeAndGet {
diff --git a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/MergeAndGetTest.java b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/MergeAndGetTest.java
index 6b54ae0fd..f25ed7903 100644
--- a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/MergeAndGetTest.java
+++ b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/promote/MergeAndGetTest.java
@@ -100,7 +100,7 @@ public class MergeAndGetTest {
 			assertTrue(Relation.class.isAssignableFrom(x.getClass()));
 
 			// TODO should be reimplemented
-			//verify(a).mergeFrom(b);
+			// verify(a).mergeFrom(b);
 			assertEquals(a, x);
 		}
 
@@ -163,7 +163,7 @@ public class MergeAndGetTest {
 			assertTrue(OafEntity.class.isAssignableFrom(x.getClass()));
 
 			// TODO should be reimplemented
-			//verify(a).mergeFrom(b);
+			// verify(a).mergeFrom(b);
 			assertEquals(a, x);
 		}
 	}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java
index 76c8ec7fa..8adc88920 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java
@@ -95,7 +95,7 @@ public class SparkAtomicActionScoreJob implements Serializable {
 		return projectScores.map((MapFunction) bipProjectScores -> {
 			Project project = new Project();
-			//project.setId(bipProjectScores.getProjectId());
+			// project.setId(bipProjectScores.getProjectId());
 			project.setMeasures(bipProjectScores.toMeasures());
 			return project;
 		}, Encoders.bean(Project.class))
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java
index a41540564..a3ec8d3af 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java
@@ -38,7 +38,6 @@ public class BipProjectModel {
 		return projectId;
 	}
 
-	// each project bip measure has exactly one value, hence one key-value pair
 	private Measure createMeasure(String measureId, String measureValue) {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/MapOCIdsInPids.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/MapOCIdsInPids.java
index f7de4ac08..306662c18 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/MapOCIdsInPids.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/MapOCIdsInPids.java
@@ -166,6 +166,6 @@ public class MapOCIdsInPids implements Serializable {
 			.option("compression", "gzip")
 			.json(outputPath);
 
-	}
+	}
 
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
index 21f3044a5..d986da78b 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
@@ -1,7 +1,26 @@
 
 package eu.dnetlib.dhp.actionmanager.project;
 
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
+
 import eu.dnetlib.dhp.actionmanager.project.utils.model.CSVProgramme;
 import eu.dnetlib.dhp.actionmanager.project.utils.model.CSVProject;
 import eu.dnetlib.dhp.actionmanager.project.utils.model.JsonTopic;
@@ -15,25 +34,8 @@ import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import eu.dnetlib.dhp.schema.oaf.Project;
 import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
 import eu.dnetlib.dhp.utils.DHPUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.MapGroupsFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
-import java.util.Arrays;
-import java.util.Objects;
-import java.util.Optional;
-
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-
 /**
  * Class that makes the ActionSet. To prepare the AS two joins are needed
  *
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/transformativeagreement/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/transformativeagreement/CreateActionSetSparkJob.java
index e1c3226cd..e8443c033 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/transformativeagreement/CreateActionSetSparkJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/transformativeagreement/CreateActionSetSparkJob.java
@@ -79,35 +79,35 @@ public class CreateActionSetSparkJob implements Serializable {
 
 	private static void createActionSet(SparkSession spark, String inputPath, String outputPath) {
 		JavaRDD relations = spark
-			.read()
-			.textFile(inputPath)
-			.map(
-				(MapFunction) value -> OBJECT_MAPPER
-					.readValue(value, TransformativeAgreementModel.class),
-				Encoders.bean(TransformativeAgreementModel.class))
-			.flatMap(
-				(FlatMapFunction) value -> createRelation(
-					value)
-						.iterator(),
-				Encoders.bean(Relation.class))
-			.filter((FilterFunction) Objects::nonNull)
-			.toJavaRDD()
-			.map(p -> new AtomicAction(p.getClass(), p));
+			.read()
+			.textFile(inputPath)
+			.map(
+				(MapFunction) value -> OBJECT_MAPPER
+					.readValue(value, TransformativeAgreementModel.class),
+				Encoders.bean(TransformativeAgreementModel.class))
+			.flatMap(
+				(FlatMapFunction) value -> createRelation(
+					value)
+						.iterator(),
+				Encoders.bean(Relation.class))
+			.filter((FilterFunction) Objects::nonNull)
+			.toJavaRDD()
+			.map(p -> new AtomicAction(p.getClass(), p));
 
 		//TODO relations in stand-by waiting to know if we need to create them or not In case we need just make a union before saving the sequence file
-		spark
-			.read()
-			.textFile(inputPath)
-			.map(
-				(MapFunction) value -> OBJECT_MAPPER
-					.readValue(value, TransformativeAgreementModel.class),
-				Encoders.bean(TransformativeAgreementModel.class))
-			.map(
-				(MapFunction) value -> createResult(
-					value),
-				Encoders.bean(Result.class))
-			.filter((FilterFunction) r -> r != null)
-			.toJavaRDD()
-			.map(p -> new AtomicAction(p.getClass(), p))
+		spark
+			.read()
+			.textFile(inputPath)
+			.map(
+				(MapFunction) value -> OBJECT_MAPPER
+					.readValue(value, TransformativeAgreementModel.class),
+				Encoders.bean(TransformativeAgreementModel.class))
+			.map(
+				(MapFunction) value -> createResult(
+					value),
+				Encoders.bean(Result.class))
+			.filter((FilterFunction) r -> r != null)
+			.toJavaRDD()
+			.map(p -> new AtomicAction(p.getClass(), p))
 			.mapToPair(
 				aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
 					new Text(OBJECT_MAPPER.writeValueAsString(aa))))
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java
index 247915d17..4b8d30292 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java
@@ -49,29 +49,29 @@ public class XSLTTransformationFunction implements MapFunctiontr.call(mr));
+		assertThrows(RuntimeException.class, () -> tr.call(mr));
 	}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
index 35fa61b94..574b13b26 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
@@ -4,7 +4,6 @@ package eu.dnetlib.dhp.oa.dedup;
 import java.util.*;
 import java.util.stream.Stream;
 
-import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
 import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.api.java.function.FlatMapFunction;
@@ -15,200 +14,200 @@ import org.apache.spark.sql.*;
 import eu.dnetlib.dhp.oa.dedup.model.Identifier;
 import eu.dnetlib.dhp.oa.merge.AuthorMerger;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
 import eu.dnetlib.dhp.schema.oaf.Author;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
 import scala.Tuple2;
 import scala.Tuple3;
 import scala.collection.JavaConversions;
 
 public class DedupRecordFactory {
-	public static final class DedupRecordReduceState {
-		public final String dedupId;
+	public static final class DedupRecordReduceState {
+		public final String dedupId;
 
-		public final ArrayList aliases = new ArrayList<>();
+		public final ArrayList aliases = new ArrayList<>();
 
-		public final HashSet acceptanceDate = new HashSet<>();
+		public final HashSet acceptanceDate = new HashSet<>();
 
-		public OafEntity entity;
+		public OafEntity entity;
 
-		public DedupRecordReduceState(String dedupId, String id, OafEntity entity) {
-			this.dedupId = dedupId;
-			this.entity = entity;
-			if (entity == null) {
-				aliases.add(id);
-			} else {
-				if (Result.class.isAssignableFrom(entity.getClass())) {
-					Result result = (Result) entity;
-					if (result.getDateofacceptance() != null
-						&& StringUtils.isNotBlank(result.getDateofacceptance().getValue())) {
-						acceptanceDate.add(result.getDateofacceptance().getValue());
-					}
-				}
-			}
-		}
+		public DedupRecordReduceState(String dedupId, String id, OafEntity entity) {
+			this.dedupId = dedupId;
+			this.entity = entity;
+			if (entity == null) {
+				aliases.add(id);
+			} else {
+				if (Result.class.isAssignableFrom(entity.getClass())) {
+					Result result = (Result) entity;
+					if (result.getDateofacceptance() != null
+						&& StringUtils.isNotBlank(result.getDateofacceptance().getValue())) {
+						acceptanceDate.add(result.getDateofacceptance().getValue());
+					}
+				}
+			}
+		}
 
-		public String getDedupId() {
-			return dedupId;
-		}
-	}
+		public String getDedupId() {
+			return dedupId;
+		}
+	}
 
-	private static final int MAX_ACCEPTANCE_DATE = 20;
+	private static final int MAX_ACCEPTANCE_DATE = 20;
 
-	private DedupRecordFactory() {
-	}
+	private DedupRecordFactory() {
+	}
 
-	public static Dataset createDedupRecord(
-		final SparkSession spark,
-		final DataInfo dataInfo,
-		final String mergeRelsInputPath,
-		final String entitiesInputPath,
-		final Class clazz) {
+	public static Dataset createDedupRecord(
+		final SparkSession spark,
+		final DataInfo dataInfo,
+		final String mergeRelsInputPath,
+		final String entitiesInputPath,
+		final Class clazz) {
 
-		final long ts = System.currentTimeMillis();
-		final Encoder beanEncoder = Encoders.bean(clazz);
-		final Encoder kryoEncoder = Encoders.kryo(clazz);
+		final long ts = System.currentTimeMillis();
+		final Encoder beanEncoder = Encoders.bean(clazz);
+		final Encoder kryoEncoder = Encoders.kryo(clazz);
 
-		//
-		Dataset entities = spark
-			.read()
-			.schema(Encoders.bean(clazz).schema())
-			.json(entitiesInputPath)
-			.as(beanEncoder)
-			.map(
-				(MapFunction>) entity -> {
-					return new Tuple2<>(entity.getId(), entity);
-				},
-				Encoders.tuple(Encoders.STRING(), kryoEncoder))
-			.selectExpr("_1 AS id", "_2 AS kryoObject");
+		//
+		Dataset entities = spark
+			.read()
+			.schema(Encoders.bean(clazz).schema())
+			.json(entitiesInputPath)
+			.as(beanEncoder)
+			.map(
+				(MapFunction>) entity -> {
+					return new Tuple2<>(entity.getId(), entity);
+				},
+				Encoders.tuple(Encoders.STRING(), kryoEncoder))
+			.selectExpr("_1 AS id", "_2 AS kryoObject");
 
-		// : source is the dedup_id, target is the id of the mergedIn
-		Dataset mergeRels = spark
-			.read()
-			.load(mergeRelsInputPath)
-			.where("relClass == 'merges'")
-			.selectExpr("source as dedupId", "target as id");
+		// : source is the dedup_id, target is the id of the mergedIn
+		Dataset mergeRels = spark
+			.read()
+			.load(mergeRelsInputPath)
+			.where("relClass == 'merges'")
+			.selectExpr("source as dedupId", "target as id");
 
-		return mergeRels
-			.join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
-			.select("dedupId", "id", "kryoObject")
-			.as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder))
-			.map(
-				(MapFunction, DedupRecordReduceState>) t -> new DedupRecordReduceState(
-					t._1(), t._2(), t._3()),
-				Encoders.kryo(DedupRecordReduceState.class))
-			.groupByKey(
-				(MapFunction) DedupRecordReduceState::getDedupId, Encoders.STRING())
-			.reduceGroups(
-				(ReduceFunction) (t1, t2) -> {
-					if (t1.entity == null) {
-						t2.aliases.addAll(t1.aliases);
-						return t2;
-					}
-					if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
-						t1.acceptanceDate.addAll(t2.acceptanceDate);
-					}
-					t1.aliases.addAll(t2.aliases);
-					t1.entity = reduceEntity(t1.entity, t2.entity);
+		return mergeRels
+			.join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
+			.select("dedupId", "id", "kryoObject")
+			.as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder))
+			.map(
+				(MapFunction, DedupRecordReduceState>) t -> new DedupRecordReduceState(
+					t._1(), t._2(), t._3()),
+				Encoders.kryo(DedupRecordReduceState.class))
+			.groupByKey(
+				(MapFunction) DedupRecordReduceState::getDedupId, Encoders.STRING())
+			.reduceGroups(
+				(ReduceFunction) (t1, t2) -> {
+					if (t1.entity == null) {
+						t2.aliases.addAll(t1.aliases);
+						return t2;
+					}
+					if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
+						t1.acceptanceDate.addAll(t2.acceptanceDate);
+					}
+					t1.aliases.addAll(t2.aliases);
+					t1.entity = reduceEntity(t1.entity, t2.entity);
 
-					return t1;
-				})
-			.flatMap((FlatMapFunction, OafEntity>) t -> {
-				String dedupId = t._1();
-				DedupRecordReduceState agg = t._2();
+					return t1;
+				})
+			.flatMap((FlatMapFunction, OafEntity>) t -> {
+				String dedupId = t._1();
+				DedupRecordReduceState agg = t._2();
 
-				if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) {
-					return Collections.emptyIterator();
-				}
+				if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) {
+					return Collections.emptyIterator();
+				}
 
-				return Stream
-					.concat(
-						Stream
-							.of(agg.getDedupId())
-							.map(id -> createDedupOafEntity(id, agg.entity, dataInfo, ts)),
-						agg.aliases
-							.stream()
-							.map(id -> createMergedDedupAliasOafEntity(id, agg.entity, dataInfo, ts)))
-					.iterator();
-			}, beanEncoder);
-	}
+				return Stream
+					.concat(
+						Stream
+							.of(agg.getDedupId())
+							.map(id -> createDedupOafEntity(id, agg.entity, dataInfo, ts)),
+						agg.aliases
+							.stream()
+							.map(id -> createMergedDedupAliasOafEntity(id, agg.entity, dataInfo, ts)))
+					.iterator();
+			}, beanEncoder);
+	}
 
-	private static OafEntity createDedupOafEntity(String id, OafEntity base, DataInfo dataInfo, long ts) {
-		try {
-			OafEntity res = (OafEntity) BeanUtils.cloneBean(base);
-			res.setId(id);
-			res.setDataInfo(dataInfo);
-			res.setLastupdatetimestamp(ts);
-			return res;
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
+	private static OafEntity createDedupOafEntity(String id, OafEntity base, DataInfo dataInfo, long ts) {
+		try {
+			OafEntity res = (OafEntity) BeanUtils.cloneBean(base);
+			res.setId(id);
+			res.setDataInfo(dataInfo);
+			res.setLastupdatetimestamp(ts);
+			return res;
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
 
-	private static OafEntity createMergedDedupAliasOafEntity(String id, OafEntity base, DataInfo dataInfo, long ts) {
-		try {
-			OafEntity res = createDedupOafEntity(id, base, dataInfo, ts);
-			DataInfo ds = (DataInfo) BeanUtils.cloneBean(dataInfo);
-			ds.setDeletedbyinference(true);
-			res.setDataInfo(ds);
-			return res;
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
+	private static OafEntity createMergedDedupAliasOafEntity(String id, OafEntity base, DataInfo dataInfo, long ts) {
+		try {
+			OafEntity res = createDedupOafEntity(id, base, dataInfo, ts);
+			DataInfo ds = (DataInfo) BeanUtils.cloneBean(dataInfo);
+			ds.setDeletedbyinference(true);
+			res.setDataInfo(ds);
+			return res;
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
 
-	private static OafEntity reduceEntity(OafEntity entity, OafEntity duplicate) {
+	private static OafEntity reduceEntity(OafEntity entity, OafEntity duplicate) {
 
-		if (duplicate == null) {
-			return entity;
-		}
+		if (duplicate == null) {
+			return entity;
+		}
 
-		int compare = new IdentifierComparator<>()
-			.compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate));
+		int compare = new IdentifierComparator<>()
+			.compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate));
 
-		if (compare > 0) {
-			OafEntity swap = duplicate;
-			duplicate = entity;
-			entity = swap;
-		}
+		if (compare > 0) {
+			OafEntity swap = duplicate;
+			duplicate = entity;
+			entity = swap;
+		}
 
-		entity = MergeUtils.checkedMerge(entity, duplicate);
+		entity = MergeUtils.checkedMerge(entity, duplicate);
 
-		if (ModelSupport.isSubClass(duplicate, Result.class)) {
-			Result re = (Result) entity;
-			Result rd = (Result) duplicate;
+		if (ModelSupport.isSubClass(duplicate, Result.class)) {
+			Result re = (Result) entity;
+			Result rd = (Result) duplicate;
 
-			List> authors = new ArrayList<>();
-			if (re.getAuthor() != null) {
-				authors.add(re.getAuthor());
-			}
-			if (rd.getAuthor() != null) {
-				authors.add(rd.getAuthor());
-			}
+			List> authors = new ArrayList<>();
+			if (re.getAuthor() != null) {
+				authors.add(re.getAuthor());
+			}
+			if (rd.getAuthor() != null) {
+				authors.add(rd.getAuthor());
+			}
 
-			re.setAuthor(AuthorMerger.merge(authors));
-		}
+			re.setAuthor(AuthorMerger.merge(authors));
+		}
 
-		return entity;
-	}
+		return entity;
+	}
 
-	public static T entityMerger(
-		String id, Iterator> entities, long ts, DataInfo dataInfo, Class clazz) {
-		T base = entities.next()._2();
+	public static T entityMerger(
+		String id, Iterator> entities, long ts, DataInfo dataInfo, Class clazz) {
+		T base = entities.next()._2();
 
-		while (entities.hasNext()) {
-			T duplicate = entities.next()._2();
-			if (duplicate != null)
-				base = (T) reduceEntity(base, duplicate);
-		}
+		while (entities.hasNext()) {
+			T duplicate = entities.next()._2();
+			if (duplicate != null)
+				base = (T) reduceEntity(base, duplicate);
+		}
 
-		base.setId(id);
-		base.setDataInfo(dataInfo);
-		base.setLastupdatetimestamp(ts);
+		base.setId(id);
+		base.setDataInfo(dataInfo);
+		base.setLastupdatetimestamp(ts);
 
-		return base;
-	}
+		return base;
+	}
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
index 77e696501..e4bcf1e82 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
@@ -3,7 +3,6 @@ package eu.dnetlib.dhp.oa.dedup;
 
 import static org.apache.spark.sql.functions.col;
 
-import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
@@ -21,6 +20,7 @@ import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import scala.Tuple2;
@@ -128,8 +128,7 @@ public class SparkPropagateRelation extends AbstractSparkAction {
 				(MapFunction) r -> String
 					.join(" ", r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()),
 				Encoders.STRING())
-			.reduceGroups((ReduceFunction) MergeUtils::mergeRelation
-			)
+			.reduceGroups((ReduceFunction) MergeUtils::mergeRelation)
 			.map((MapFunction, Relation>) Tuple2::_2, REL_BEAN_ENC);
 
 		final String outputRelationPath = graphOutputPath + "/relation";
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java
index f791825a7..934856742 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java
@@ -13,7 +13,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
@@ -29,6 +28,7 @@ import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.Context;
 import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
 import scala.Tuple2;
 
 /**
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java
index d2bfd75c1..3cf2f73c3 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java
@@ -25,118 +25,118 @@ import scala.Tuple2;
 
 public class SparkResultToCommunityThroughSemRelJob {
 
-	private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityThroughSemRelJob.class);
+	private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityThroughSemRelJob.class);
 
-	public static void main(String[] args) throws Exception {
+	public static void main(String[] args) throws Exception {
 
-		String jsonConfiguration = IOUtils
-			.toString(
-				SparkResultToCommunityThroughSemRelJob.class
-					.getResourceAsStream(
-						"/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromsemrel/input_communitytoresult_parameters.json"));
+		String jsonConfiguration = IOUtils
+			.toString(
+				SparkResultToCommunityThroughSemRelJob.class
+					.getResourceAsStream(
+						"/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromsemrel/input_communitytoresult_parameters.json"));
 
-		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 
-		parser.parseArgument(args);
+		parser.parseArgument(args);
 
-		Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
-		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+		Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
+		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 
-		String inputPath = parser.get("sourcePath");
-		log.info("inputPath: {}", inputPath);
+		String inputPath = parser.get("sourcePath");
+		log.info("inputPath: {}", inputPath);
 
-		final String outputPath = parser.get("outputPath");
-		log.info("outputPath: {}", outputPath);
+		final String outputPath = parser.get("outputPath");
+		log.info("outputPath: {}", outputPath);
 
-		final String preparedInfoPath = parser.get("preparedInfoPath");
-		log.info("preparedInfoPath: {}", preparedInfoPath);
+		final String preparedInfoPath = parser.get("preparedInfoPath");
+		log.info("preparedInfoPath: {}", preparedInfoPath);
 
-		SparkConf conf = new SparkConf();
-		conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
+		SparkConf conf = new SparkConf();
+		conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
 
-		final String resultClassName = parser.get("resultTableName");
-		log.info("resultTableName: {}", resultClassName);
+		final String resultClassName = parser.get("resultTableName");
+		log.info("resultTableName: {}", resultClassName);
 
-		final Boolean saveGraph = Optional
-			.ofNullable(parser.get("saveGraph"))
-			.map(Boolean::valueOf)
-			.orElse(Boolean.TRUE);
-		log.info("saveGraph: {}", saveGraph);
+		final Boolean saveGraph = Optional
+			.ofNullable(parser.get("saveGraph"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
+		log.info("saveGraph: {}", saveGraph);
 
-		@SuppressWarnings("unchecked")
-		Class resultClazz = (Class) Class.forName(resultClassName);
+		@SuppressWarnings("unchecked")
+		Class resultClazz = (Class) Class.forName(resultClassName);
 
-		runWithSparkHiveSession(
-			conf,
-			isSparkSessionManaged,
-			spark -> {
-				if (isTest(parser)) {
-					removeOutputDir(spark, outputPath);
-				}
-				if (saveGraph) {
-					execPropagation(
-						spark, inputPath, outputPath, preparedInfoPath, resultClazz);
-				}
-			});
-	}
+		runWithSparkHiveSession(
+			conf,
+			isSparkSessionManaged,
+			spark -> {
+				if (isTest(parser)) {
+					removeOutputDir(spark, outputPath);
+				}
+				if (saveGraph) {
+					execPropagation(
+						spark, inputPath, outputPath, preparedInfoPath, resultClazz);
+				}
+			});
+	}
 
-	private static void execPropagation(
-		SparkSession spark,
-		String inputPath,
-		String outputPath,
-		String preparedInfoPath,
-		Class resultClazz) {
+	private static void execPropagation(
+		SparkSession spark,
+		String inputPath,
+		String outputPath,
+		String preparedInfoPath,
+		Class resultClazz) {
 
-		Dataset possibleUpdates = readPath(spark, preparedInfoPath, ResultCommunityList.class);
-		Dataset result = readPath(spark, inputPath, resultClazz);
+		Dataset possibleUpdates = readPath(spark, preparedInfoPath, ResultCommunityList.class);
+		Dataset result = readPath(spark, inputPath, resultClazz);
 
-		result
-			.joinWith(
-				possibleUpdates,
-				result.col("id").equalTo(possibleUpdates.col("resultId")),
-				"left_outer")
-			.map(contextUpdaterFn(), Encoders.bean(resultClazz))
-			.write()
-			.mode(SaveMode.Overwrite)
-			.option("compression", "gzip")
-			.json(outputPath);
+		result
+			.joinWith(
+				possibleUpdates,
+				result.col("id").equalTo(possibleUpdates.col("resultId")),
+				"left_outer")
+			.map(contextUpdaterFn(), Encoders.bean(resultClazz))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath);
 
-	}
+	}
 
-	private static MapFunction, R> contextUpdaterFn() {
-		return value -> {
-			R ret = value._1();
-			Optional rcl = Optional.ofNullable(value._2());
-			if (rcl.isPresent()) {
-				Set contexts = new HashSet<>();
-				ret.getContext().forEach(c -> contexts.add(c.getId()));
-				rcl
-					.get()
-					.getCommunityList()
-					.stream()
-					.forEach(
-						c -> {
-							if (!contexts.contains(c)) {
-								Context newContext = new Context();
-								newContext.setId(c);
-								newContext
-									.setDataInfo(
-										Arrays
-											.asList(
												getDataInfo(
-													PROPAGATION_DATA_INFO_TYPE,
-													PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID,
-													PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
-													ModelConstants.DNET_PROVENANCE_ACTIONS)));
-								ret.getContext().add(newContext);
-							}
+	private static MapFunction, R> contextUpdaterFn() {
+		return value -> {
+			R ret = value._1();
+			Optional rcl = Optional.ofNullable(value._2());
+			if (rcl.isPresent()) {
+				Set contexts = new HashSet<>();
+				ret.getContext().forEach(c -> contexts.add(c.getId()));
+				rcl
+					.get()
+					.getCommunityList()
+					.stream()
+					.forEach(
+						c -> {
+							if (!contexts.contains(c)) {
+								Context newContext = new Context();
+								newContext.setId(c);
+								newContext
+									.setDataInfo(
+										Arrays
+											.asList(
+												getDataInfo(
+													PROPAGATION_DATA_INFO_TYPE,
+													PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID,
+													PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
+													ModelConstants.DNET_PROVENANCE_ACTIONS)));
+								ret.getContext().add(newContext);
+							}
 
-						});
+						});
 
-			}
+			}
 
-			return ret;
-		};
-	}
+			return ret;
+		};
+	}
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java
index ed745b4fb..bb32e603b 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java
@@ -7,7 +7,6 @@ import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;
@@ -25,6 +24,7 @@ import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
 import scala.Tuple2;
 
 /**
@@ -251,7 +251,7 @@ public class MergeGraphTableSparkJob {
 				return (T) MergeUtils.merge(b, a);
 			}
 			if (a instanceof Relation && b instanceof Relation) {
-				return (T) MergeUtils.mergeRelation((Relation)a, (Relation) b);
+				return (T) MergeUtils.mergeRelation((Relation) a, (Relation) b);
 			}
 		}
 		return Objects.isNull(a) ? b : a;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
index 6cf297f61..c95d5442a 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java
@@ -98,8 +98,10 @@ public abstract class AbstractMdRecordToOafMapper {
 	protected static final String DATACITE_SCHEMA_KERNEL_3 = "http://datacite.org/schema/kernel-3";
 	protected static final String DATACITE_SCHEMA_KERNEL_3_SLASH = "http://datacite.org/schema/kernel-3/";
 
-	protected static final Qualifier ORCID_PID_TYPE = qualifier(ModelConstants.ORCID_PENDING, ModelConstants.ORCID_CLASSNAME, DNET_PID_TYPES, DNET_PID_TYPES);
-	protected static final Qualifier MAG_PID_TYPE = qualifier("MAGIdentifier", "Microsoft Academic Graph Identifier", DNET_PID_TYPES, DNET_PID_TYPES);
+	protected static final Qualifier ORCID_PID_TYPE = qualifier(
+		ModelConstants.ORCID_PENDING, ModelConstants.ORCID_CLASSNAME, DNET_PID_TYPES, DNET_PID_TYPES);
+	protected static final Qualifier MAG_PID_TYPE = qualifier(
+		"MAGIdentifier", "Microsoft Academic Graph Identifier", DNET_PID_TYPES, DNET_PID_TYPES);
 
 	protected static final String DEFAULT_TRUST_FOR_VALIDATED_RELS = "0.999";
 
@@ -122,14 +124,14 @@ public abstract class AbstractMdRecordToOafMapper {
 	static {
 		IdentifierFactory.PID_AUTHORITY
-			.keySet()
-			.stream()
-			.forEach(entry -> pidTypeWithAuthority.put(entry.toString().toLowerCase(), entry.toString()));
+			.keySet()
+			.stream()
+			.forEach(entry -> pidTypeWithAuthority.put(entry.toString().toLowerCase(), entry.toString()));
 	}
 
 	protected AbstractMdRecordToOafMapper(final VocabularyGroup vocs, final boolean invisible,
-		final boolean shouldHashId, final boolean forceOriginalId) {
+		final boolean shouldHashId, final boolean forceOriginalId) {
 		this.vocs = vocs;
 		this.invisible = invisible;
 		this.shouldHashId = shouldHashId;
@@ -137,7 +139,7 @@ public abstract class AbstractMdRecordToOafMapper {
 	}
 
 	protected AbstractMdRecordToOafMapper(final VocabularyGroup vocs, final boolean invisible,
-		final boolean shouldHashId) {
+		final boolean shouldHashId) {
 		this.vocs = vocs;
 		this.invisible = invisible;
 		this.shouldHashId = shouldHashId;
@@ -149,20 +151,26 @@ public abstract class AbstractMdRecordToOafMapper {
 		DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext);
 		try {
 			final Document doc = DocumentHelper
-				.parseText(xml
-					.replaceAll(DATACITE_SCHEMA_KERNEL_4, DATACITE_SCHEMA_KERNEL_3)
-					.replaceAll(DATACITE_SCHEMA_KERNEL_4_SLASH, DATACITE_SCHEMA_KERNEL_3)
-					.replaceAll(DATACITE_SCHEMA_KERNEL_3_SLASH, DATACITE_SCHEMA_KERNEL_3));
+				.parseText(
+					xml
+						.replaceAll(DATACITE_SCHEMA_KERNEL_4, DATACITE_SCHEMA_KERNEL_3)
+						.replaceAll(DATACITE_SCHEMA_KERNEL_4_SLASH, DATACITE_SCHEMA_KERNEL_3)
+						.replaceAll(DATACITE_SCHEMA_KERNEL_3_SLASH, DATACITE_SCHEMA_KERNEL_3));
 
-			final KeyValue collectedFrom = getProvenanceDatasource(doc, "//oaf:collectedFrom/@id", "//oaf:collectedFrom/@name");
+			final KeyValue collectedFrom = getProvenanceDatasource(
+				doc, "//oaf:collectedFrom/@id", "//oaf:collectedFrom/@name");
 
-			if (collectedFrom == null) { return Lists.newArrayList(); }
+			if (collectedFrom == null) {
+				return Lists.newArrayList();
+			}
 
 			final KeyValue hostedBy = StringUtils.isBlank(doc.valueOf("//oaf:hostedBy/@id"))
-				? collectedFrom
-				: getProvenanceDatasource(doc, "//oaf:hostedBy/@id", "//oaf:hostedBy/@name");
+				? collectedFrom
+				: getProvenanceDatasource(doc, "//oaf:hostedBy/@id", "//oaf:hostedBy/@name");
 
-			if (hostedBy == null) { return Lists.newArrayList(); }
+			if (hostedBy == null) {
+				return Lists.newArrayList();
+			}
 
 			final DataInfo entityInfo = prepareDataInfo(doc, this.invisible);
 			final long lastUpdateTimestamp = new Date().getTime();
@@ -183,15 +191,15 @@ public abstract class AbstractMdRecordToOafMapper {
 		if (StringUtils.isBlank(type) && this.vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) {
 			final String instanceType = instances
-				.stream()
-				.map(i -> i.getInstancetype().getClassid())
-				.findFirst()
-				.filter(s -> !UNKNOWN.equalsIgnoreCase(s))
-				.orElse("0000"); // Unknown
+				.stream()
+				.map(i -> i.getInstancetype().getClassid())
+				.findFirst()
+				.filter(s -> !UNKNOWN.equalsIgnoreCase(s))
+				.orElse("0000"); // Unknown
 			return Optional
-				.ofNullable(this.vocs.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, instanceType))
-				.map(Qualifier::getClassid)
-				.orElse("0000");
+				.ofNullable(this.vocs.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, instanceType))
+				.map(Qualifier::getClassid)
+				.orElse("0000");
 		}
 
 		return type;
@@ -201,18 +209,20 @@ public abstract class AbstractMdRecordToOafMapper {
 		final String dsId = doc.valueOf(xpathId);
 		final String dsName = doc.valueOf(xpathName);
 
-		if (StringUtils.isBlank(dsId) || StringUtils.isBlank(dsName)) { return null; }
+		if (StringUtils.isBlank(dsId) || StringUtils.isBlank(dsName)) {
+			return null;
+		}
 
 		return keyValue(createOpenaireId(10, dsId, true), dsName);
 	}
 
 	protected List createOafs(
-		final Document doc,
-		final String type,
-		final List instances,
-		final KeyValue collectedFrom,
-		final DataInfo info,
-		final long lastUpdateTimestamp) {
+		final Document doc,
+		final String type,
+		final List instances,
+		final KeyValue collectedFrom,
+		final DataInfo info,
+		final long lastUpdateTimestamp) {
 
 		final OafEntity entity = createEntity(doc, type, instances, collectedFrom, info, lastUpdateTimestamp);
@@ -246,52 +256,52 @@ public abstract class AbstractMdRecordToOafMapper {
 	}
 
 	private OafEntity createEntity(final Document doc,
-		final String type,
-		final List instances,
-		final KeyValue collectedFrom,
-		final DataInfo info,
-		final long lastUpdateTimestamp) {
+		final String type,
+		final List instances,
+		final KeyValue collectedFrom,
+		final DataInfo info,
+		final long lastUpdateTimestamp) {
 		switch (type.toLowerCase()) {
-		case "publication":
-			final Publication p = new Publication();
-			populateResultFields(p, doc, instances, collectedFrom, info, lastUpdateTimestamp);
-			p.setJournal(prepareJournal(doc, info));
-			return p;
-		case "dataset":
-			final Dataset d = new Dataset();
-			populateResultFields(d, doc, instances, collectedFrom, info, lastUpdateTimestamp);
-			d.setStoragedate(prepareDatasetStorageDate(doc, info));
-			d.setDevice(prepareDatasetDevice(doc, info));
-			d.setSize(prepareDatasetSize(doc, info));
-			d.setVersion(prepareDatasetVersion(doc, info));
-			d.setLastmetadataupdate(prepareDatasetLastMetadataUpdate(doc, info));
-			d.setMetadataversionnumber(prepareDatasetMetadataVersionNumber(doc, info));
-			d.setGeolocation(prepareDatasetGeoLocations(doc, info));
-			return d;
-		case "software":
-			final Software s = new Software();
-			populateResultFields(s, doc, instances, collectedFrom, info, lastUpdateTimestamp);
-			s.setDocumentationUrl(prepareSoftwareDocumentationUrls(doc, info));
-			s.setLicense(prepareSoftwareLicenses(doc, info));
-			s.setCodeRepositoryUrl(prepareSoftwareCodeRepositoryUrl(doc, info));
-			s.setProgrammingLanguage(prepareSoftwareProgrammingLanguage(doc, info));
-			return s;
-		case "":
-		case "otherresearchproducts":
-		default:
-			final OtherResearchProduct o = new OtherResearchProduct();
-			populateResultFields(o, doc, instances, collectedFrom, info, lastUpdateTimestamp);
-			o.setContactperson(prepareOtherResearchProductContactPersons(doc, info));
-			o.setContactgroup(prepareOtherResearchProductContactGroups(doc, info));
-			o.setTool(prepareOtherResearchProductTools(doc, info));
-			return o;
+			case "publication":
+				final Publication p = new Publication();
+				populateResultFields(p, doc, instances, collectedFrom, info, lastUpdateTimestamp);
+				p.setJournal(prepareJournal(doc, info));
+				return p;
+			case "dataset":
+				final Dataset d = new Dataset();
+				populateResultFields(d, doc, instances, collectedFrom, info, lastUpdateTimestamp);
+				d.setStoragedate(prepareDatasetStorageDate(doc, info));
+				d.setDevice(prepareDatasetDevice(doc, info));
+				d.setSize(prepareDatasetSize(doc, info));
+				d.setVersion(prepareDatasetVersion(doc, info));
+				d.setLastmetadataupdate(prepareDatasetLastMetadataUpdate(doc, info));
+				d.setMetadataversionnumber(prepareDatasetMetadataVersionNumber(doc, info));
+				d.setGeolocation(prepareDatasetGeoLocations(doc, info));
+				return d;
+			case "software":
+				final Software s = new Software();
+				populateResultFields(s, doc, instances, collectedFrom, info, lastUpdateTimestamp);
+				s.setDocumentationUrl(prepareSoftwareDocumentationUrls(doc, info));
+				s.setLicense(prepareSoftwareLicenses(doc, info));
+				s.setCodeRepositoryUrl(prepareSoftwareCodeRepositoryUrl(doc, info));
+				s.setProgrammingLanguage(prepareSoftwareProgrammingLanguage(doc, info));
+				return s;
+			case "":
+			case "otherresearchproducts":
+			default:
+				final OtherResearchProduct o = new OtherResearchProduct();
+				populateResultFields(o, doc, instances, collectedFrom, info, lastUpdateTimestamp);
+				o.setContactperson(prepareOtherResearchProductContactPersons(doc, info));
+				o.setContactgroup(prepareOtherResearchProductContactGroups(doc, info));
+				o.setTool(prepareOtherResearchProductTools(doc, info));
+				return o;
 		}
 	}
 
 	private List addProjectRels(
-		final Document doc,
-		final OafEntity entity,
-		final DataInfo info) {
+		final Document doc,
+		final OafEntity entity,
+		final DataInfo info) {
 
 		final List res = new ArrayList<>();
@@ -307,13 +317,21 @@ public abstract class AbstractMdRecordToOafMapper {
 				final String projectId = createOpenaireId(40, originalId, true);
 
 				res
-					.add(OafMapperUtils
-						.getRelation(docId, projectId, RESULT_PROJECT, OUTCOME, IS_PRODUCED_BY, entity.getCollectedfrom(), info, entity
-							.getLastupdatetimestamp(), validationdDate, null));
+					.add(
+						OafMapperUtils
+							.getRelation(
+								docId, projectId, RESULT_PROJECT, OUTCOME, IS_PRODUCED_BY, entity.getCollectedfrom(),
+								info, entity
+									.getLastupdatetimestamp(),
+								validationdDate, null));
 				res
-					.add(OafMapperUtils
-						.getRelation(projectId, docId, RESULT_PROJECT, OUTCOME, PRODUCES, entity.getCollectedfrom(), info, entity
-							.getLastupdatetimestamp(), validationdDate, null));
+					.add(
+						OafMapperUtils
+							.getRelation(
+								projectId, docId, RESULT_PROJECT, OUTCOME, PRODUCES, entity.getCollectedfrom(), info,
+								entity
+									.getLastupdatetimestamp(),
+								validationdDate, null));
 			}
 		}
@@ -333,11 +351,11 @@ public abstract class AbstractMdRecordToOafMapper {
 			final String relClass = element.attributeValue("relClass");
 
 			if (StringUtils.isNotBlank(target) && StringUtils.isNotBlank(relType) && StringUtils.isNotBlank(subRelType)
-				&& StringUtils.isNotBlank(relClass)) {
+				&& StringUtils.isNotBlank(relClass)) {
 
 				final String relClassInverse = ModelSupport
-					.findInverse(ModelSupport.rel(relType, subRelType, relClass))
-					.getInverseRelClass();
+					.findInverse(ModelSupport.rel(relType, subRelType, relClass))
+					.getInverseRelClass();
 				final String validationDate = ((Node) o).valueOf("@validationDate");
 
 				if (StringUtils.isNotBlank(target)) {
@@ -345,13 +363,21 @@ public abstract class AbstractMdRecordToOafMapper {
 					if (StringUtils.isNotBlank(targetType)) {
 						final String targetId = createOpenaireId(targetType, target, true);
 						rels
-							.add(OafMapperUtils
-								.getRelation(entity.getId(), targetId, relType, subRelType, relClass, entity.getCollectedfrom(), info, entity
-									.getLastupdatetimestamp(), validationDate, null));
+							.add(
+								OafMapperUtils
+									.getRelation(
+										entity.getId(), targetId, relType, subRelType, relClass,
+										entity.getCollectedfrom(), info, entity
+											.getLastupdatetimestamp(),
+										validationDate, null));
 						rels
-							.add(OafMapperUtils
-								.getRelation(targetId, entity.getId(), relType, subRelType, relClassInverse, entity.getCollectedfrom(), info, entity
-									.getLastupdatetimestamp(), validationDate, null));
+							.add(
+								OafMapperUtils
+									.getRelation(
+										targetId, entity.getId(), relType, subRelType, relClassInverse,
+										entity.getCollectedfrom(), info, entity
+											.getLastupdatetimestamp(),
+										validationDate, null));
 					}
 				}
 			}
@@ -384,30 +410,37 @@ public abstract class AbstractMdRecordToOafMapper {
 				}
 
 				rels
-					.add(OafMapperUtils
-						.getRelation(resultId, orgId, RESULT_ORGANIZATION, AFFILIATION, HAS_AUTHOR_INSTITUTION, entity.getCollectedfrom(), info, entity
-							.getLastupdatetimestamp(), null, properties));
+					.add(
+						OafMapperUtils
+							.getRelation(
+								resultId, orgId, RESULT_ORGANIZATION, AFFILIATION, HAS_AUTHOR_INSTITUTION,
+								entity.getCollectedfrom(), info, entity
+									.getLastupdatetimestamp(),
+								null, properties));
 				rels
-					.add(OafMapperUtils
-						.getRelation(orgId, resultId, RESULT_ORGANIZATION, AFFILIATION, IS_AUTHOR_INSTITUTION_OF, entity
-							.getCollectedfrom(), info, entity.getLastupdatetimestamp(), null, properties));
+					.add(
+						OafMapperUtils
+							.getRelation(
+								orgId, resultId, RESULT_ORGANIZATION, AFFILIATION, IS_AUTHOR_INSTITUTION_OF, entity
+									.getCollectedfrom(),
+								info, entity.getLastupdatetimestamp(), null, properties));
 			}
 		}
 		return rels;
 	}
 
 	protected abstract List addOtherResultRels(
-		final Document doc,
-		final OafEntity entity,
-		DataInfo info);
+		final Document doc,
+		final OafEntity entity,
+		DataInfo info);
 
 	private void populateResultFields(
-		final Result r,
-		final Document doc,
-		final List instances,
-		final KeyValue collectedFrom,
-		final DataInfo info,
-		final long lastUpdateTimestamp) {
+		final Result r,
+		final Document doc,
+		final List instances,
+		final KeyValue collectedFrom,
+		final DataInfo info,
+		final long lastUpdateTimestamp) {
 		r.setDataInfo(info);
 		r.setLastupdatetimestamp(lastUpdateTimestamp);
 		r.setId(createOpenaireId(50, doc.valueOf("//dri:objIdentifier"), false));
@@ -499,10 +532,10 @@ public abstract class AbstractMdRecordToOafMapper {
 	protected abstract Qualifier prepareResourceType(Document doc, DataInfo info);
 
 	protected abstract List prepareInstances(
-		Document doc,
-		DataInfo info,
-		KeyValue collectedfrom,
-		KeyValue hostedby);
+		Document doc,
+		DataInfo info,
+		KeyValue collectedfrom,
+		KeyValue hostedby);
 
 	protected abstract List> prepareSources(Document doc, DataInfo info);
@@ -527,16 +560,16 @@ public abstract class AbstractMdRecordToOafMapper {
 	protected abstract List prepareAuthors(Document doc, DataInfo info);
 
 	protected abstract List> prepareOtherResearchProductTools(
-		Document doc,
-		DataInfo info);
+		Document doc,
+		DataInfo info);
 
 	protected abstract List> prepareOtherResearchProductContactGroups(
-		Document doc,
-		DataInfo info);
+		Document doc,
+		DataInfo info);
 
 	protected abstract List> prepareOtherResearchProductContactPersons(
-		Document doc,
-		DataInfo info);
+		Document doc,
+		DataInfo info);
 
 	protected abstract Qualifier prepareSoftwareProgrammingLanguage(Document doc, DataInfo info);
@@ -545,8 +578,8 @@ public abstract class AbstractMdRecordToOafMapper {
 	protected abstract List prepareSoftwareLicenses(Document doc, DataInfo info);
 
 	protected abstract List> prepareSoftwareDocumentationUrls(
-		Document doc,
-		DataInfo info);
+		Document doc,
+		DataInfo info);
 
 	protected abstract List prepareDatasetGeoLocations(Document doc, DataInfo info);
@@ -566,13 +599,13 @@ public abstract class AbstractMdRecordToOafMapper {
 	protected List prepareInstanceTypeMapping(final Document doc) {
 		return Optional
-			.ofNullable(findOriginalType(doc))
-			.map(originalType -> {
-				final List mappings = Lists.newArrayList();
-				mappings.add(OafMapperUtils.instanceTypeMapping(originalType, OPENAIRE_COAR_RESOURCE_TYPES_3_1));
-				return mappings;
-			})
-			.orElse(new ArrayList<>());
+			.ofNullable(findOriginalType(doc))
+			.map(originalType -> {
+				final List mappings = Lists.newArrayList();
+				mappings.add(OafMapperUtils.instanceTypeMapping(originalType, OPENAIRE_COAR_RESOURCE_TYPES_3_1));
+				return mappings;
+			})
+			.orElse(new ArrayList<>());
 	}
 
 	private Journal prepareJournal(final Document doc, final DataInfo info) {
@@ -587,7 +620,9 @@ public abstract class AbstractMdRecordToOafMapper {
 			final String sp = n.valueOf("@sp");
 			final String vol = n.valueOf("@vol");
 			final String edition = n.valueOf("@edition");
-			if (StringUtils.isNotBlank(name)) { return journal(name, issnPrinted, issnOnline, issnLinking, ep, iss, sp, vol, edition, null, null, info); }
+			if (StringUtils.isNotBlank(name)) {
+				return journal(name, issnPrinted, issnOnline, issnLinking, ep, iss, sp, vol, edition, null, null, info);
+			}
 		}
 		return null;
 	}
@@ -596,13 +631,18 @@ public abstract class AbstractMdRecordToOafMapper {
 		final Node n = doc.selectSingleNode("//*[local-name()='provenance']/*[local-name()='originDescription']");
 		if (n != null) {
 			final String id = n.valueOf("./*[local-name()='identifier']");
-			if (StringUtils.isNotBlank(id)) { return Lists.newArrayList(id); }
+			if (StringUtils.isNotBlank(id)) {
+				return Lists.newArrayList(id);
+			}
 		}
 
 		final List idList = doc
-			.selectNodes("normalize-space(//*[local-name()='header']/*[local-name()='identifier' or local-name()='recordIdentifier']/text())");
+			.selectNodes(
+				"normalize-space(//*[local-name()='header']/*[local-name()='identifier' or local-name()='recordIdentifier']/text())");
 		final Set originalIds = Sets.newHashSet(idList);
 
-		if (originalIds.isEmpty()) { throw new IllegalStateException("missing originalID on " + doc.asXML()); }
+		if (originalIds.isEmpty()) {
+			throw new IllegalStateException("missing originalID on " + doc.asXML());
+		}
 		return Lists.newArrayList(originalIds);
 	}
@@ -628,11 +668,11 @@ public abstract class AbstractMdRecordToOafMapper {
 	}
 
 	protected List prepareListStructPropsWithValidQualifier(
-		final Node node,
-		final String xpath,
-		final String xpathClassId,
-		final String schemeId,
-		final DataInfo info) {
+		final Node node,
+		final String xpath,
+		final String xpathClassId,
+		final String schemeId,
+		final DataInfo info) {
 		final List res = new ArrayList<>();
 
 		for (final Object o : node.selectNodes(xpath)) {
@@ -646,10 +686,10 @@ public abstract class AbstractMdRecordToOafMapper {
 	}
 
 	protected List prepareListStructProps(
-		final Node node,
-		final String xpath,
-		final Qualifier qualifier,
-		final DataInfo info) {
+		final Node node,
+		final String xpath,
+		final Qualifier qualifier,
+		final DataInfo info) {
 		final List res = new ArrayList<>();
 		for (final Object o : node.selectNodes(xpath)) {
 			final Node n = (Node) o;
@@ -659,28 +699,34 @@ public abstract class AbstractMdRecordToOafMapper {
 	}
 
 	protected List prepareListStructProps(
-		final Node node,
-		final String xpath,
-		final DataInfo info) {
+		final Node node,
+		final String xpath,
+		final DataInfo info) {
 		final List res = new ArrayList<>();
 		for (final Object o : node.selectNodes(xpath)) {
 			final Node n = (Node) o;
 			res
-				.add(structuredProperty(n.getText(), n.valueOf("@classid"), n.valueOf("@classname"), n.valueOf("@schemeid"), n
-					.valueOf("@schemename"), info));
+				.add(
+					structuredProperty(
+						n.getText(), n.valueOf("@classid"), n.valueOf("@classname"), n.valueOf("@schemeid"), n
+							.valueOf("@schemename"),
+						info));
 		}
 		return res;
 	}
 
 	protected List prepareSubjectList(
-		final Node node,
-		final String xpath,
-		final DataInfo info) {
+		final Node node,
+		final String xpath,
+		final DataInfo info) {
 		final List res = new ArrayList<>();
 		for (final Object o : node.selectNodes(xpath)) {
 			final Node n = (Node) o;
 			res
-				.add(subject(n.getText(), n.valueOf("@classid"), n.valueOf("@classname"), n.valueOf("@schemeid"), n.valueOf("@schemename"), info));
+				.add(
+					subject(
+						n.getText(), n.valueOf("@classid"), n.valueOf("@classname"), n.valueOf("@schemeid"),
+						n.valueOf("@schemename"), info));
 		}
 		return res;
 	}
@@ -688,7 +734,9 @@ public abstract class AbstractMdRecordToOafMapper {
 	protected OAIProvenance prepareOAIprovenance(final Document doc) {
 		final Node n = doc.selectSingleNode("//*[local-name()='provenance']/*[local-name()='originDescription']");
 
-		if (n == null) { return null; }
+		if (n == null) {
+			return null;
+		}
 
 		final String identifier = n.valueOf("./*[local-name()='identifier']");
 		final String baseURL = n.valueOf("./*[local-name()='baseURL']");
@@ -703,7 +751,9 @@ public abstract class AbstractMdRecordToOafMapper {
 	protected DataInfo prepareDataInfo(final Document doc, final boolean invisible) {
 		final Node n = doc.selectSingleNode("//oaf:datainfo");
 
-		if (n == null) { return dataInfo(false, null, false, invisible, REPOSITORY_PROVENANCE_ACTIONS, "0.9"); }
+		if (n == null) {
+			return dataInfo(false, null, false, invisible, REPOSITORY_PROVENANCE_ACTIONS, "0.9");
+		}
 
 		final String paClassId = n.valueOf("./oaf:provenanceaction/@classid");
 		final String paClassName = n.valueOf("./oaf:provenanceaction/@classname");
@@ -715,11 +765,14 @@ public abstract class AbstractMdRecordToOafMapper {
 		final Boolean inferred = Boolean.parseBoolean(n.valueOf("./oaf:inferred"));
 		final String trust = n.valueOf("./oaf:trust");
 
-		return dataInfo(deletedbyinference, inferenceprovenance, inferred, invisible, qualifier(paClassId, paClassName, paSchemeId, paSchemeName), trust);
+		return dataInfo(
+			deletedbyinference, inferenceprovenance, inferred, invisible,
+			qualifier(paClassId, paClassName, paSchemeId, paSchemeName), trust);
 	}
 
 	protected List> prepareListURL(final Node node, final String xpath, final DataInfo info) {
-		return listFields(info, prepareListString(node, xpath)
+		return listFields(
+			info, prepareListString(node, xpath)
 				.stream()
 				.filter(URL_VALIDATOR::isValid)
 				.collect(Collectors.toList()));
@@ -730,9 +783,9 @@ public abstract class AbstractMdRecordToOafMapper {
 	}
 
 	protected List> prepareListFields(
-		final Node node,
-		final String xpath,
-		final DataInfo info) {
+		final Node node,
+		final String xpath,
+		final DataInfo info) {
 		return listFields(info, prepareListString(node, xpath));
 	}
@@ -749,11 +802,13 @@ public abstract class AbstractMdRecordToOafMapper {
 
 	protected Set validateUrl(final Collection url) {
 
-		if (Objects.isNull(url)) { return new HashSet<>(); }
+		if (Objects.isNull(url)) {
+			return new HashSet<>();
+		}
 		return url
-			.stream()
-			.filter(URL_VALIDATOR::isValid)
-			.collect(Collectors.toCollection(HashSet::new));
+			.stream()
+			.filter(URL_VALIDATOR::isValid)
+			.collect(Collectors.toCollection(HashSet::new));
 	}
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
index 9e1f3c78c..df8728851 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
@@ -1,16 +1,13 @@
 
 package eu.dnetlib.dhp.oa.graph.raw;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
-import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
-import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.*;
-import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
-import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.io.Text;
@@ -21,15 +18,20 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
+import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import scala.Tuple2;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-
 public class GenerateEntitiesApplication extends AbstractMigrationApplication {
 
 	private static final Logger log = LoggerFactory.getLogger(GenerateEntitiesApplication.class);
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java
index bf275814c..c2f3faf29 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java
@@ -1,12 +1,13 @@
 
 package eu.dnetlib.dhp.oa.graph.raw;
 
-import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.oaf.*;
-import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.lenient;
+
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.commons.io.IOUtils;
 import org.dom4j.DocumentException;
 import org.junit.jupiter.api.BeforeEach;
@@ -15,12 +16,12 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
-import java.io.IOException;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.lenient;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 
 @ExtendWith(MockitoExtension.class)
 class GenerateEntitiesApplicationTest {
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java
index d20996d27..63597c61e 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java
@@ -1137,7 +1137,8 @@ public class XmlRecordFactory implements Serializable {
 							XmlSerializationUtils
 								.asXmlElement("dateofacceptance", i.getDateofacceptance().getValue()));
 				}
-				if (i.getInstancetype() != null && StringUtils.isNotBlank(i.getInstancetype().getClassid())) {
+				if (i.getInstancetype() != null
+					&& StringUtils.isNotBlank(i.getInstancetype().getClassid())) {
 					instanceFields
 						.add(XmlSerializationUtils.mapQualifier("instancetype", i.getInstancetype()));
 				}
@@ -1178,7 +1179,8 @@ public class XmlRecordFactory implements Serializable {
 			if (re.getDatasourcetypeui() != null && StringUtils.isNotBlank(re.getDatasourcetypeui().getClassid())) {
 				metadata.add(XmlSerializationUtils.mapQualifier("datasourcetypeui", re.getDatasourcetypeui()));
 			}
-			if (re.getOpenairecompatibility() != null && StringUtils.isNotBlank(re.getOpenairecompatibility().getClassid())) {
+			if (re.getOpenairecompatibility() != null
+				&& StringUtils.isNotBlank(re.getOpenairecompatibility().getClassid())) {
 				metadata
 					.add(
 						XmlSerializationUtils
@@ -1285,7 +1287,8 @@ public class XmlRecordFactory implements Serializable {
 			groupInstancesByUrl(((Result) entity).getInstance()).forEach(instance -> {
 				final List fields = Lists.newArrayList();
 
-				if (instance.getAccessright() != null && StringUtils.isNotBlank(instance.getAccessright().getClassid())) {
+				if (instance.getAccessright() != null
+					&& StringUtils.isNotBlank(instance.getAccessright().getClassid())) {
 					fields
 						.add(XmlSerializationUtils.mapQualifier("accessright", instance.getAccessright()));
 				}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlSerializationUtils.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlSerializationUtils.java
index 8ec09090e..deacac3ad 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlSerializationUtils.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlSerializationUtils.java
@@ -7,10 +7,11 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
 import java.util.List;
 
+import org.apache.commons.lang3.StringUtils;
+
 import com.google.common.collect.Lists;
 
 import eu.dnetlib.dhp.schema.oaf.*;
-import org.apache.commons.lang3.StringUtils;
 import scala.Tuple2;
 
 public class XmlSerializationUtils {
diff --git a/dhp-workflows/dhp-stats-hist-snaps/pom.xml b/dhp-workflows/dhp-stats-hist-snaps/pom.xml
index 08ddcef49..b31d909f9 100644
--- a/dhp-workflows/dhp-stats-hist-snaps/pom.xml
+++ b/dhp-workflows/dhp-stats-hist-snaps/pom.xml
@@ -3,7 +3,7 @@
 	<parent>
 		<artifactId>dhp-workflows</artifactId>
 		<groupId>eu.dnetlib.dhp</groupId>
-		<version>1.2.4-SNAPSHOT</version>
+		<version>1.2.5-SNAPSHOT</version>
 	</parent>
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>dhp-stats-hist-snaps</artifactId>
diff --git a/dhp-workflows/dhp-stats-monitor-irish/pom.xml b/dhp-workflows/dhp-stats-monitor-irish/pom.xml
index 9d850a4bb..6ab19dced 100644
--- a/dhp-workflows/dhp-stats-monitor-irish/pom.xml
+++ b/dhp-workflows/dhp-stats-monitor-irish/pom.xml
@@ -3,7 +3,7 @@
 	<parent>
 		<artifactId>dhp-workflows</artifactId>
 		<groupId>eu.dnetlib.dhp</groupId>
-		<version>1.2.4-SNAPSHOT</version>
+		<version>1.2.5-SNAPSHOT</version>
 	</parent>
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>dhp-stats-monitor-irish</artifactId>
diff --git a/dhp-workflows/dhp-stats-monitor-update/pom.xml b/dhp-workflows/dhp-stats-monitor-update/pom.xml
index f63c7568a..f2bc35f8d 100644
--- a/dhp-workflows/dhp-stats-monitor-update/pom.xml
+++ b/dhp-workflows/dhp-stats-monitor-update/pom.xml
@@ -3,7 +3,7 @@
 	<parent>
 		<artifactId>dhp-workflows</artifactId>
 		<groupId>eu.dnetlib.dhp</groupId>
-		<version>1.2.4-SNAPSHOT</version>
+		<version>1.2.5-SNAPSHOT</version>
 	</parent>
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>dhp-stats-monitor-update</artifactId>
diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml
index 369c71b5b..1c331d126 100644
--- a/dhp-workflows/pom.xml
+++ b/dhp-workflows/pom.xml
@@ -31,6 +31,10 @@
 		<module>dhp-enrichment</module>
 		<module>dhp-graph-provision</module>
 		<module>dhp-blacklist</module>
+		<module>dhp-stats-actionsets</module>
+		<module>dhp-stats-hist-snaps</module>
+		<module>dhp-stats-monitor-irish</module>
+		<module>dhp-stats-monitor-update</module>
 		<module>dhp-stats-update</module>
 		<module>dhp-stats-promote</module>
 		<module>dhp-usage-stats-build</module>