diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SaveCommunityMap.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SaveCommunityMap.java similarity index 59% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SaveCommunityMap.java rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SaveCommunityMap.java index 87f7dfe61..2c77eccd2 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/community/SaveCommunityMap.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SaveCommunityMap.java @@ -1,31 +1,28 @@ +/** + * This class connects with the IS related to the isLookUpUrl got as parameter. + * It saves the information about the context that will guide the dump of the results. + * The information saved is a HashMap. The key is the id of a community - research infrastructure/initiative , the + * value is the label of the research community - research infrastructure/initiative. + * + */ -package eu.dnetlib.dhp.oa.graph.dump.community; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +package eu.dnetlib.dhp.oa.graph.dump; import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.Serializable; import java.nio.charset.StandardCharsets; -import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.spark.SparkConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem; -import eu.dnetlib.dhp.oa.graph.dump.Utils; -import eu.dnetlib.dhp.oa.graph.dump.graph.CreateContextEntities; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; public class SaveCommunityMap implements Serializable { @@ -74,45 +71,10 @@ public class SaveCommunityMap implements Serializable { final String isLookUpUrl = parser.get("isLookUpUrl"); log.info("isLookUpUrl: {}", isLookUpUrl); -// Boolean isSparkSessionManaged = Optional -// .ofNullable(parser.get("isSparkSessionManaged")) -// .map(Boolean::valueOf) -// .orElse(Boolean.TRUE); -// log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final SaveCommunityMap scm = new SaveCommunityMap(outputPath, nameNode, isLookUpUrl); scm.saveCommunityMap(); - // CommunityMap communityMap = queryInformationSystem.getCommunityMap(); - -// SparkConf conf = new SparkConf(); -// -// runWithSparkSession( -// conf, -// isSparkSessionManaged, -// spark -> { -// Utils.removeOutputDir(spark, outputPath); -// -//// execDump(spark, inputPath, outputPath, communityMap, inputClazz, outputClazz, graph);// , -// // dumpClazz); -// }); - -// Configuration conf = new Configuration(); -// conf.set("fs.defaultFS", nameNode); -// FileSystem fileSystem = FileSystem.get(conf); -// Path hdfsWritePath = new Path(outputPath); -// FSDataOutputStream fsDataOutputStream = null; -// if (fileSystem.exists(hdfsWritePath)) { -// fsDataOutputStream = fileSystem.append(hdfsWritePath); -// } else { -// fsDataOutputStream = fileSystem.create(hdfsWritePath); -// } -// -// BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)); -// -// writer.write(OBJECT_MAPPER.writeValueAsString(communityMap)); -// writer.close(); } private void saveCommunityMap() throws ISLookUpException, IOException { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SendToZenodoHDFS.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SendToZenodoHDFS.java index 0154dad99..9ffc31399 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SendToZenodoHDFS.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/SendToZenodoHDFS.java @@ -1,25 +1,17 @@ package eu.dnetlib.dhp.oa.graph.dump; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; import java.io.Serializable; import java.util.Optional; - import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; - -import com.google.gson.Gson; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.api.MissingConceptDoiException; import eu.dnetlib.dhp.common.api.ZenodoAPIClient; import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; public class SendToZenodoHDFS implements Serializable { @@ -36,7 +28,7 @@ public class SendToZenodoHDFS implements Serializable { parser.parseArgument(args); final String hdfsPath = parser.get("hdfsPath"); - final String hdfsNameNode = parser.get("hdfsNameNode"); + final String hdfsNameNode = parser.get("nameNode"); final String access_token = parser.get("accessToken"); final String connection_url = parser.get("connectionUrl"); final String metadata = parser.get("metadata"); @@ -51,7 +43,7 @@ public class SendToZenodoHDFS implements Serializable { FileSystem fileSystem = FileSystem.get(conf); - CommunityMap communityMap = readCommunityMap(fileSystem, communityMapPath); + CommunityMap communityMap = Utils.readCommunityMap(fileSystem, communityMapPath); RemoteIterator fileStatusListIterator = fileSystem .listFiles( @@ -72,13 +64,15 @@ public class SendToZenodoHDFS implements Serializable { Path p = fileStatus.getPath(); String p_string = p.toString(); if (!p_string.endsWith("_SUCCESS")) { - String tmp = p_string.substring(0, p_string.lastIndexOf("/")); - String community = tmp.substring(tmp.lastIndexOf("/") + 1); - log.info("Sending information for community: " + community); - String community_name = communityMap.get(community).replace(" ", "_") + ".json.gz"; + // String tmp = p_string.substring(0, p_string.lastIndexOf("/")); + String name = p_string.substring(p_string.lastIndexOf("/") + 1); + log.info("Sending information for community: " + name); + if (communityMap.containsKey(name.substring(0, name.lastIndexOf(".")))) { + name = communityMap.get(name.substring(0, name.lastIndexOf("."))).replace(" ", "_") + ".tar"; + } FSDataInputStream inputStream = fileSystem.open(p); - zenodoApiClient.uploadIS(inputStream, community_name, fileStatus.getLen()); + zenodoApiClient.uploadIS(inputStream, name, fileStatus.getLen()); } @@ -89,20 +83,4 @@ public class SendToZenodoHDFS implements Serializable { } - public static CommunityMap readCommunityMap(FileSystem fileSystem, String communityMapPath) throws IOException { - BufferedReader br = new BufferedReader(new InputStreamReader(fileSystem.open(new Path(communityMapPath)))); - StringBuffer sb = new StringBuffer(); - try { - String line; - while ((line = br.readLine()) != null) { - sb.append(line); - } - } finally { - br.close(); - - } - - return new Gson().fromJson(sb.toString(), CommunityMap.class); - } - } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java index 83cb6a3cc..c112c5c72 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/Utils.java @@ -1,6 +1,12 @@ package eu.dnetlib.dhp.oa.graph.dump; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -48,4 +54,20 @@ public class Utils { return new Gson().fromJson(spark.read().textFile(communityMapPath).collectAsList().get(0), CommunityMap.class); } + + public static CommunityMap readCommunityMap(FileSystem fileSystem, String communityMapPath) throws IOException { + BufferedReader br = new BufferedReader(new InputStreamReader(fileSystem.open(new Path(communityMapPath)))); + StringBuffer sb = new StringBuffer(); + try { + String line; + while ((line = br.readLine()) != null) { + sb.append(line); + } + } finally { + br.close(); + + } + + return new Gson().fromJson(sb.toString(), CommunityMap.class); + } }