This commit is contained in:
Miriam Baglioni 2020-12-03 14:51:47 +01:00
parent f6b7c297a8
commit 6b8e947bcf
3 changed files with 19 additions and 26 deletions

View File

@ -10,10 +10,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.*;
import org.apache.http.HttpStatus;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
@ -56,16 +53,17 @@ public class SendToCatalogue implements Serializable {
Path p = fileStatus.getPath();
String p_string = p.toString();
if (!p_string.endsWith("_SUCCESS")) {
String tmp = p_string.substring(0, p_string.lastIndexOf("/"));
String name = tmp.substring(tmp.lastIndexOf("/") + 1);
log.info("Copying information for : " + name);
fileSystem.copyToLocalFile(p, new Path("/tmp/" + name));
// String tmp = p_string.substring(0, p_string.lastIndexOf("/"));
// String name = tmp.substring(tmp.lastIndexOf("/") + 1);
// log.info("Copying information for : " + name);
// fileSystem.copyToLocalFile(p, new Path("/tmp/" + name));
try {
InputStream in = new GZIPInputStream(new FileInputStream("/tmp/" + name));
// InputStream in = new GZIPInputStream(new FileInputStream("/tmp/" + name));
BufferedReader reader = new BufferedReader(
new InputStreamReader(in));
// BufferedReader reader = new BufferedReader(
// new InputStreamReader(in));
BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(p)));
String line;
while ((line = reader.readLine()) != null) {
if (HttpStatus.SC_CREATED != gCatAPIClient.publish(line)) {
@ -73,20 +71,20 @@ public class SendToCatalogue implements Serializable {
}
}
reader.close();
in.close();
} finally {
log.info("deleting information for: " + name);
File f = new File("/tmp/" + name);
if (f.exists()) {
f.delete();
}
// in.close();
} finally {
// log.info("deleting information for: " + name);
// File f = new File("/tmp/" + name);
// if (f.exists()) {
// f.delete();
}
}
}
}
}
}
//}

View File

@ -211,7 +211,7 @@
<error to="Kill"/>
</action>
<join name="join_dump" to="populate_catalogue"/>
<join name="join_dump" to="End"/>
<action name="populate_catalogue">
<java>

View File

@ -4,9 +4,7 @@ package eu.dnetlib.dhp.oa.graph.gcat;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
@ -22,10 +20,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.oa.graph.dump.SparkDumpCommunityProducts;
import eu.dnetlib.dhp.oa.graph.dump.gcat.Mapper;
import eu.dnetlib.dhp.oa.graph.dump.gcat.SparkDumpRISISCatalogue;
public class DumpJobTest {