[MakeTar] Added a second tar-creation step that gathers all the funders whose id is a fundRef identifier into the same folder, since Zenodo allows at most 100 files per deposition

Miriam Baglioni 2024-07-30 13:54:53 +02:00
parent c54f4a7d09
commit 695a6cbc31
7 changed files with 91 additions and 24 deletions
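
In short: when the funder dump is archived, the new secondTar step first moves every funder file whose name starts with a fundRef-style prefix ("10" or "50") into a single temporary folder, and only then builds the tar, so all fundRef-identified funders land in one archive entry and the deposition stays under Zenodo's 100-file limit. A minimal sketch of that relocation logic (the class and method names are illustrative, not part of the commit; the paths and prefixes are the ones used in the diff below):

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

// Illustrative sketch, not part of the commit: gather fundRef-identified
// funder dumps into one folder before the tar step runs.
public class FundRefRelocationSketch {

    public static void relocate(FileSystem fileSystem, String dumpPath) throws IOException {
        // Single folder that will hold every fundRef-identified funder file.
        Path fundRefDir = new Path("/tmp/temp_tar/fundRef");
        fileSystem.mkdirs(fundRefDir);

        // Walk the dump output recursively.
        RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(new Path(dumpPath), true);
        while (files.hasNext()) {
            Path p = files.next().getPath();
            String name = p.getName();
            // Skip Hadoop success markers; move only files whose name starts with a
            // fundRef-style numeric prefix ("10" or "50"), i.e. funders identified by a fundRef id.
            if (!name.endsWith("_SUCCESS") && (name.startsWith("10") || name.startsWith("50"))) {
                fileSystem.rename(p, new Path(fundRefDir, name));
            }
        }
        // The existing tar routine can then be run on /tmp/temp_tar, producing one
        // archive entry for the whole fundRef folder.
    }
}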

View File

@@ -51,7 +51,7 @@ public class MakeTarArchive implements Serializable {
 		final boolean rename = Optional
 			.ofNullable(parser.get("rename"))
 			.map(Boolean::valueOf)
-			.orElse(Boolean.FALSE);
+			.orElse(Boolean.TRUE);
 		Configuration conf = new Configuration();
 		conf.set("fs.defaultFS", hdfsNameNode);

View File

@@ -5,12 +5,14 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Optional;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,18 +47,50 @@ public class MakeTar implements Serializable {
 			.map(Integer::valueOf)
 			.orElse(10);
+		final boolean secondTar = Optional
+			.ofNullable(parser.get("secondTar"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.FALSE);
 		Configuration conf = new Configuration();
 		conf.set("fs.defaultFS", hdfsNameNode);
 		FileSystem fileSystem = FileSystem.get(conf);
-		makeTArArchive(fileSystem, inputPath, outputPath, gBperSplit);
+		makeTArArchive(fileSystem, inputPath, outputPath, gBperSplit, secondTar);
 	}
-	public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit)
+	public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit,
+		boolean secondTar)
 		throws IOException {
+		if (secondTar) {
+			Path movePath = new Path("/tmp/temp_tar/fundRef");
+			fileSystem.mkdirs(movePath);
+			RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
+				.listFiles(
+					new Path(outputPath), true);
+			while (fileStatusListIterator.hasNext()) {
+				Path p = fileStatusListIterator.next().getPath();
+				String pString = p.toString();
+				if (!pString.endsWith("_SUCCESS")) {
+					String name = pString.substring(pString.lastIndexOf("/") + 1);
+					if (name.startsWith("10") || name.startsWith("50")) {
+						fileSystem.rename(p, new Path("/tmp/temp_tar/fundRef/" + name));
+					}
+				}
+			}
+			execOnDir(fileSystem, "/tmp/temp_tar", outputPath, gBperSplit);
+		} else
+			execOnDir(fileSystem, inputPath, outputPath, gBperSplit);
+	}
+	private static void execOnDir(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit)
+		throws IOException {
 		RemoteIterator<LocatedFileStatus> dirIterator = fileSystem.listLocatedStatus(new Path(inputPath));
 		while (dirIterator.hasNext()) {
@@ -68,7 +102,6 @@ public class MakeTar implements Serializable {
 			MakeTarArchive.tarMaxSize(fileSystem, pathString, outputPath + "/" + entity, entity, gBperSplit, true);
 		}
 	}
 }

View File

@@ -76,11 +76,13 @@ public class ProjectsSubsetSparkJob implements Serializable {
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
 			.json(outputPath);
+		projectList.write().mode(SaveMode.Overwrite).option("compression", "gzip").text("/workingDir/temp");
 		Utils
 			.readPath(spark, outputPath, Project.class)
-			.map((MapFunction<Project, String>) p -> p.getId(), Encoders.STRING())
+			.map((MapFunction<Project, String>) eu.dnetlib.dhp.oa.model.Project::getId, Encoders.STRING())
+			.union(spark.read().textFile("/workingDir/temp"))
 			.write()
-			.mode(SaveMode.Append)
+			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
 			.text(projectListPath);
 	}

View File

@ -23,6 +23,12 @@
"paramLongName":"splitSize", "paramLongName":"splitSize",
"paramDescription": "the maximum size of the archive", "paramDescription": "the maximum size of the archive",
"paramRequired": false "paramRequired": false
},
{
"paramName":"st",
"paramLongName":"secondTar",
"paramDescription": "the maximum size of the archive",
"paramRequired": false
} }
] ]

View File

@@ -305,6 +305,25 @@
 				<arg>--nameNode</arg><arg>${nameNode}</arg>
 				<arg>--sourcePath</arg><arg>${outputPath}/dump</arg>
 			</java>
+			<ok to="second_archive"/>
+			<error to="Kill"/>
+		</action>
+		<decision name="second_archive">
+			<switch>
+				<case to="make_second_tar">${wf:conf('dumpType') eq "funder"}</case>
+				<default to="should_upload"/>
+			</switch>
+		</decision>
+		<action name="make_second_tar">
+			<java>
+				<main-class>eu.dnetlib.dhp.oa.graph.dump.MakeTar</main-class>
+				<arg>--hdfsPath</arg><arg>${outputPath}/tar</arg>
+				<arg>--nameNode</arg><arg>${nameNode}</arg>
+				<arg>--sourcePath</arg><arg>${outputPath}/dump</arg>
+				<arg>--secondTar</arg><arg>true</arg>
+			</java>
 			<ok to="should_upload"/>
 			<error to="Kill"/>
 		</action>

View File

@@ -56,7 +56,7 @@ public class MakeTarTest {
 		String inputPath = workingDir + "/zenodo/";
-		MakeTar.makeTArArchive(fs, inputPath, "/tmp/out", 0);
+		MakeTar.makeTArArchive(fs, inputPath, "/tmp/out", 0, false);
 	}
 }

View File

@@ -73,8 +73,11 @@ public class CreateEntityTest {
 				Assertions.assertTrue(((ResearchCommunity) c).getSubject().contains("fish"));
 				Assertions.assertTrue(((ResearchCommunity) c).getSubject().contains("aqua"));
 				Assertions.assertTrue(((ResearchCommunity) c).getSubject().contains("sea"));
-				Assertions.assertEquals(c
-					.getId(), String
+				Assertions
+					.assertEquals(
+						c
+							.getId(),
+						String
 							.format(
 								"%s::%s", Constants.CONTEXT_NS_PREFIX,
 								DHPUtils.md5(c.getAcronym())));
@@ -82,9 +85,13 @@ public class CreateEntityTest {
 				Assertions.assertEquals("mes", c.getAcronym());
 				break;
 			case "clarin":
-				Assertions.assertEquals(eu.dnetlib.dhp.oa.graph.dump.Constants.RESEARCH_INFRASTRUCTURE, c.getType());
-				Assertions.assertEquals(c
-					.getId(), String
+				Assertions
+					.assertEquals(eu.dnetlib.dhp.oa.graph.dump.Constants.RESEARCH_INFRASTRUCTURE, c.getType());
+				Assertions
+					.assertEquals(
+						c
+							.getId(),
+						String
 							.format(
 								"%s::%s", Constants.CONTEXT_NS_PREFIX,
 								DHPUtils.md5(c.getAcronym())));