From 695a6cbc319c3bd8ac2008f3609f8e6404cf39dc Mon Sep 17 00:00:00 2001 From: Miriam Baglioni Date: Tue, 30 Jul 2024 13:54:53 +0200 Subject: [PATCH] [MakeTar] added second step of tar creation to include all the funders with fundRef identifier as id to be in the same folder (Zenodo allows maximum 100 files per deposition) --- .../dnetlib/dhp/oa/common/MakeTarArchive.java | 2 +- .../eu/dnetlib/dhp/oa/graph/dump/MakeTar.java | 39 ++++++++++++++++-- .../ProjectsSubsetSparkJob.java | 6 ++- .../graph/dump/input_maketar_parameters.json | 6 +++ .../graph/dump/wf/main/oozie_app/workflow.xml | 19 +++++++++ .../dhp/oa/graph/dump/MakeTarTest.java | 2 +- .../graph/dump/complete/CreateEntityTest.java | 41 +++++++++++-------- 7 files changed, 91 insertions(+), 24 deletions(-) diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/common/MakeTarArchive.java b/dump/src/main/java/eu/dnetlib/dhp/oa/common/MakeTarArchive.java index 77fc287..f334844 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/common/MakeTarArchive.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/common/MakeTarArchive.java @@ -51,7 +51,7 @@ public class MakeTarArchive implements Serializable { final boolean rename = Optional .ofNullable(parser.get("rename")) .map(Boolean::valueOf) - .orElse(Boolean.FALSE); + .orElse(Boolean.TRUE); Configuration conf = new Configuration(); conf.set("fs.defaultFS", hdfsNameNode); diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/MakeTar.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/MakeTar.java index 44f4ef6..294da3c 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/MakeTar.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/MakeTar.java @@ -5,12 +5,14 @@ import java.io.IOException; import java.io.Serializable; import java.util.Optional; +import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import 
org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,18 +47,50 @@ public class MakeTar implements Serializable { .map(Integer::valueOf) .orElse(10); + final boolean secondTar = Optional + .ofNullable(parser.get("secondTar")) + .map(Boolean::valueOf) + .orElse(Boolean.FALSE); + Configuration conf = new Configuration(); conf.set("fs.defaultFS", hdfsNameNode); FileSystem fileSystem = FileSystem.get(conf); - makeTArArchive(fileSystem, inputPath, outputPath, gBperSplit); + makeTArArchive(fileSystem, inputPath, outputPath, gBperSplit, secondTar); } - public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit) + public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit, + boolean secondTar) throws IOException { + if (secondTar) { + Path movePath = new Path("/tmp/temp_tar/fundRef"); + fileSystem.mkdirs(movePath); + + RemoteIterator fileStatusListIterator = fileSystem + .listFiles( + new Path(outputPath), true); + + while (fileStatusListIterator.hasNext()) { + Path p = fileStatusListIterator.next().getPath(); + String pString = p.toString(); + if (!pString.endsWith("_SUCCESS")) { + String name = pString.substring(pString.lastIndexOf("/") + 1); + if (name.startsWith("10") || name.startsWith("50")) { + fileSystem.rename(p, new Path("/tmp/temp_tar/fundRef/" + name)); + } + } + } + execOnDir(fileSystem, "/tmp/temp_tar", outputPath, gBperSplit); + } else + execOnDir(fileSystem, inputPath, outputPath, gBperSplit); + + } + + private static void execOnDir(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit) + throws IOException { RemoteIterator dirIterator = fileSystem.listLocatedStatus(new Path(inputPath)); while (dirIterator.hasNext()) { @@ -68,7 +102,6 @@ public class MakeTar 
implements Serializable { MakeTarArchive.tarMaxSize(fileSystem, pathString, outputPath + "/" + entity, entity, gBperSplit, true); } - } } diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java index b95312f..46784bf 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java @@ -76,11 +76,13 @@ public class ProjectsSubsetSparkJob implements Serializable { .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(outputPath); + projectList.write().mode(SaveMode.Overwrite).option("compression", "gzip").text("/workingDir/temp"); Utils .readPath(spark, outputPath, Project.class) - .map((MapFunction) p -> p.getId(), Encoders.STRING()) + .map((MapFunction) eu.dnetlib.dhp.oa.model.Project::getId, Encoders.STRING()) + .union(spark.read().textFile("/workingDir/temp")) .write() - .mode(SaveMode.Append) + .mode(SaveMode.Overwrite) .option("compression", "gzip") .text(projectListPath); } diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/input_maketar_parameters.json b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/input_maketar_parameters.json index a153188..592e31a 100644 --- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/input_maketar_parameters.json +++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/input_maketar_parameters.json @@ -23,6 +23,12 @@ "paramLongName":"splitSize", "paramDescription": "the maximum size of the archive", "paramRequired": false + }, + { + "paramName":"st", + "paramLongName":"secondTar", + "paramDescription": "if true, runs a second tar step that groups funders with fundRef identifiers into one folder before archiving", + "paramRequired": false } ] diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/main/oozie_app/workflow.xml 
b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/main/oozie_app/workflow.xml index 95aafbb..bf77475 100644 --- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/main/oozie_app/workflow.xml +++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/wf/main/oozie_app/workflow.xml @@ -305,6 +305,25 @@ --nameNode${nameNode} --sourcePath${outputPath}/dump + + + + + + + ${wf:conf('dumpType') eq "funder"} + + + + + + + eu.dnetlib.dhp.oa.graph.dump.MakeTar + --hdfsPath${outputPath}/tar + --nameNode${nameNode} + --sourcePath${outputPath}/dump + --secondTartrue + diff --git a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/MakeTarTest.java b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/MakeTarTest.java index f3b9c95..1ae54ba 100644 --- a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/MakeTarTest.java +++ b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/MakeTarTest.java @@ -56,7 +56,7 @@ public class MakeTarTest { String inputPath = workingDir + "/zenodo/"; - MakeTar.makeTArArchive(fs, inputPath, "/tmp/out", 0); + MakeTar.makeTArArchive(fs, inputPath, "/tmp/out", 0, false); } } diff --git a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/CreateEntityTest.java b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/CreateEntityTest.java index fb94467..846d786 100644 --- a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/CreateEntityTest.java +++ b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/CreateEntityTest.java @@ -66,30 +66,37 @@ public class CreateEntityTest { riList.forEach(c -> { switch (c.getAcronym()) { case "mes": - Assertions.assertEquals(eu.dnetlib.dhp.oa.graph.dump.Constants.RESEARCH_COMMUNITY, c.getType()); - Assertions.assertEquals(5, ((ResearchCommunity) c).getSubject().size()); + Assertions.assertEquals(eu.dnetlib.dhp.oa.graph.dump.Constants.RESEARCH_COMMUNITY, c.getType()); + Assertions.assertEquals(5, ((ResearchCommunity) c).getSubject().size()); Assertions.assertTrue(((ResearchCommunity) 
c).getSubject().contains("marine")); Assertions.assertTrue(((ResearchCommunity) c).getSubject().contains("ocean")); Assertions.assertTrue(((ResearchCommunity) c).getSubject().contains("fish")); Assertions.assertTrue(((ResearchCommunity) c).getSubject().contains("aqua")); Assertions.assertTrue(((ResearchCommunity) c).getSubject().contains("sea")); - Assertions.assertEquals(c - .getId(), String - .format( - "%s::%s", Constants.CONTEXT_NS_PREFIX, - DHPUtils.md5(c.getAcronym()))); - Assertions.assertEquals("https://zenodo.org/communities/oac_mes", c.getZenodoCommunity()); - Assertions.assertEquals("mes", c.getAcronym()); + Assertions + .assertEquals( + c + .getId(), + String + .format( + "%s::%s", Constants.CONTEXT_NS_PREFIX, + DHPUtils.md5(c.getAcronym()))); + Assertions.assertEquals("https://zenodo.org/communities/oac_mes", c.getZenodoCommunity()); + Assertions.assertEquals("mes", c.getAcronym()); break; case "clarin": - Assertions.assertEquals(eu.dnetlib.dhp.oa.graph.dump.Constants.RESEARCH_INFRASTRUCTURE, c.getType()); - Assertions.assertEquals(c - .getId(), String - .format( - "%s::%s", Constants.CONTEXT_NS_PREFIX, - DHPUtils.md5(c.getAcronym()))); - Assertions.assertEquals("https://zenodo.org/communities/oac_clarin", c.getZenodoCommunity()); - Assertions.assertEquals("clarin", c.getAcronym()); + Assertions + .assertEquals(eu.dnetlib.dhp.oa.graph.dump.Constants.RESEARCH_INFRASTRUCTURE, c.getType()); + Assertions + .assertEquals( + c + .getId(), + String + .format( + "%s::%s", Constants.CONTEXT_NS_PREFIX, + DHPUtils.md5(c.getAcronym()))); + Assertions.assertEquals("https://zenodo.org/communities/oac_clarin", c.getZenodoCommunity()); + Assertions.assertEquals("clarin", c.getAcronym()); break; } // TODO add check for all the others Entities