diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java index 9370efdb34..90b3919ed9 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java @@ -9,7 +9,6 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; - @Disabled public class HttpConnectorTest { diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholixIndex.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholixIndex.scala index dbf6de05f3..d39e38bfcc 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholixIndex.scala +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholixIndex.scala @@ -6,11 +6,36 @@ import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary import eu.dnetlib.dhp.schema.oaf.Relation import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf +import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} object SparkGenerateScholixIndex { + + def getScholixAggregator(): Aggregator[(String, Scholix), Scholix, Scholix] = new Aggregator[(String, Scholix), Scholix, Scholix]{ + + override def zero: Scholix = new Scholix() + + override def reduce(b: Scholix, a: (String, Scholix)): Scholix = { + b.mergeFrom(a._2) + b + } + + override def merge(wx: Scholix, wy: Scholix): Scholix = { + wx.mergeFrom(wy) + wx + } + override def finish(reduction: Scholix): Scholix = reduction + + override def bufferEncoder: Encoder[Scholix] = + Encoders.kryo(classOf[Scholix]) + + override def outputEncoder: Encoder[Scholix] = + Encoders.kryo(classOf[Scholix]) + } + + def main(args: Array[String]): Unit = { val parser = new ArgumentApplicationParser(IOUtils.toString(SparkGenerateScholixIndex.getClass.getResourceAsStream("/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json"))) parser.parseArgument(args) @@ -40,7 +65,7 @@ object SparkGenerateScholixIndex { (relation.getTarget, Scholix.generateScholixWithSource(summary,relation)) - }).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/scholix_source") + }).repartition(6000).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/scholix_source") val sTarget:Dataset[(String,Scholix)] = spark.read.load(s"$workingDirPath/scholix_source").as[(String, Scholix)] @@ -53,9 +78,16 @@ object SparkGenerateScholixIndex { scholix.generateIdentifier() scholix.generatelinkPublisher() scholix - }).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/scholix") + }).repartition(6000).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/scholix_r") + val finalScholix:Dataset[Scholix] = spark.read.load(s"$workingDirPath/scholix_r").as[Scholix] + + finalScholix.map(d => (d.getIdentifier, d))(Encoders.tuple(Encoders.STRING, scholixEncoder)) + .groupByKey(_._1)(Encoders.STRING) + .agg(getScholixAggregator().toColumn) + .map(p => p._2) + .write.mode(SaveMode.Overwrite).save(s"$workingDirPath/scholix") } diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java index d714155131..6ea8ff735c 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java @@ -5,6 +5,8 @@ import java.io.Serializable; import java.util.*; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; + import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary; @@ -91,13 +93,91 @@ public class Scholix implements Serializable { s.setSource(ScholixResource.fromSummary(scholixSummary)); s.setIdentifier(rel.getTarget()); - // ScholixResource mockTarget = new ScholixResource(); - // mockTarget.setDnetIdentifier(rel.getTarget()); - // s.setTarget(mockTarget); - // s.generateIdentifier(); return s; } + private List mergeScholixEntityId(final List a, final List b) { + final List m = new ArrayList<>(a); + if (b != null) + b.forEach(s -> { + int tt = (int) m.stream().filter(t -> t.getName().equalsIgnoreCase(s.getName())).count(); + if (tt == 0) { + m.add(s); + } + }); + return m; + } + + private List mergeScholixIdnetifier(final List a, + final List b) { + final List m = new ArrayList<>(a); + if (b != null) + b.forEach(s -> { + int tt = (int) m.stream().filter(t -> t.getIdentifier().equalsIgnoreCase(s.getIdentifier())).count(); + if (tt == 0) { + m.add(s); + } + }); + return m; + } + + private List mergeScholixCollectedFrom(final List a, + final List b) { + final List m = new ArrayList<>(a); + if (b != null) + b.forEach(s -> { + int tt = (int) m + .stream() + .filter(t -> t.getProvider().getName().equalsIgnoreCase(s.getProvider().getName())) + .count(); + if (tt == 0) { + m.add(s); + } + }); + return m; + } + + private ScholixRelationship mergeRelationships(final ScholixRelationship a, final ScholixRelationship b) { + ScholixRelationship result = new ScholixRelationship(); + result.setName(StringUtils.isEmpty(a.getName()) ? b.getName() : a.getName()); + result.setInverse(StringUtils.isEmpty(a.getInverse()) ? b.getInverse() : a.getInverse()); + result.setSchema(StringUtils.isEmpty(a.getSchema()) ? b.getSchema() : a.getSchema()); + return result; + } + + private ScholixResource mergeResource(final ScholixResource a, final ScholixResource b) { + + final ScholixResource result = new ScholixResource(); + result.setCollectedFrom(mergeScholixCollectedFrom(a.getCollectedFrom(), b.getCollectedFrom())); + result.setCreator(mergeScholixEntityId(a.getCreator(), b.getCreator())); + result + .setDnetIdentifier( + StringUtils.isBlank(a.getDnetIdentifier()) ? b.getDnetIdentifier() : a.getDnetIdentifier()); + result.setIdentifier(mergeScholixIdnetifier(a.getIdentifier(), b.getIdentifier())); + result.setObjectType(StringUtils.isNotBlank(a.getObjectType()) ? a.getObjectType() : b.getObjectType()); + result + .setObjectSubType( + StringUtils.isNotBlank(a.getObjectSubType()) ? a.getObjectSubType() : b.getObjectSubType()); + result.setPublisher(mergeScholixEntityId(a.getPublisher(), b.getPublisher())); + result + .setPublicationDate( + StringUtils.isNotBlank(a.getPublicationDate()) ? a.getPublicationDate() : b.getPublicationDate()); + result.setTitle(StringUtils.isNotBlank(a.getTitle()) ? a.getTitle() : b.getTitle()); + return result; + + } + + public void mergeFrom(final Scholix other) { + linkprovider = mergeScholixEntityId(linkprovider, other.getLinkprovider()); + publisher = mergeScholixEntityId(publisher, other.getPublisher()); + if (StringUtils.isEmpty(publicationDate)) + publicationDate = other.getPublicationDate(); + relationship = mergeRelationships(relationship, other.getRelationship()); + source = mergeResource(source, other.getSource()); + target = mergeResource(target, other.getTarget()); + generateIdentifier(); + } + public void generatelinkPublisher() { Set publisher = new HashSet<>(); if (source.getPublisher() != null) diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml index 83c70fa255..c2c2a78fb3 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml @@ -108,7 +108,7 @@ -m yarn-cluster --workingPath${workingDirPath} - +