dhp-graph-dump/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/csv/DumpCommunities.java

package eu.dnetlib.dhp.oa.graph.dump.csv;

import static org.apache.commons.lang3.StringUtils.split;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
/**
 * STEP 1 of the CSV dump: writes the selected set of research communities
 * (id, name, acronym, description) as a CSV file on HDFS.
 *
 * @author miriam.baglioni
 * @Date 09/05/23
 */
public class DumpCommunities implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(DumpCommunities.class);

    private final BufferedWriter writer;

    // CSV header line; Constants.SEP is the field separator used throughout the dump
    private static final String HEADER = "id" + Constants.SEP + "name" + Constants.SEP + "acronym" + Constants.SEP
        + "description\n";

    private final transient QueryInformationSystem queryInformationSystem;
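    // Parses the step parameters from the JSON configuration (outputPath, nameNode,
    // communities as a ';'-separated list of community ids, isLookUpUrl) and dumps
    // the corresponding communities as CSV.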
    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                DumpCommunities.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/oa/graph/dump/input_dump_csv_ste1.json"));

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        final String nameNode = parser.get("nameNode");
        log.info("nameNode: {}", nameNode);

        final List<String> communities = Arrays.asList(split(parser.get("communities"), ";"));

        final DumpCommunities dc = new DumpCommunities(outputPath, nameNode, parser.get("isLookUpUrl"));
        dc.writeCommunity(communities);
    }
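    /**
     * Writes the CSV header, then one line per selected community as returned by the
     * XQuery against the information system, and closes the writer.
     */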
    private void writeCommunity(List<String> communities)
        throws IOException, ISLookUpException, DocumentException, SAXException {
        writer.write(HEADER);
        writer.flush();

        // the XQuery template contains a single %s placeholder, filled below with the
        // or-joined list of filters selecting the requested community contexts
        final String xqueryTemplate = IOUtils
            .toString(
                DumpCommunities.class
                    .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/dump/xqueries/set_of_communities.xq"));
        final String xquery = String
            .format(
                xqueryTemplate,
                communities
                    .stream()
                    .map(t -> String.format("$x//CONFIGURATION/context[./@id= '%s']", t))
                    .collect(Collectors.joining(" or ")));

        // one CSV line per community returned by the information system
        for (String community : queryInformationSystem.getCommunityCsv(xquery)) {
            writer.write(community);
            writer.write("\n");
        }
        writer.close();
    }
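    // Sets up the ISLookUp client and opens a UTF-8 writer on the HDFS output path.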
    public DumpCommunities(String hdfsPath, String hdfsNameNode, String isLookUpUrl) throws Exception {
        final Configuration conf = new Configuration();

        queryInformationSystem = new QueryInformationSystem();
        queryInformationSystem.setIsLookUp(Utils.getIsLookUpService(isLookUpUrl));

        conf.set("fs.defaultFS", hdfsNameNode);
        FileSystem fileSystem = FileSystem.get(conf);
        Path hdfsWritePath = new Path(hdfsPath);

        // remove a previous dump, if any, so the output file is recreated from scratch
        if (fileSystem.exists(hdfsWritePath)) {
            fileSystem.delete(hdfsWritePath, true);
        }
        FSDataOutputStream fos = fileSystem.create(hdfsWritePath);
        writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8));
    }
}
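
// Invocation sketch (an assumption, not taken from this file: the long parameter names
// below come from the parser.get(...) calls above, but the actual option syntax and any
// short names are defined in input_dump_csv_ste1.json, which is not shown here):
//
//   eu.dnetlib.dhp.oa.graph.dump.csv.DumpCommunities
//     --outputPath <HDFS path of the CSV file to create>
//     --nameNode <HDFS name node URI>
//     --communities <';'-separated list of community ids>
//     --isLookUpUrl <IS lookup service endpoint>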