dhp-graph-dump/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/csv/DumpCommunities.java

97 lines
2.7 KiB
Java

package eu.dnetlib.dhp.oa.graph.dump.csv;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static org.apache.commons.lang3.StringUtils.split;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.UtilCommunityAPI;
/**
* @author miriam.baglioni
* @Date 09/05/23
*/
//STEP 1
public class DumpCommunities implements Serializable {
	private static final Logger log = LoggerFactory.getLogger(DumpCommunities.class);
	private final BufferedWriter writer;
	// CSV header row. NOTE(review): " description " carries stray spaces and a
	// leading space unlike the other column names — looks like a typo, but left
	// unchanged since downstream CSV consumers may already depend on it; confirm.
	private static final String HEADER = "id" + Constants.SEP + "name" + Constants.SEP + "acronym" + Constants.SEP
		+ " description \n";
	private final transient UtilCommunityAPI queryCommunityAPI;

	/**
	 * STEP 1 entry point: parses arguments (outputPath, nameNode, a
	 * semicolon-separated list of community ids) and dumps the selected
	 * communities as CSV to the given HDFS path.
	 *
	 * @param args command-line arguments, parsed against input_dump_csv_ste1.json
	 * @throws Exception if argument parsing, HDFS access, or the community query fails
	 */
	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				DumpCommunities.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/oa/graph/dump/input_dump_csv_ste1.json"));

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		final String outputPath = parser.get("outputPath");
		log.info("outputPath: {}", outputPath);

		final String nameNode = parser.get("nameNode");
		log.info("nameNode: {}", nameNode);

		// "communities" is a single ';'-separated string, e.g. "ni;dh-ch;enermaps"
		final List<String> communities = Arrays.asList(split(parser.get("communities"), ";"));

		final DumpCommunities dc = new DumpCommunities(outputPath, nameNode);
		dc.writeCommunity(communities);
	}

	/**
	 * Writes the CSV header followed by one line per community returned by the
	 * community API. The writer is always closed, even when the query or a
	 * write fails, so the HDFS output stream is never leaked.
	 *
	 * @param communities community identifiers to dump
	 * @throws IOException if writing to the HDFS output stream fails
	 */
	private void writeCommunity(List<String> communities)
		throws IOException {
		// try/finally guarantees the underlying FSDataOutputStream is released
		// even if getCommunityCsv or an individual write throws
		try {
			writer.write(HEADER);
			writer.flush();
			for (String community : queryCommunityAPI
				.getCommunityCsv(communities)) {
				writer
					.write(
						community);
				writer.write("\n");
			}
		} finally {
			writer.close();
		}
	}

	/**
	 * Opens a UTF-8 writer over a freshly created file at {@code hdfsPath} on
	 * the filesystem identified by {@code hdfsNameNode}. Any pre-existing file
	 * (or directory tree) at that path is deleted first.
	 *
	 * @param hdfsPath     destination path of the CSV dump
	 * @param hdfsNameNode HDFS name node URI used as fs.defaultFS
	 * @throws Exception if the filesystem cannot be reached or the file cannot be created
	 */
	public DumpCommunities(String hdfsPath, String hdfsNameNode) throws Exception {
		final Configuration conf = new Configuration();
		queryCommunityAPI = new UtilCommunityAPI();
		conf.set("fs.defaultFS", hdfsNameNode);
		FileSystem fileSystem = FileSystem.get(conf);
		Path hdfsWritePath = new Path(hdfsPath);

		// recursive delete: replaces any previous dump at this location
		if (fileSystem.exists(hdfsWritePath)) {
			fileSystem.delete(hdfsWritePath, true);
		}
		FSDataOutputStream fos = fileSystem.create(hdfsWritePath);

		writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8));
	}
}