forked from D-Net/dnet-hadoop
Add steps to collect last visit data && archive not found repository URLs
This commit is contained in:
parent 9d44418d38
commit ed9c81a0b7
@@ -51,6 +51,7 @@ public class Constants {

 	public static final String RETRY_DELAY = "retryDelay";
 	public static final String CONNECT_TIMEOUT = "connectTimeOut";
 	public static final String READ_TIMEOUT = "readTimeOut";
+	public static final String REQUEST_METHOD = "requestMethod";
 	public static final String FROM_DATE_OVERRIDE = "fromDateOverride";
 	public static final String UNTIL_DATE_OVERRIDE = "untilDateOverride";

@@ -1,6 +1,9 @@

 package eu.dnetlib.dhp.common.collection;

+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Bundles the http connection parameters driving the client behaviour.
  */

@@ -13,6 +16,8 @@ public class HttpClientParams {

 	public static int _connectTimeOut = 10; // seconds
 	public static int _readTimeOut = 30; // seconds

+	public static String _requestMethod = "GET";
+
 	/**
 	 * Maximum number of allowed retries before failing
 	 */

@@ -38,17 +43,30 @@ public class HttpClientParams {
 	 */
 	private int readTimeOut;

+	/**
+	 * Custom http headers
+	 */
+	private Map<String, String> headers;
+
+	/**
+	 * Request method (i.e., GET, POST etc)
+	 */
+	private String requestMethod;
+
 	public HttpClientParams() {
-		this(_maxNumberOfRetry, _requestDelay, _retryDelay, _connectTimeOut, _readTimeOut);
+		this(_maxNumberOfRetry, _requestDelay, _retryDelay, _connectTimeOut, _readTimeOut, new HashMap<>(), _requestMethod);
 	}

 	public HttpClientParams(int maxNumberOfRetry, int requestDelay, int retryDelay, int connectTimeOut,
-		int readTimeOut) {
+		int readTimeOut, Map<String, String> headers, String requestMethod) {
 		this.maxNumberOfRetry = maxNumberOfRetry;
 		this.requestDelay = requestDelay;
 		this.retryDelay = retryDelay;
 		this.connectTimeOut = connectTimeOut;
 		this.readTimeOut = readTimeOut;
+		this.headers = headers;
+		this.requestMethod = requestMethod;
 	}

 	public int getMaxNumberOfRetry() {

@@ -91,4 +109,19 @@ public class HttpClientParams {
 		this.readTimeOut = readTimeOut;
 	}

+	public Map<String, String> getHeaders() {
+		return headers;
+	}
+
+	public void setHeaders(Map<String, String> headers) {
+		this.headers = headers;
+	}
+
+	public String getRequestMethod() {
+		return requestMethod;
+	}
+
+	public void setRequestMethod(String requestMethod) {
+		this.requestMethod = requestMethod;
+	}
 }

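With these additions, callers of HttpClientParams can choose the HTTP verb and attach custom headers per collection run. A minimal usage sketch (not part of the commit; the header values are placeholders):

import java.util.HashMap;
import java.util.Map;

import eu.dnetlib.dhp.common.collection.HttpClientParams;

public class HttpClientParamsExample {
	public static void main(String[] args) {
		// defaults: GET and an empty header map (see the no-arg constructor above)
		HttpClientParams params = new HttpClientParams();

		// switch to POST and add custom headers, as SWHConnection does further below
		params.setRequestMethod("POST");

		Map<String, String> headers = new HashMap<>();
		headers.put("Accept", "application/json");
		headers.put("Authorization", "Bearer <token>"); // placeholder token
		params.setHeaders(headers);
	}
}
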
@@ -107,7 +107,14 @@ public class HttpConnector2 {
 		urlConn.setReadTimeout(getClientParams().getReadTimeOut() * 1000);
 		urlConn.setConnectTimeout(getClientParams().getConnectTimeOut() * 1000);
 		urlConn.addRequestProperty(HttpHeaders.USER_AGENT, userAgent);
+		urlConn.setRequestMethod(getClientParams().getRequestMethod());
+
+		// if provided, add custom headers
+		if (!getClientParams().getHeaders().isEmpty()) {
+			for (Map.Entry<String, String> headerEntry : getClientParams().getHeaders().entrySet()) {
+				urlConn.addRequestProperty(headerEntry.getKey(), headerEntry.getValue());
+			}
+		}
 		if (log.isDebugEnabled()) {
 			logHeaderFields(urlConn);
 		}

@@ -99,6 +99,12 @@
 		<artifactId>httpclient</artifactId>
 		<version>4.5.13</version>
 	</dependency>
+	<dependency>
+		<groupId>org.datanucleus</groupId>
+		<artifactId>datanucleus-core</artifactId>
+		<version>3.2.10</version>
+		<scope>compile</scope>
+	</dependency>

 </dependencies>
</project>

@@ -0,0 +1,137 @@

package eu.dnetlib.dhp.swh;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
import eu.dnetlib.dhp.swh.models.LastVisitData;
import eu.dnetlib.dhp.swh.utils.SWHConnection;
import eu.dnetlib.dhp.swh.utils.SWHConstants;
import eu.dnetlib.dhp.swh.utils.SWHUtils;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URL;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static eu.dnetlib.dhp.common.Constants.REQUEST_METHOD;
import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration;

/**
 * Sends archive requests to the SWH API for those software repository URLs that are missing from it
 *
 * @author Serafeim Chatzopoulos
 */
public class ArchiveRepositoryURLs {

	private static final Logger log = LoggerFactory.getLogger(ArchiveRepositoryURLs.class);
	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	private static SWHConnection swhConnection = null;

	public static void main(final String[] args) throws IOException, ParseException {
		final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
			IOUtils
				.toString(
					CollectLastVisitRepositoryData.class
						.getResourceAsStream(
							"/eu/dnetlib/dhp/swh/input_archive_repository_urls.json")));
		argumentParser.parseArgument(args);

		final String hdfsuri = argumentParser.get("namenode");
		log.info("hdfsURI: {}", hdfsuri);

		final String inputPath = argumentParser.get("lastVisitsPath");
		log.info("inputPath: {}", inputPath);

		final String outputPath = argumentParser.get("archiveRequestsPath");
		log.info("outputPath: {}", outputPath);

		final Integer archiveThresholdInDays = Integer.parseInt(argumentParser.get("archiveThresholdInDays"));
		log.info("archiveThresholdInDays: {}", archiveThresholdInDays);

		final HttpClientParams clientParams = SWHUtils.getClientParams(argumentParser);

		swhConnection = new SWHConnection(clientParams);

		final FileSystem fs = FileSystem.get(getHadoopConfiguration(hdfsuri));

		archive(fs, inputPath, outputPath, archiveThresholdInDays);

	}

	private static void archive(FileSystem fs, String inputPath, String outputPath, Integer archiveThresholdInDays) throws IOException {

		SequenceFile.Reader fr = SWHUtils.getSequenceFileReader(fs, inputPath);
		SequenceFile.Writer fw = SWHUtils.getSequenceFileWriter(fs, outputPath);

		// Create key and value objects to hold data
		Text repoUrl = new Text();
		Text lastVisitData = new Text();

		// Read key-value pairs from the SequenceFile and handle appropriately
		while (fr.next(repoUrl, lastVisitData)) {

			String response = handleRecord(repoUrl.toString(), lastVisitData.toString(), archiveThresholdInDays);

			// response is null when no request is needed
			if (response != null) {
				SWHUtils.appendToSequenceFile(fw, repoUrl.toString(), response);
			}

		}

		// Close readers
		fw.close();
		fr.close();
	}

	public static String handleRecord(String repoUrl, String lastVisitData, Integer archiveThresholdInDays) throws IOException {
		System.out.println("Key: " + repoUrl + ", Value: " + lastVisitData);

		LastVisitData lastVisit = OBJECT_MAPPER.readValue(lastVisitData, LastVisitData.class);

		// perform an archive request when the repo URL was not found in the previous step
		if (lastVisit.getSnapshot() != null) {

			// OR last visit was before (now() - archiveThresholdInDays)
			long diffInMillies = Math.abs((new Date()).getTime() - lastVisit.getDate().getTime());
			long diffInDays = TimeUnit.DAYS.convert(diffInMillies, TimeUnit.MILLISECONDS);

			if (archiveThresholdInDays >= diffInDays) {
				return null;
			}
		}

		// if last visit data are available, re-use version control type, else use the default one (i.e., git)
		String visitType = Optional
			.ofNullable(lastVisit.getType())
			.orElse(SWHConstants.DEFAULT_VISIT_TYPE);

		URL url = new URL(String.format(SWHConstants.SWH_ARCHIVE_URL, visitType, repoUrl.trim()));
		System.out.println(url.toString());

		String response;
		try {
			response = swhConnection.call(url.toString());
		} catch (CollectorException e) {
			log.info("Error in request: {}", url);
			response = "{}";
		}

		return response;

	}

}

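The decision logic in handleRecord() boils down to: request archiving when the repository has never produced a snapshot, or when its last visit is older than the threshold. A standalone sketch of that rule (shouldArchive is a hypothetical helper, not part of the commit):

import java.util.Date;
import java.util.concurrent.TimeUnit;

public class ArchiveDecisionExample {

	// hypothetical helper mirroring the checks in ArchiveRepositoryURLs.handleRecord()
	static boolean shouldArchive(String snapshotId, Date lastVisitDate, int thresholdDays) {
		if (snapshotId == null) {
			return true; // never archived -> request archiving
		}
		long diffInMillies = Math.abs(new Date().getTime() - lastVisitDate.getTime());
		long diffInDays = TimeUnit.DAYS.convert(diffInMillies, TimeUnit.MILLISECONDS);
		return diffInDays > thresholdDays; // stale visit -> request archiving
	}

	public static void main(String[] args) {
		Date twoYearsAgo = new Date(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(730));
		System.out.println(shouldArchive(null, null, 365)); // true: no snapshot yet
		System.out.println(shouldArchive("c6c69d2c", twoYearsAgo, 365)); // true: older than threshold
		System.out.println(shouldArchive("c6c69d2c", new Date(), 365)); // false: visited recently
	}
}
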
@@ -0,0 +1,120 @@

package eu.dnetlib.dhp.swh;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
import eu.dnetlib.dhp.swh.utils.SWHConnection;
import eu.dnetlib.dhp.swh.utils.SWHConstants;
import eu.dnetlib.dhp.swh.utils.SWHUtils;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;

import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration;

/**
 * Given a file with software repository URLs, this class
 * collects last visit data from the Software Heritage API.
 *
 * @author Serafeim Chatzopoulos
 */
public class CollectLastVisitRepositoryData {

	private static final Logger log = LoggerFactory.getLogger(CollectLastVisitRepositoryData.class);
	private static SWHConnection swhConnection = null;

	public static void main(final String[] args)
		throws IOException, ParseException, InterruptedException, URISyntaxException, CollectorException {
		final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
			IOUtils
				.toString(
					CollectLastVisitRepositoryData.class
						.getResourceAsStream(
							"/eu/dnetlib/dhp/swh/input_collect_last_visit_repository_data.json")));
		argumentParser.parseArgument(args);

		log.info("Java Xmx: {}m", Runtime.getRuntime().maxMemory() / (1024 * 1024));

		final String hdfsuri = argumentParser.get("namenode");
		log.info("hdfsURI: {}", hdfsuri);

		final String inputPath = argumentParser.get("softwareCodeRepositoryURLs");
		log.info("inputPath: {}", inputPath);

		final String outputPath = argumentParser.get("lastVisitsPath");
		log.info("outputPath: {}", outputPath);

		final HttpClientParams clientParams = SWHUtils.getClientParams(argumentParser);

		swhConnection = new SWHConnection(clientParams);

		final FileSystem fs = FileSystem.get(getHadoopConfiguration(hdfsuri));

		collect(fs, inputPath, outputPath);

		fs.close();
	}

	private static void collect(FileSystem fs, String inputPath, String outputPath)
		throws IOException {

		SequenceFile.Writer fw = SWHUtils.getSequenceFileWriter(fs, outputPath);

		// Specify the HDFS directory path you want to read
		Path directoryPath = new Path(inputPath);

		// List all files in the directory
		FileStatus[] partStatuses = fs.listStatus(directoryPath);

		for (FileStatus partStatus : partStatuses) {

			// Check if it's a file (not a directory)
			if (partStatus.isFile()) {
				handleFile(fs, partStatus.getPath(), fw);
			}

		}

		fw.close();
	}

	private static void handleFile(FileSystem fs, Path partInputPath, SequenceFile.Writer fw)
		throws IOException {

		BufferedReader br = SWHUtils.getFileReader(fs, partInputPath);

		String repoUrl;
		while ((repoUrl = br.readLine()) != null) {

			URL url = new URL(String.format(SWHConstants.SWH_LATEST_VISIT_URL, repoUrl.trim()));

			String response;
			try {
				response = swhConnection.call(url.toString());
			} catch (CollectorException e) {
				log.info("Error in request: {}", url);
				response = "{}";
			}

			SWHUtils.appendToSequenceFile(fw, repoUrl, response);
		}

		br.close();
	}

}

@@ -1,60 +1,37 @@

 package eu.dnetlib.dhp.swh;

-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.Result;
 import org.apache.commons.io.IOUtils;
-import org.apache.http.Header;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.sql.*;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.oaf.*;
+import java.io.Serializable;
+import java.util.Optional;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;

 /**
- * Creates action sets for Crossref affiliation relations inferred by BIP!
+ * Collects unique software repository URLs in the Graph using Hive
+ *
+ * @author Serafeim Chatzopoulos
  */
 public class CollectSoftwareRepositoryURLs implements Serializable {

 	private static final Logger log = LoggerFactory.getLogger(CollectSoftwareRepositoryURLs.class);
-	// public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:bipinference";
-	// public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by BIP!";
-	// public static final String BIP_INFERENCE_PROVENANCE = "bip:affiliation:crossref";
-	private static final String DEFAULT_VISIT_TYPE = "git";
-	private static final int CONCURRENT_API_CALLS = 1;
-
-	private static final String SWH_LATEST_VISIT_URL = "https://archive.softwareheritage.org/api/1/origin/%s/visit/latest/";

 	public static <I extends Result> void main(String[] args) throws Exception {

 		String jsonConfiguration = IOUtils
 			.toString(
 				CollectSoftwareRepositoryURLs.class
-					.getResourceAsStream("/eu/dnetlib/dhp/swh/input_parameters.json"));
+					.getResourceAsStream("/eu/dnetlib/dhp/swh/input_collect_software_repository_urls.json"));

 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 		parser.parseArgument(args);

@@ -89,7 +66,10 @@ public class CollectSoftwareRepositoryURLs implements Serializable {

 		String queryTemplate = "SELECT distinct coderepositoryurl.value " +
 			"FROM %s.software " +
-			"WHERE coderepositoryurl.value IS NOT NULL";
+			"WHERE coderepositoryurl.value IS NOT NULL " +
+			"AND datainfo.deletedbyinference = FALSE " +
+			"AND datainfo.invisible = FALSE " +
+			"LIMIT 1000"; // TODO remove
 		String query = String.format(queryTemplate, hiveDbName);

 		log.info("Hive query to fetch software code URLs: {}", query);

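For reference, with the hiveDbName set in job.properties further below (openaire_prod_20230914), String.format expands the template above to roughly the following query; the LIMIT clause is flagged in the code as a temporary cap:

SELECT distinct coderepositoryurl.value FROM openaire_prod_20230914.software WHERE coderepositoryurl.value IS NOT NULL AND datainfo.deletedbyinference = FALSE AND datainfo.invisible = FALSE LIMIT 1000
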
@@ -100,112 +80,6 @@ public class CollectSoftwareRepositoryURLs implements Serializable {
 		df
 			.write()
 			.mode(SaveMode.Overwrite)
-			// .option("compression", "gzip")
 			.csv(outputPath);
 	}

-	private static Dataset<Row> readSoftware(SparkSession spark, String inputPath) {
-		return spark
-			.read()
-			.json(inputPath)
-			.select(
-				new Column("codeRepositoryUrl.value").as("codeRepositoryUrl"),
-				new Column("dataInfo.deletedbyinference"),
-				new Column("dataInfo.invisible"));
-	}
-
-	private static Dataset<Row> filterSoftware(Dataset<Row> softwareDF, Integer limit) {
-
-		Dataset<Row> df = softwareDF
-			.where(softwareDF.col("codeRepositoryUrl").isNotNull())
-			.where("deletedbyinference = false")
-			.where("invisible = false")
-			.drop("deletedbyinference")
-			.drop("invisible");
-
-		// TODO remove when done
-		df = df.limit(limit);
-
-		return df;
-	}
-
-	public static Dataset<Row> makeParallelRequests(SparkSession spark, Dataset<Row> softwareDF) {
-		// TODO replace with coalesce ?
-		Dataset<Row> df = softwareDF.repartition(CONCURRENT_API_CALLS);
-
-		log.info("Number of partitions: {}", df.rdd().getNumPartitions());
-
-		ObjectMapper objectMapper = new ObjectMapper();
-
-		List<Row> collectedRows = df
-			.javaRDD()
-			// max parallelism should be equal to the number of partitions here
-			.mapPartitions((FlatMapFunction<Iterator<Row>, Row>) partition -> {
-				List<Row> resultRows = new ArrayList<>();
-				while (partition.hasNext()) {
-					Row row = partition.next();
-					String url = String.format(SWH_LATEST_VISIT_URL, row.getString(0));
-
-					// String snapshotId = null;
-					// String type = null;
-					// String date = null;
-
-					String responseBody = makeAPICall(url);
-					TimeUnit.SECONDS.sleep(1);
-					// Thread.sleep(500);
-					// if (responseBody != null) {
-					// 	LastVisitResponse visitResponse = objectMapper.readValue(responseBody, LastVisitResponse.class);
-					// 	snapshotId = visitResponse.getSnapshot();
-					// 	type = visitResponse.getType();
-					// 	date = visitResponse.getDate();
-					// }
-					// resultRows.add(RowFactory.create(url, snapshotId, type, date));
-
-					resultRows.add(RowFactory.create(url, responseBody));
-				}
-				return resultRows.iterator();
-
-			})
-			.collect();
-
-		StructType resultSchema = new StructType()
-			.add("codeRepositoryUrl", DataTypes.StringType)
-			.add("response", DataTypes.StringType);
-
-		// .add("snapshotId", DataTypes.StringType)
-		// .add("type", DataTypes.StringType)
-		// .add("date", DataTypes.StringType);
-
-		// create a DataFrame from the collected rows
-		return spark.createDataFrame(collectedRows, resultSchema);
-	}
-
-	private static String makeAPICall(String url) throws IOException {
-		System.out.println(java.time.LocalDateTime.now());
-
-		try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
-			HttpGet httpGet = new HttpGet(url);
-			httpGet
-				.setHeader(
-					"Authorization",
-					"Bearer eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJhMTMxYTQ1My1hM2IyLTQwMTUtODQ2Ny05MzAyZjk3MTFkOGEifQ.eyJpYXQiOjE2OTQ2MzYwMjAsImp0aSI6IjkwZjdkNTNjLTQ5YTktNGFiMy1hY2E0LTcwMTViMjEyZTNjNiIsImlzcyI6Imh0dHBzOi8vYXV0aC5zb2Z0d2FyZWhlcml0YWdlLm9yZy9hdXRoL3JlYWxtcy9Tb2Z0d2FyZUhlcml0YWdlIiwiYXVkIjoiaHR0cHM6Ly9hdXRoLnNvZnR3YXJlaGVyaXRhZ2Uub3JnL2F1dGgvcmVhbG1zL1NvZnR3YXJlSGVyaXRhZ2UiLCJzdWIiOiIzMTY5OWZkNC0xNmE0LTQxOWItYTdhMi00NjI5MDY4ZjI3OWEiLCJ0eXAiOiJPZmZsaW5lIiwiYXpwIjoic3doLXdlYiIsInNlc3Npb25fc3RhdGUiOiIzMjYzMzEwMS00ZDRkLTQwMjItODU2NC1iMzNlMTJiNTE3ZDkiLCJzY29wZSI6Im9wZW5pZCBvZmZsaW5lX2FjY2VzcyBwcm9maWxlIGVtYWlsIn0.XHj1VIZu1dZ4Ej32-oU84mFmaox9cLNjXosNxwZM0Xs");
-			try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
-				int statusCode = response.getStatusLine().getStatusCode();
-				// if (statusCode != 200)
-				// 	return null;
-				Header[] headers = response.getHeaders("X-RateLimit-Remaining");
-				for (Header header : headers) {
-					System.out
-						.println(
-							"Key : " + header.getName()
-								+ " ,Value : " + header.getValue());
-				}
-				HttpEntity entity = response.getEntity();
-				if (entity != null) {
-					return EntityUtils.toString(entity);
-				}
-			}
-		}
-		return null;
-	}
 }

@@ -4,12 +4,14 @@ package eu.dnetlib.dhp.swh.models;

 import com.cloudera.com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

+import java.util.Date;
+
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class LastVisitResponse {
+public class LastVisitData {

 	private String type;

-	private String date;
+	private Date date;

 	@JsonProperty("snapshot")
 	private String snapshotId;

@@ -22,11 +24,11 @@ public class LastVisitResponse {
 		this.type = type;
 	}

-	public String getDate() {
+	public Date getDate() {
 		return date;
 	}

-	public void setDate(String date) {
+	public void setDate(Date date) {
 		this.date = date;
 	}

@@ -0,0 +1,138 @@

package eu.dnetlib.dhp.swh.utils;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.math.NumberUtils;
import org.apache.http.Header;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.common.Constants;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
import eu.dnetlib.dhp.common.collection.HttpConnector2;

public class SWHConnection {

	private static final Logger log = LoggerFactory.getLogger(SWHConnection.class);

	CloseableHttpClient httpClient;

	HttpClientParams clientParams;

	HttpConnector2 conn;

	public SWHConnection(HttpClientParams clientParams) {

		// // force http client to NOT transform double quotes (//) to single quote (/)
		// RequestConfig requestConfig = RequestConfig.custom().setNormalizeUri(false).build();
		//
		// // Create an HttpClient instance
		// httpClient = HttpClientBuilder
		// 	.create()
		// 	.setDefaultRequestConfig(requestConfig)
		// 	.build();
		//
		// this.clientParams = clientParams;

		// set custom headers
		Map<String, String> headers = new HashMap<String, String>() {
			{
				put(HttpHeaders.ACCEPT, "application/json");
				put(HttpHeaders.AUTHORIZATION, String.format("Bearer %s", SWHConstants.ACCESS_TOKEN));
			}
		};

		clientParams.setHeaders(headers);

		// create http connector
		conn = new HttpConnector2(clientParams);

	}

	public String call(String url) throws CollectorException {
		return conn.getInputSource(url);
	}

	public String getLib(String url) throws IOException, CollectorException {

		// delay between requests
		if (this.clientParams.getRequestDelay() > 0) {
			log.info("Request delay: {}", this.clientParams.getRequestDelay());
			this.backOff(this.clientParams.getRequestDelay());
		}

		// Create an HttpGet request with the URL
		HttpGet httpGet = new HttpGet(url);
		httpGet.setHeader("Accept", "application/json");
		httpGet.setHeader("Authorization", String.format("Bearer %s", SWHConstants.ACCESS_TOKEN));

		// Execute the request and get the response
		try (CloseableHttpResponse response = httpClient.execute(httpGet)) {

			System.out.println(url);

			int responseCode = response.getStatusLine().getStatusCode();
			if (responseCode != HttpStatus.SC_OK) {

			}

			System.out.println(responseCode);

			List<Header> httpHeaders = Arrays.asList(response.getAllHeaders());
			for (Header header : httpHeaders) {
				System.out.println(header.getName() + ":\t" + header.getValue());
			}

			String rateRemaining = this.getRateRemaining(response);

			// back off when rate remaining limit is approaching
			if (rateRemaining != null && (Integer.parseInt(rateRemaining) < 2)) {
				int retryAfter = this.getRetryAfter(response);

				log.info("Rate Limit: {} - Backing off: {}", rateRemaining, retryAfter);
				this.backOff(retryAfter);
			}

			return EntityUtils.toString(response.getEntity());
		}
	}

	private String getRateRemaining(CloseableHttpResponse response) {
		Header header = response.getFirstHeader(Constants.HTTPHEADER_IETF_DRAFT_RATELIMIT_REMAINING);
		if (header != null) {
			return header.getValue();
		}
		return null;
	}

	private int getRetryAfter(CloseableHttpResponse response) {
		Header header = response.getFirstHeader(HttpHeaders.RETRY_AFTER);
		if (header != null) {
			String retryAfter = header.getValue();
			if (NumberUtils.isCreatable(retryAfter)) {
				return Integer.parseInt(retryAfter) + 10;
			}
		}
		return 1000;
	}

	private void backOff(int sleepTimeMs) throws CollectorException {
		try {
			Thread.sleep(sleepTimeMs);
		} catch (InterruptedException e) {
			throw new CollectorException(e);
		}
	}

}

@@ -0,0 +1,13 @@

package eu.dnetlib.dhp.swh.utils;

public class SWHConstants {
	public static final String SWH_LATEST_VISIT_URL = "https://archive.softwareheritage.org/api/1/origin/%s/visit/latest/";

	public static final String SWH_ARCHIVE_URL = "https://archive.softwareheritage.org/api/1/origin/save/%s/url/%s/";

	public static final String ACCESS_TOKEN = "";

	public static final String DEFAULT_VISIT_TYPE = "git";

}

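Both constants are URL templates consumed via String.format. A quick sketch of the endpoints they expand to (not part of the commit; the repository URL is the first origin from the test fixture at the end of this diff):

import eu.dnetlib.dhp.swh.utils.SWHConstants;

public class SWHUrlExample {
	public static void main(String[] args) {
		String repoUrl = "https://github.com/bioinsilico/BIPSPI";

		// last-visit lookup, requested with GET
		System.out.println(String.format(SWHConstants.SWH_LATEST_VISIT_URL, repoUrl));
		// https://archive.softwareheritage.org/api/1/origin/https://github.com/bioinsilico/BIPSPI/visit/latest/

		// "save code now" archive request, requested with POST and prefixed by the visit type
		System.out.println(String.format(SWHConstants.SWH_ARCHIVE_URL, SWHConstants.DEFAULT_VISIT_TYPE, repoUrl));
		// https://archive.softwareheritage.org/api/1/origin/save/git/url/https://github.com/bioinsilico/BIPSPI/
	}
}
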
@@ -0,0 +1,94 @@

package eu.dnetlib.dhp.swh.utils;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

import static eu.dnetlib.dhp.common.Constants.*;

public class SWHUtils {

	private static final Logger log = LoggerFactory.getLogger(SWHUtils.class);

	public static HttpClientParams getClientParams(ArgumentApplicationParser argumentParser) {

		final HttpClientParams clientParams = new HttpClientParams();
		clientParams
			.setMaxNumberOfRetry(
				Optional
					.ofNullable(argumentParser.get(MAX_NUMBER_OF_RETRY))
					.map(Integer::parseInt)
					.orElse(HttpClientParams._maxNumberOfRetry));
		log.info("maxNumberOfRetry is {}", clientParams.getMaxNumberOfRetry());

		clientParams
			.setRequestDelay(
				Optional
					.ofNullable(argumentParser.get(REQUEST_DELAY))
					.map(Integer::parseInt)
					.orElse(HttpClientParams._requestDelay));
		log.info("requestDelay is {}", clientParams.getRequestDelay());

		clientParams
			.setRetryDelay(
				Optional
					.ofNullable(argumentParser.get(RETRY_DELAY))
					.map(Integer::parseInt)
					.orElse(HttpClientParams._retryDelay));
		log.info("retryDelay is {}", clientParams.getRetryDelay());

		clientParams
			.setRequestMethod(
				Optional
					.ofNullable(argumentParser.get(REQUEST_METHOD))
					.orElse(HttpClientParams._requestMethod));
		log.info("requestMethod is {}", clientParams.getRequestMethod());

		return clientParams;
	}

	public static BufferedReader getFileReader(FileSystem fs, Path inputPath) throws IOException {
		FSDataInputStream inputStream = fs.open(inputPath);
		return new BufferedReader(
			new InputStreamReader(inputStream, StandardCharsets.UTF_8));
	}

	public static SequenceFile.Writer getSequenceFileWriter(FileSystem fs, String outputPath) throws IOException {
		return SequenceFile
			.createWriter(
				fs.getConf(),
				SequenceFile.Writer.file(new Path(outputPath)),
				SequenceFile.Writer.keyClass(Text.class),
				SequenceFile.Writer.valueClass(Text.class));
	}

	public static SequenceFile.Reader getSequenceFileReader(FileSystem fs, String inputPath) throws IOException {
		Path filePath = new Path(inputPath);
		SequenceFile.Reader.Option fileOption = SequenceFile.Reader.file(filePath);

		return new SequenceFile.Reader(fs.getConf(), fileOption);
	}

	public static void appendToSequenceFile(SequenceFile.Writer fw, String keyStr, String valueStr) throws IOException {
		Text key = new Text();
		key.set(keyStr);

		Text value = new Text();
		value.set(valueStr);

		fw.append(key, value);
	}
}

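These SequenceFile helpers are the glue between the two collection steps: the repository URL is the key and the raw SWH JSON response is the value. A minimal local round-trip sketch (assumes the Hadoop client libraries are on the classpath; the path is a hypothetical local file):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import eu.dnetlib.dhp.swh.utils.SWHUtils;

public class SequenceFileRoundTrip {
	public static void main(String[] args) throws Exception {
		FileSystem fs = FileSystem.getLocal(new Configuration());
		String path = "/tmp/last_visits_sample.seq"; // hypothetical local path

		// write one record: repo URL -> raw SWH response
		SequenceFile.Writer fw = SWHUtils.getSequenceFileWriter(fs, path);
		SWHUtils.appendToSequenceFile(fw, "https://github.com/schwanbeck/YSMR", "{\"status\":\"full\"}");
		fw.close();

		// read it back, the same way ArchiveRepositoryURLs.archive() does
		SequenceFile.Reader fr = SWHUtils.getSequenceFileReader(fs, path);
		Text repoUrl = new Text();
		Text lastVisitData = new Text();
		while (fr.next(repoUrl, lastVisitData)) {
			System.out.println(repoUrl + " -> " + lastVisitData);
		}
		fr.close();
	}
}
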
@@ -0,0 +1,26 @@
[
  {
    "paramName": "n",
    "paramLongName": "namenode",
    "paramDescription": "the Name Node URI",
    "paramRequired": true
  },
  {
    "paramName": "lv",
    "paramLongName": "lastVisitsPath",
    "paramDescription": "the URL where to store last visits data",
    "paramRequired": true
  },
  {
    "paramName": "rqd",
    "paramLongName": "requestDelay",
    "paramDescription": "the delay (ms) between requests",
    "paramRequired": false
  },
  {
    "paramName": "atid",
    "paramLongName": "archiveThresholdInDays",
    "paramDescription": "the threshold (in days) required to issue an archive request",
    "paramRequired": false
  }
]

@@ -0,0 +1,38 @@
[
  {
    "paramName": "n",
    "paramLongName": "namenode",
    "paramDescription": "the Name Node URI",
    "paramRequired": true
  },
  {
    "paramName": "scr",
    "paramLongName": "softwareCodeRepositoryURLs",
    "paramDescription": "the URL from where to read software repository URLs",
    "paramRequired": true
  },
  {
    "paramName": "lv",
    "paramLongName": "lastVisitsPath",
    "paramDescription": "the URL where to store last visits data",
    "paramRequired": true
  },
  {
    "paramName": "mnr",
    "paramLongName": "maxNumberOfRetry",
    "paramDescription": "the maximum number of admitted connection retries",
    "paramRequired": false
  },
  {
    "paramName": "rqd",
    "paramLongName": "requestDelay",
    "paramDescription": "the delay (ms) between requests",
    "paramRequired": false
  },
  {
    "paramName": "rtd",
    "paramLongName": "retryDelay",
    "paramDescription": "the delay (ms) between retries",
    "paramRequired": false
  }
]

@@ -6,7 +6,7 @@
     "paramRequired": false
   },
   {
-    "paramName": "ip",
+    "paramName": "scr",
     "paramLongName": "softwareCodeRepositoryURLs",
     "paramDescription": "the URL where to store software repository URLs",
     "paramRequired": true

@@ -1,25 +1,11 @@
 # hive
 hiveDbName=openaire_prod_20230914
-hiveMetastoreUris=thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083
-
-# oozie
-oozie.action.sharelib.for.spark=spark2
-oozie.use.system.libpath=true
-oozie.wf.application.path=${oozieTopWfApplicationPath}
-oozie.wf.application.path=${oozieTopWfApplicationPath}
-oozieActionShareLibForSpark2=spark2
-
-# spark
-spark2EventLogDir=/user/spark/spark2ApplicationHistory
-spark2ExtraListeners=com.cloudera.spark.lineage.NavigatorAppListener
-spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener
-spark2YarnHistoryServerAddress=http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089
 sparkSqlWarehouseDir=/user/hive/warehouse

-# misc
-wfAppPath=${oozieTopWfApplicationPath}
-resourceManager=http://iis-cdh5-test-m2.ocean.icm.edu.pl:8088/cluster
-
-# custom params
-softwareCodeRepositoryURLs=${workingDir}/code_repo_urls.csv
+# input/output files
+softwareCodeRepositoryURLs=${workingDir}/1_code_repo_urls.csv
+lastVisitsPath=${workingDir}/2_last_visits.seq
+archiveRequestsPath=${workingDir}/3_archive_requests.seq

 resume=collect-software-repository-urls

@@ -0,0 +1,50 @@
<configuration>
	<property>
		<name>jobTracker</name>
		<value>yarnRM</value>
	</property>
	<property>
		<name>nameNode</name>
		<value>hdfs://nameservice1</value>
	</property>
	<property>
		<name>oozie.use.system.libpath</name>
		<value>true</value>
	</property>
	<property>
		<name>oozie.action.sharelib.for.spark</name>
		<value>spark2</value>
	</property>
	<property>
		<name>hiveMetastoreUris</name>
		<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
	</property>
	<property>
		<name>spark2YarnHistoryServerAddress</name>
		<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
	</property>
	<property>
		<name>spark2EventLogDir</name>
		<value>/user/spark/spark2ApplicationHistory</value>
	</property>
	<property>
		<name>spark2ExtraListeners</name>
		<value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
	</property>
	<property>
		<name>spark2SqlQueryExecutionListeners</name>
		<value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
	</property>
	<property>
		<name>oozieActionShareLibForSpark2</name>
		<value>spark2</value>
	</property>
	<property>
		<name>resourceManager</name>
		<value>http://iis-cdh5-test-m2.ocean.icm.edu.pl:8088/cluster</value>
	</property>
	<property>
		<name>oozie.launcher.mapreduce.user.classpath.first</name>
		<value>true</value>
	</property>
</configuration>

@@ -1,59 +1,31 @@
 <workflow-app name="Software-Heritage-Integration-Workflow" xmlns="uri:oozie:workflow:0.5">

-	<!-- <parameters>-->
-	<!-- <property>-->
-	<!-- <name>apiDescription</name>-->
-	<!-- <description>A json encoding of the API Description class</description>-->
-	<!-- </property>-->
-	<!-- <property>-->
-	<!-- <name>dataSourceInfo</name>-->
-	<!-- <description>A json encoding of the Datasource Info</description>-->
-	<!-- </property>-->
-	<!-- <property>-->
-	<!-- <name>identifierPath</name>-->
-	<!-- <description>An xpath to retrieve the metadata identifier for the generation of DNet Identifier </description>-->
-	<!-- </property>-->
-	<!-- <property>-->
-	<!-- <name>metadataEncoding</name>-->
-	<!-- <description> The type of the metadata XML/JSON</description>-->
-	<!-- </property>-->
-	<!-- <property>-->
-	<!-- <name>timestamp</name>-->
-	<!-- <description>The timestamp of the collection date</description>-->
-	<!-- </property>-->
-	<!-- <property>-->
-	<!-- <name>workflowId</name>-->
-	<!-- <description>The identifier of the workflow</description>-->
-	<!-- </property>-->
-	<!-- <property>-->
-	<!-- <name>mdStoreID</name>-->
-	<!-- <description>The identifier of the mdStore</description>-->
-	<!-- </property>-->
-	<!-- <property>-->
-	<!-- <name>mdStoreManagerURI</name>-->
-	<!-- <description>The URI of the MDStore Manager</description>-->
-	<!-- </property>-->
-	<!-- <property>-->
-	<!-- <name>dnetMessageManagerURL</name>-->
-	<!-- <description>The URI of the Dnet Message Manager</description>-->
-	<!-- </property>-->
-	<!-- <property>-->
-	<!-- <name>collectionMode</name>-->
-	<!-- <description>Should be REFRESH or INCREMENTAL</description>-->
-	<!-- </property>-->
-	<!-- <property>-->
-	<!-- <name>collection_java_xmx</name>-->
-	<!-- <value>-Xmx200m</value>-->
-	<!-- <description>Used to configure the heap size for the map JVM process. Should be 80% of mapreduce.map.memory.mb.</description>-->
-	<!-- </property>-->
-	<!-- </parameters>-->
+	<!-- Custom parameters -->
+	<parameters>
+		<property>
+			<name>hiveDbName</name>
+			<description>The name of the Hive DB to be used</description>
+		</property>
+		<property>
+			<name>softwareCodeRepositoryURLs</name>
+			<description>The path in the HDFS to save the software repository URLs</description>
+		</property>
+		<property>
+			<name>resume</name>
+			<description>Variable that indicates the step to start from</description>
+		</property>
+	</parameters>

+	<!-- Global variables -->
 	<global>
 		<job-tracker>${jobTracker}</job-tracker>
 		<name-node>${nameNode}</name-node>
+		<configuration>
+			<property>
+				<name>oozie.action.sharelib.for.spark</name>
+				<value>${oozieActionShareLibForSpark2}</value>
+			</property>
+		</configuration>
 	</global>

 	<start to="startFrom"/>

@@ -90,8 +62,43 @@
 			<arg>--softwareCodeRepositoryURLs</arg><arg>${softwareCodeRepositoryURLs}</arg>
 			<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
 			<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
 		</spark>
+		<ok to="collect-repository-last-visit-data"/>
+		<error to="Kill"/>
+	</action>
+
+	<action name="collect-repository-last-visit-data">
+		<java>
+			<main-class>eu.dnetlib.dhp.swh.CollectLastVisitRepositoryData</main-class>
+
+			<arg>--namenode</arg><arg>${nameNode}</arg>
+			<arg>--softwareCodeRepositoryURLs</arg><arg>${softwareCodeRepositoryURLs}</arg>
+			<arg>--lastVisitsPath</arg><arg>${lastVisitsPath}</arg>
+
+			<arg>--maxNumberOfRetry</arg><arg>2</arg>
+			<arg>--requestDelay</arg><arg>0</arg>
+			<arg>--retryDelay</arg><arg>1</arg>
+			<arg>--requestMethod</arg><arg>GET</arg>
+		</java>
+		<ok to="archive-repository-urls"/>
+		<error to="Kill"/>
+	</action>
+
+	<action name="archive-repository-urls">
+		<java>
+			<main-class>eu.dnetlib.dhp.swh.ArchiveRepositoryURLs</main-class>
+
+			<arg>--namenode</arg><arg>${nameNode}</arg>
+			<arg>--lastVisitsPath</arg><arg>${lastVisitsPath}</arg>
+			<arg>--archiveThresholdInDays</arg><arg>365</arg>
+
+			<arg>--maxNumberOfRetry</arg><arg>2</arg>
+			<arg>--requestDelay</arg><arg>0</arg>
+			<arg>--retryDelay</arg><arg>1</arg>
+			<arg>--requestMethod</arg><arg>POST</arg>
+		</java>
 		<ok to="End"/>
 		<error to="Kill"/>
 	</action>

@@ -0,0 +1,35 @@

package eu.dnetlib.dhp.swh;

import eu.dnetlib.dhp.swh.utils.SWHUtils;
import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.Test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;

public class ArchiveRepositoryURLsTest {

	@Test
	void testArchive() throws IOException {
		String inputPath = getClass()
			.getResource("/eu/dnetlib/dhp/swh/lastVisitDataToArchive.csv")
			.getPath();

		File file = new File(inputPath);
		FileReader fr = new FileReader(file);
		BufferedReader br = new BufferedReader(fr); // creates a buffering character input stream

		String line;
		while ((line = br.readLine()) != null) {
			String[] tokens = line.split("\t");

			String response = ArchiveRepositoryURLs.handleRecord(tokens[0], tokens[1], 365);
			System.out.println(tokens[0] + "\t" + response);
			System.out.println();
		}
		fr.close();
	}
}

@@ -0,0 +1,57 @@

package eu.dnetlib.dhp.swh;

import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
import eu.dnetlib.dhp.swh.utils.SWHConnection;
import eu.dnetlib.dhp.swh.utils.SWHConstants;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;

//import org.apache.hadoop.hdfs.MiniDFSCluster;

public class SWHConnectionTest {
	private static final Logger log = LoggerFactory.getLogger(SWHConnectionTest.class);

	@Test
	void testGetCall() throws IOException {

		HttpClientParams clientParams = new HttpClientParams();
		clientParams.setRequestMethod("GET");

		SWHConnection swhConnection = new SWHConnection(clientParams);

		String repoUrl = "https://github.com/stanford-futuredata/FAST";
		URL url = new URL(String.format(SWHConstants.SWH_LATEST_VISIT_URL, repoUrl));
		String response = null;
		try {
			response = swhConnection.call(url.toString());
		} catch (CollectorException e) {
			System.out.println("Error in request: " + url);
		}
		System.out.println(response);
	}

	@Test
	void testPostCall() throws MalformedURLException {

		HttpClientParams clientParams = new HttpClientParams();
		clientParams.setRequestMethod("POST");

		SWHConnection swhConnection = new SWHConnection(clientParams);

		String repoUrl = "https://github.com/stanford-futuredata/FAST";
		URL url = new URL(String.format(SWHConstants.SWH_ARCHIVE_URL, SWHConstants.DEFAULT_VISIT_TYPE, repoUrl));
		String response = null;
		try {
			response = swhConnection.call(url.toString());
		} catch (CollectorException e) {
			System.out.println("Error in request: " + url);
		}
		System.out.println(response);
	}
}

@@ -0,0 +1,6 @@
https://github.com/bioinsilico/BIPSPI {"origin":"https://github.com/bioinsilico/BIPSPI","visit":1,"date":"2020-03-18T14:50:21.541822+00:00","status":"full","snapshot":"c6c69d2cd73ce89811448da5f031611df6f63bdb","type":"git","metadata":{},"origin_url":"https://archive.softwareheritage.org/api/1/origin/https://github.com/bioinsilico/BIPSPI/get/","snapshot_url":"https://archive.softwareheritage.org/api/1/snapshot/c6c69d2cd73ce89811448da5f031611df6f63bdb/"}
https://github.com/mloop/kdiff-type1-error-rate/blob/master/analysis/simulation.R {}
https://github.com/schwanbeck/YSMR {"origin":"https://github.com/schwanbeck/YSMR","visit":6,"date":"2023-08-02T15:25:02.650676+00:00","status":"full","snapshot":"a9d1c5f0bca2def198b89f65bc9f7da3be8439ed","type":"git","metadata":{},"origin_url":"https://archive.softwareheritage.org/api/1/origin/https://github.com/schwanbeck/YSMR/get/","snapshot_url":"https://archive.softwareheritage.org/api/1/snapshot/a9d1c5f0bca2def198b89f65bc9f7da3be8439ed/"}
https://github.com/lvclark/TASSELGBS_combine {"origin":"https://github.com/lvclark/TASSELGBS_combine","visit":1,"date":"2020-04-12T20:44:09.405589+00:00","status":"full","snapshot":"ffa6fefd3f5becefbea9fe0e6d5d93859c95c071","type":"git","metadata":{},"origin_url":"https://archive.softwareheritage.org/api/1/origin/https://github.com/lvclark/TASSELGBS_combine/get/","snapshot_url":"https://archive.softwareheritage.org/api/1/snapshot/ffa6fefd3f5becefbea9fe0e6d5d93859c95c071/"}
https://github.com/PRIDE-Toolsuite/inspector-example-files {"origin":"https://github.com/PRIDE-Toolsuite/inspector-example-files","visit":12,"date":"2021-01-25T08:54:13.394674+00:00","status":"full","snapshot":"0b56eb0ad07cf778df6dabefc4b73636e0ae8b37","type":"git","metadata":{},"origin_url":"https://archive.softwareheritage.org/api/1/origin/https://github.com/PRIDE-Toolsuite/inspector-example-files/get/","snapshot_url":"https://archive.softwareheritage.org/api/1/snapshot/0b56eb0ad07cf778df6dabefc4b73636e0ae8b37/"}
https://bitbucket.org/matwey/chelyabinsk {"origin":"https://bitbucket.org/matwey/chelyabinsk","visit":6,"date":"2021-09-24T19:32:43.322909+00:00","status":"full","snapshot":"215913858c3ee0e61e1aaea18241c5ee006da1b0","type":"hg","metadata":{},"origin_url":"https://archive.softwareheritage.org/api/1/origin/https://bitbucket.org/matwey/chelyabinsk/get/","snapshot_url":"https://archive.softwareheritage.org/api/1/snapshot/215913858c3ee0e61e1aaea18241c5ee006da1b0/"}