diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java index 5dff1d0a8f..4047fdca4f 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java @@ -1,119 +1,117 @@ -package eu.dnetlib.dhp.common; -import org.apache.commons.compress.archivers.tar.TarArchiveEntry; -import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; -import org.apache.hadoop.fs.*; +package eu.dnetlib.dhp.common; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.Serializable; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; +import org.apache.hadoop.fs.*; + public class MakeTarArchive implements Serializable { - private static TarArchiveOutputStream getTar(FileSystem fileSystem, String outputPath) throws IOException { - Path hdfsWritePath = new Path(outputPath); - FSDataOutputStream fsDataOutputStream = null; - if (fileSystem.exists(hdfsWritePath)) { - fileSystem.delete(hdfsWritePath, true); + private static TarArchiveOutputStream getTar(FileSystem fileSystem, String outputPath) throws IOException { + Path hdfsWritePath = new Path(outputPath); + FSDataOutputStream fsDataOutputStream = null; + if (fileSystem.exists(hdfsWritePath)) { + fileSystem.delete(hdfsWritePath, true); - } - fsDataOutputStream = fileSystem.create(hdfsWritePath); + } + fsDataOutputStream = fileSystem.create(hdfsWritePath); - return new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream()); - } + return new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream()); + } - private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dir_name) - throws IOException { + private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dir_name) + throws IOException { - Path hdfsWritePath = new Path(outputPath); - FSDataOutputStream fsDataOutputStream = null; - if (fileSystem.exists(hdfsWritePath)) { - fileSystem.delete(hdfsWritePath, true); + Path hdfsWritePath = new Path(outputPath); + FSDataOutputStream fsDataOutputStream = null; + if (fileSystem.exists(hdfsWritePath)) { + fileSystem.delete(hdfsWritePath, true); - } - fsDataOutputStream = fileSystem.create(hdfsWritePath); + } + fsDataOutputStream = fileSystem.create(hdfsWritePath); - TarArchiveOutputStream ar = new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream()); + TarArchiveOutputStream ar = new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream()); - RemoteIterator fileStatusListIterator = fileSystem - .listFiles( - new Path(inputPath), true); + RemoteIterator fileStatusListIterator = fileSystem + .listFiles( + new Path(inputPath), true); - while (fileStatusListIterator.hasNext()) { - writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, 0); - } + while (fileStatusListIterator.hasNext()) { + writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, 0); + } - ar.close(); - } + ar.close(); + } - public static void tarMaxSize(FileSystem fileSystem, String inputPath, String outputPath, String dir_name, - int gBperSplit) throws IOException { - final long bytesPerSplit = 1024L * 1024L * 1024L * gBperSplit; + public static void tarMaxSize(FileSystem fileSystem, String inputPath, String outputPath, String dir_name, + int gBperSplit) throws IOException { + final long bytesPerSplit = 1024L * 1024L * 1024L * gBperSplit; - long sourceSize = fileSystem.getContentSummary(new Path(inputPath)).getSpaceConsumed(); + long sourceSize = fileSystem.getContentSummary(new Path(inputPath)).getSpaceConsumed(); - if (sourceSize < bytesPerSplit) { - write(fileSystem, inputPath, outputPath + ".tar", dir_name); - } else { - int partNum = 0; + if (sourceSize < bytesPerSplit) { + write(fileSystem, inputPath, outputPath + ".tar", dir_name); + } else { + int partNum = 0; - RemoteIterator fileStatusListIterator = fileSystem - .listFiles( - new Path(inputPath), true); - boolean next = fileStatusListIterator.hasNext(); - while (next) { - TarArchiveOutputStream ar = getTar(fileSystem, outputPath + "_" + (partNum + 1) + ".tar"); + RemoteIterator fileStatusListIterator = fileSystem + .listFiles( + new Path(inputPath), true); + boolean next = fileStatusListIterator.hasNext(); + while (next) { + TarArchiveOutputStream ar = getTar(fileSystem, outputPath + "_" + (partNum + 1) + ".tar"); - long current_size = 0; - while (next && current_size < bytesPerSplit) { - current_size = writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, current_size); - next = fileStatusListIterator.hasNext(); + long current_size = 0; + while (next && current_size < bytesPerSplit) { + current_size = writeCurrentFile(fileSystem, dir_name, fileStatusListIterator, ar, current_size); + next = fileStatusListIterator.hasNext(); - } + } - partNum += 1; - ar.close(); - } + partNum += 1; + ar.close(); + } - } + } - } + } - private static long writeCurrentFile(FileSystem fileSystem, String dir_name, - RemoteIterator fileStatusListIterator, - TarArchiveOutputStream ar, long current_size) throws IOException { - LocatedFileStatus fileStatus = fileStatusListIterator.next(); + private static long writeCurrentFile(FileSystem fileSystem, String dir_name, + RemoteIterator fileStatusListIterator, + TarArchiveOutputStream ar, long current_size) throws IOException { + LocatedFileStatus fileStatus = fileStatusListIterator.next(); - Path p = fileStatus.getPath(); - String p_string = p.toString(); - if (!p_string.endsWith("_SUCCESS")) { - String name = p_string.substring(p_string.lastIndexOf("/") + 1); - if (name.trim().equalsIgnoreCase("communities_infrastructures")) { - name = "communities_infrastructures.json"; - } - TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name); - entry.setSize(fileStatus.getLen()); - current_size += fileStatus.getLen(); - ar.putArchiveEntry(entry); + Path p = fileStatus.getPath(); + String p_string = p.toString(); + if (!p_string.endsWith("_SUCCESS")) { + String name = p_string.substring(p_string.lastIndexOf("/") + 1); + if (name.trim().equalsIgnoreCase("communities_infrastructures")) { + name = "communities_infrastructures.json"; + } + TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name); + entry.setSize(fileStatus.getLen()); + current_size += fileStatus.getLen(); + ar.putArchiveEntry(entry); - InputStream is = fileSystem.open(fileStatus.getPath()); - - BufferedInputStream bis = new BufferedInputStream(is); - - int count; - byte data[] = new byte[1024]; - while ((count = bis.read(data, 0, data.length)) != -1) { - ar.write(data, 0, count); - } - bis.close(); - ar.closeArchiveEntry(); - - } - return current_size; - } + InputStream is = fileSystem.open(fileStatus.getPath()); + BufferedInputStream bis = new BufferedInputStream(is); + int count; + byte data[] = new byte[1024]; + while ((count = bis.read(data, 0, data.length)) != -1) { + ar.write(data, 0, count); + } + bis.close(); + ar.closeArchiveEntry(); + } + return current_size; + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java index 12543d8c7e..56a4aaf5a1 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java @@ -109,20 +109,20 @@ public class CleaningFunctions { } if (Objects.nonNull(r.getPid())) { r - .setPid( - r - .getPid() - .stream() - .filter(Objects::nonNull) - .filter(sp -> StringUtils.isNotBlank(StringUtils.trim(sp.getValue()))) - .filter(sp -> NONE.equalsIgnoreCase(sp.getValue())) - .filter(sp -> Objects.nonNull(sp.getQualifier())) - .filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid())) - .map(sp -> { - sp.setValue(StringUtils.trim(sp.getValue())); - return sp; - }) - .collect(Collectors.toList())); + .setPid( + r + .getPid() + .stream() + .filter(Objects::nonNull) + .filter(sp -> StringUtils.isNotBlank(StringUtils.trim(sp.getValue()))) + .filter(sp -> NONE.equalsIgnoreCase(sp.getValue())) + .filter(sp -> Objects.nonNull(sp.getQualifier())) + .filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid())) + .map(sp -> { + sp.setValue(StringUtils.trim(sp.getValue())); + return sp; + }) + .collect(Collectors.toList())); } if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) { r diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/MakeTar.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/MakeTar.java index 08c7ee90e9..00ddcb5a86 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/MakeTar.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/MakeTar.java @@ -4,7 +4,6 @@ package eu.dnetlib.dhp.oa.graph.dump; import java.io.*; import java.util.Optional; -import eu.dnetlib.dhp.common.MakeTarArchive; import org.apache.commons.compress.archivers.ar.ArArchiveEntry; import org.apache.commons.compress.archivers.ar.ArArchiveOutputStream; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; @@ -16,6 +15,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.MakeTarArchive; import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; public class MakeTar implements Serializable { @@ -41,9 +41,10 @@ public class MakeTar implements Serializable { final String inputPath = parser.get("sourcePath"); log.info("input path : {}", inputPath); - final int gBperSplit = Optional.ofNullable(parser.get("splitSize")) - .map(Integer::valueOf) - .orElse(10); + final int gBperSplit = Optional + .ofNullable(parser.get("splitSize")) + .map(Integer::valueOf) + .orElse(10); Configuration conf = new Configuration(); conf.set("fs.defaultFS", hdfsNameNode); @@ -54,7 +55,8 @@ public class MakeTar implements Serializable { } - public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit) throws IOException { + public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath, int gBperSplit) + throws IOException { RemoteIterator dir_iterator = fileSystem.listLocatedStatus(new Path(inputPath)); @@ -70,5 +72,4 @@ public class MakeTar implements Serializable { } - } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java index bd2738eaab..f88980f15e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java @@ -5,7 +5,6 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; - import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.function.MapFunction; @@ -13,14 +12,12 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; - import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; import eu.dnetlib.dhp.oa.graph.dump.graph.Constants; - import eu.dnetlib.dhp.utils.DHPUtils; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -74,5 +71,4 @@ public class Utils { return new Gson().fromJson(sb.toString(), CommunityMap.class); } - }