Added logic to send records directly to the catalogue

This commit is contained in:
Miriam Baglioni 2020-07-02 11:12:14 +02:00
parent 566a763175
commit 2d380aea1d
2 changed files with 36 additions and 22 deletions

View File

@ -24,7 +24,7 @@ import eu.dnetlib.dhp.schema.oaf.Qualifier;
public class Mapper implements Serializable {
private static final List<String> publishers = Arrays
.asList("zenodo", "hal", "figshare", "inria", "digital.csic", "dans");
.asList("zenodo", "hal", "figshare", "digital-csic", "dans", "datacite");
private static final List<String> access = Arrays.asList("open", "closed", "embargoed", "restricted");
public static <I extends eu.dnetlib.dhp.schema.oaf.Result> CatalogueEntry map(I input) {

View File

@ -3,14 +3,23 @@ package eu.dnetlib.dhp.oa.graph.dump.gcat;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.Serializable;
public class SendToCatalogue implements Serializable {
private static final Log log = LogFactory.getLog(SendToCatalogue.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
@ -25,38 +34,43 @@ public class SendToCatalogue implements Serializable {
final String hdfsPath = parser.get("hdfsPath");
final String hdfsNameNode = parser.get("hdfsNameNode");
fileStatusIterator = new FileStat
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
FileSystem fileSystem = FileSystem.get(conf);
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
.listFiles(
new Path(hdfsPath), true);
GCatAPIClient gCatAPIClient = new GCatAPIClient();
gCatAPIClient.setApplicationToken(access_token);
gCatAPIClient.purgeAll();
while (fileStatusListIterator.hasNext()) {
LocatedFileStatus fileStatus = fileStatusListIterator.next();
Path p = fileStatus.getPath();
String p_string = p.toString();
String tmp = p_string.substring(0, p_string.lastIndexOf("/"));
String community = tmp.substring(tmp.lastIndexOf("/") + 1);
log.info("Sending information for community: " + community);
String community_name = communityMap.get(community).replace(" ", "_");
log.info("Copying information for community: " + community);
fileSystem.copyToLocalFile(p, new Path("/tmp/" + community_name));
File f = new File("/tmp/" + community_name);
try {
apiClient.upload(f, community_name);
apiClient.sendMretadata(metadata);
apiClient.publish();
} catch (Exception e) {
if (f.exists()) {
log.info("Deleting information for community: " + community);
f.delete();
}
} finally {
if (f.exists()) {
log.info("Deleting information for community: " + community);
f.delete();
}
String name = tmp.substring(tmp.lastIndexOf("/") + 1);
log.info("Sending information for : " + name);
//String community_name = communityMap.get(community).replace(" ", "_");
log.info("Copying information for : " + name);
fileSystem.copyToLocalFile(p, new Path("/tmp/" + name));
BufferedReader reader = new BufferedReader(new FileReader("/tmp/" + name));
String line;
while((line=reader.readLine())!= null){
gCatAPIClient.publish(line);
}
reader.close();
log.info("deleting information for: " + name);
File f = new File("/tmp/"+name);
f.delete();
}
}