diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java index 5dff1d0a8..4047fdca4 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java @@ -1,119 +1,117 @@ -package eu.dnetlib.dhp.common; -import org.apache.commons.compress.archivers.tar.TarArchiveEntry; -import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; -import org.apache.hadoop.fs.*; +package eu.dnetlib.dhp.common; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.Serializable; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; +import org.apache.hadoop.fs.*; + public class MakeTarArchive implements Serializable { - private static TarArchiveOutputStream getTar(FileSystem fileSystem, String outputPath) throws IOException { - Path hdfsWritePath = new Path(outputPath); - FSDataOutputStream fsDataOutputStream = null; - if (fileSystem.exists(hdfsWritePath)) { - fileSystem.delete(hdfsWritePath, true); + private static TarArchiveOutputStream getTar(FileSystem fileSystem, String outputPath) throws IOException { + Path hdfsWritePath = new Path(outputPath); + FSDataOutputStream fsDataOutputStream = null; + if (fileSystem.exists(hdfsWritePath)) { + fileSystem.delete(hdfsWritePath, true); - } - fsDataOutputStream = fileSystem.create(hdfsWritePath); + } + fsDataOutputStream = fileSystem.create(hdfsWritePath); - return new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream()); - } + return new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream()); + } - private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dir_name) - throws IOException { + private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dir_name) + throws IOException { - Path hdfsWritePath = new Path(outputPath); - FSDataOutputStream fsDataOutputStream = null; - if (fileSystem.exists(hdfsWritePath)) { - fileSystem.delete(hdfsWritePath, true); + Path hdfsWritePath = new Path(outputPath); + FSDataOutputStream fsDataOutputStream = null; + if (fileSystem.exists(hdfsWritePath)) { + fileSystem.delete(hdfsWritePath, true); - } - fsDataOutputStream = fileSystem.create(hdfsWritePath); + } + fsDataOutputStream = fileSystem.create(hdfsWritePath); - TarArchiveOutputStream ar = new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream()); + TarArchiveOutputStream ar = new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream()); - RemoteIterator fileStatusListIterator = fileSystem - .listFiles( - new Path(inputPath), true); + RemoteIterator fileStatusListIterator = fileSystem + .listFiles( + new Path(inputPath), true); - while (fileStatusListIterator.hasNext()) { - writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, 0); - } + while (fileStatusListIterator.hasNext()) { + writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, 0); + } - ar.close(); - } + ar.close(); + } - public static void tarMaxSize(FileSystem fileSystem, String inputPath, String outputPath, String dir_name, - int gBperSplit) throws IOException { - final long bytesPerSplit = 1024L * 1024L * 1024L * gBperSplit; + public static void tarMaxSize(FileSystem fileSystem, String inputPath, String outputPath, String dir_name, + int gBperSplit) throws IOException { + final long bytesPerSplit = 1024L * 1024L * 1024L * gBperSplit; - long sourceSize = fileSystem.getContentSummary(new Path(inputPath)).getSpaceConsumed(); + long sourceSize = fileSystem.getContentSummary(new Path(inputPath)).getSpaceConsumed(); - if (sourceSize < bytesPerSplit) { - write(fileSystem, inputPath, outputPath + ".tar", dir_name); - } else { - int partNum = 0; + if (sourceSize < bytesPerSplit) { + write(fileSystem, inputPath, outputPath + ".tar", dir_name); + } else { + int partNum = 0; - RemoteIterator fileStatusListIterator = fileSystem - .listFiles( - new Path(inputPath), true); - boolean next = fileStatusListIterator.hasNext(); - while (next) { - TarArchiveOutputStream ar = getTar(fileSystem, outputPath + "_" + (partNum + 1) + ".tar"); + RemoteIterator fileStatusListIterator = fileSystem + .listFiles( + new Path(inputPath), true); + boolean next = fileStatusListIterator.hasNext(); + while (next) { + TarArchiveOutputStream ar = getTar(fileSystem, outputPath + "_" + (partNum + 1) + ".tar"); - long current_size = 0; - while (next && current_size < bytesPerSplit) { - current_size = writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, current_size); - next = fileStatusListIterator.hasNext(); + long current_size = 0; + while (next && current_size < bytesPerSplit) { + current_size = writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, current_size); + next = fileStatusListIterator.hasNext(); - } + } - partNum += 1; - ar.close(); - } + partNum += 1; + ar.close(); + } - } + } - } + } - private static long writeCurrentFile(FileSystem fileSystem, String dir_name, - RemoteIterator fileStatusListIterator, - TarArchiveOutputStream ar, long current_size) throws IOException { - LocatedFileStatus fileStatus = fileStatusListIterator.next(); + private static long writeCurrentFile(FileSystem fileSystem, String dir_name, + RemoteIterator fileStatusListIterator, + TarArchiveOutputStream ar, long current_size) throws IOException { + LocatedFileStatus fileStatus = fileStatusListIterator.next(); - Path p = fileStatus.getPath(); - String p_string = p.toString(); - if (!p_string.endsWith("_SUCCESS")) { - String name = p_string.substring(p_string.lastIndexOf("/") + 1); - if (name.trim().equalsIgnoreCase("communities_infrastructures")) { - name = "communities_infrastructures.json"; - } - TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name); - entry.setSize(fileStatus.getLen()); - current_size += fileStatus.getLen(); - ar.putArchiveEntry(entry); + Path p = fileStatus.getPath(); + String p_string = p.toString(); + if (!p_string.endsWith("_SUCCESS")) { + String name = p_string.substring(p_string.lastIndexOf("/") + 1); + if (name.trim().equalsIgnoreCase("communities_infrastructures")) { + name = "communities_infrastructures.json"; + } + TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name); + entry.setSize(fileStatus.getLen()); + current_size += fileStatus.getLen(); + ar.putArchiveEntry(entry); - InputStream is = fileSystem.open(fileStatus.getPath()); - - BufferedInputStream bis = new BufferedInputStream(is); - - int count; - byte data[] = new byte[1024]; - while ((count = bis.read(data, 0, data.length)) != -1) { - ar.write(data, 0, count); - } - bis.close(); - ar.closeArchiveEntry(); - - } - return current_size; - } + InputStream is = fileSystem.open(fileStatus.getPath()); + BufferedInputStream bis = new BufferedInputStream(is); + int count; + byte data[] = new byte[1024]; + while ((count = bis.read(data, 0, data.length)) != -1) { + ar.write(data, 0, count); + } + bis.close(); + ar.closeArchiveEntry(); + } + return current_size; + } } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java index 9370efdb3..90b3919ed 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java @@ -9,7 +9,6 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; - @Disabled public class HttpConnectorTest { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java index 12543d8c7..56a4aaf5a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java @@ -109,20 +109,20 @@ public class CleaningFunctions { } if (Objects.nonNull(r.getPid())) { r - .setPid( - r - .getPid() - .stream() - .filter(Objects::nonNull) - .filter(sp -> StringUtils.isNotBlank(StringUtils.trim(sp.getValue()))) - .filter(sp -> NONE.equalsIgnoreCase(sp.getValue())) - .filter(sp -> Objects.nonNull(sp.getQualifier())) - .filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid())) - .map(sp -> { - sp.setValue(StringUtils.trim(sp.getValue())); - return sp; - }) - .collect(Collectors.toList())); + .setPid( + r + .getPid() + .stream() + .filter(Objects::nonNull) + .filter(sp -> StringUtils.isNotBlank(StringUtils.trim(sp.getValue()))) + .filter(sp -> NONE.equalsIgnoreCase(sp.getValue())) + .filter(sp -> Objects.nonNull(sp.getQualifier())) + .filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid())) + .map(sp -> { + sp.setValue(StringUtils.trim(sp.getValue())); + return sp; + }) + .collect(Collectors.toList())); } if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) { r diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/oozie_app/workflow.xml index 98081c285..161fd2dec 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/oozie_app/workflow.xml @@ -1,18 +1,18 @@ - - sourcePath - the source path - - - isLookUpUrl - the isLookup service endpoint - - - outputPath - the output path - + + sourcePath + the source path + + + isLookUpUrl + the isLookup service endpoint + + + outputPath + the output path + accessToken the access token used for the deposition in Zenodo @@ -320,6 +320,7 @@ + yarn @@ -344,6 +345,7 @@ + yarn @@ -371,43 +373,42 @@ - - - yarn - cluster - Split dumped result for community - eu.dnetlib.dhp.oa.graph.dump.community.SparkSplitForCommunity - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - - --sourcePath${workingDir}/ext - --outputPath${workingDir}/split - --communityMapPath${workingDir}/communityMap - - - - + + + yarn + cluster + Split dumped result for community + eu.dnetlib.dhp.oa.graph.dump.community.SparkSplitForCommunity + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --sourcePath${workingDir}/ext + --outputPath${workingDir}/split + --communityMapPath${workingDir}/communityMap + + + + eu.dnetlib.dhp.oa.graph.dump.MakeTar --hdfsPath${outputPath} --nameNode${nameNode} - --sourcePath${workingDir}/split + --sourcePath${workingDir}/split - eu.dnetlib.dhp.oa.graph.dump.SendToZenodoHDFS @@ -424,8 +425,6 @@ - - \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump_whole/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump_whole/oozie_app/workflow.xml index 793c1ed33..e5001bf43 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump_whole/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/dump_whole/oozie_app/workflow.xml @@ -1,18 +1,18 @@ - - sourcePath - the source path - - - isLookUpUrl - the isLookup service endpoint - - - outputPath - the output path - + + sourcePath + the source path + + + isLookUpUrl + the isLookup service endpoint + + + outputPath + the output path + resultAggregation true if all the result type have to be dumped under result. false otherwise @@ -357,10 +357,8 @@ - - @@ -389,7 +387,6 @@ - yarn @@ -418,7 +415,6 @@ - @@ -530,7 +526,6 @@ - @@ -568,8 +563,7 @@ - - + eu.dnetlib.dhp.oa.graph.dump.SendToZenodoHDFS diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholixIndex.scala b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholixIndex.scala index dbf6de05f..d39e38bfc 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholixIndex.scala +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholixIndex.scala @@ -6,11 +6,36 @@ import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary import eu.dnetlib.dhp.schema.oaf.Relation import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf +import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} object SparkGenerateScholixIndex { + + def getScholixAggregator(): Aggregator[(String, Scholix), Scholix, Scholix] = new Aggregator[(String, Scholix), Scholix, Scholix]{ + + override def zero: Scholix = new Scholix() + + override def reduce(b: Scholix, a: (String, Scholix)): Scholix = { + b.mergeFrom(a._2) + b + } + + override def merge(wx: Scholix, wy: Scholix): Scholix = { + wx.mergeFrom(wy) + wx + } + override def finish(reduction: Scholix): Scholix = reduction + + override def bufferEncoder: Encoder[Scholix] = + Encoders.kryo(classOf[Scholix]) + + override def outputEncoder: Encoder[Scholix] = + Encoders.kryo(classOf[Scholix]) + } + + def main(args: Array[String]): Unit = { val parser = new ArgumentApplicationParser(IOUtils.toString(SparkGenerateScholixIndex.getClass.getResourceAsStream("/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json"))) parser.parseArgument(args) @@ -40,7 +65,7 @@ object SparkGenerateScholixIndex { (relation.getTarget, Scholix.generateScholixWithSource(summary,relation)) - }).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/scholix_source") + }).repartition(6000).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/scholix_source") val sTarget:Dataset[(String,Scholix)] = spark.read.load(s"$workingDirPath/scholix_source").as[(String, Scholix)] @@ -53,9 +78,16 @@ object SparkGenerateScholixIndex { scholix.generateIdentifier() scholix.generatelinkPublisher() scholix - }).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/scholix") + }).repartition(6000).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/scholix_r") + val finalScholix:Dataset[Scholix] = spark.read.load(s"$workingDirPath/scholix_r").as[Scholix] + + finalScholix.map(d => (d.getIdentifier, d))(Encoders.tuple(Encoders.STRING, scholixEncoder)) + .groupByKey(_._1)(Encoders.STRING) + .agg(getScholixAggregator().toColumn) + .map(p => p._2) + .write.mode(SaveMode.Overwrite).save(s"$workingDirPath/scholix") } diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java index d71415513..6ea8ff735 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java @@ -5,6 +5,8 @@ import java.io.Serializable; import java.util.*; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; + import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary; @@ -91,13 +93,91 @@ public class Scholix implements Serializable { s.setSource(ScholixResource.fromSummary(scholixSummary)); s.setIdentifier(rel.getTarget()); - // ScholixResource mockTarget = new ScholixResource(); - // mockTarget.setDnetIdentifier(rel.getTarget()); - // s.setTarget(mockTarget); - // s.generateIdentifier(); return s; } + private List mergeScholixEntityId(final List a, final List b) { + final List m = new ArrayList<>(a); + if (b != null) + b.forEach(s -> { + int tt = (int) m.stream().filter(t -> t.getName().equalsIgnoreCase(s.getName())).count(); + if (tt == 0) { + m.add(s); + } + }); + return m; + } + + private List mergeScholixIdnetifier(final List a, + final List b) { + final List m = new ArrayList<>(a); + if (b != null) + b.forEach(s -> { + int tt = (int) m.stream().filter(t -> t.getIdentifier().equalsIgnoreCase(s.getIdentifier())).count(); + if (tt == 0) { + m.add(s); + } + }); + return m; + } + + private List mergeScholixCollectedFrom(final List a, + final List b) { + final List m = new ArrayList<>(a); + if (b != null) + b.forEach(s -> { + int tt = (int) m + .stream() + .filter(t -> t.getProvider().getName().equalsIgnoreCase(s.getProvider().getName())) + .count(); + if (tt == 0) { + m.add(s); + } + }); + return m; + } + + private ScholixRelationship mergeRelationships(final ScholixRelationship a, final ScholixRelationship b) { + ScholixRelationship result = new ScholixRelationship(); + result.setName(StringUtils.isEmpty(a.getName()) ? b.getName() : a.getName()); + result.setInverse(StringUtils.isEmpty(a.getInverse()) ? b.getInverse() : a.getInverse()); + result.setSchema(StringUtils.isEmpty(a.getSchema()) ? b.getSchema() : a.getSchema()); + return result; + } + + private ScholixResource mergeResource(final ScholixResource a, final ScholixResource b) { + + final ScholixResource result = new ScholixResource(); + result.setCollectedFrom(mergeScholixCollectedFrom(a.getCollectedFrom(), b.getCollectedFrom())); + result.setCreator(mergeScholixEntityId(a.getCreator(), b.getCreator())); + result + .setDnetIdentifier( + StringUtils.isBlank(a.getDnetIdentifier()) ? b.getDnetIdentifier() : a.getDnetIdentifier()); + result.setIdentifier(mergeScholixIdnetifier(a.getIdentifier(), b.getIdentifier())); + result.setObjectType(StringUtils.isNotBlank(a.getObjectType()) ? a.getObjectType() : b.getObjectType()); + result + .setObjectSubType( + StringUtils.isNotBlank(a.getObjectSubType()) ? a.getObjectSubType() : b.getObjectSubType()); + result.setPublisher(mergeScholixEntityId(a.getPublisher(), b.getPublisher())); + result + .setPublicationDate( + StringUtils.isNotBlank(a.getPublicationDate()) ? a.getPublicationDate() : b.getPublicationDate()); + result.setTitle(StringUtils.isNotBlank(a.getTitle()) ? a.getTitle() : b.getTitle()); + return result; + + } + + public void mergeFrom(final Scholix other) { + linkprovider = mergeScholixEntityId(linkprovider, other.getLinkprovider()); + publisher = mergeScholixEntityId(publisher, other.getPublisher()); + if (StringUtils.isEmpty(publicationDate)) + publicationDate = other.getPublicationDate(); + relationship = mergeRelationships(relationship, other.getRelationship()); + source = mergeResource(source, other.getSource()); + target = mergeResource(target, other.getTarget()); + generateIdentifier(); + } + public void generatelinkPublisher() { Set publisher = new HashSet<>(); if (source.getPublisher() != null) diff --git a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml index 83c70fa25..c2c2a78fb 100644 --- a/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision-scholexplorer/src/main/resources/eu/dnetlib/dhp/sx/provision/oozie_app/workflow.xml @@ -108,7 +108,7 @@ -m yarn-cluster --workingPath${workingDirPath} - +