Add action for creating actionsets

Serafeim Chatzopoulos 2023-10-02 23:50:38 +03:00
parent ab0d70691c
commit 839a8524e7
13 changed files with 362 additions and 112 deletions

View File

@@ -1,7 +1,6 @@
package eu.dnetlib.dhp.swh;
import static eu.dnetlib.dhp.common.Constants.REQUEST_METHOD;
import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration;
import java.io.IOException;
@@ -116,8 +115,8 @@ public class ArchiveRepositoryURLs {
// a previous attempt for archival has been made, and repository URL was not found
// avoid performing the same archive request again
- if (lastVisit.getType() != null &&
- lastVisit.getType().equals(SWHConstants.VISIT_STATUS_NOT_FOUND)) {
+ if (lastVisit.getStatus() != null &&
+ lastVisit.getStatus().equals(SWHConstants.VISIT_STATUS_NOT_FOUND)) {
log.info("Avoid request -- previous archive request returned NOT_FOUND");
return null;
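
Note on the fix above: in the SWH visit payload, "type" carries the VCS type (e.g. "git"), while "status" carries the visit outcome, which is the only field that can take the value "not_found". A minimal sketch of the corrected guard, assuming lastVisit is the LastVisitData parsed from the earlier attempt:

// "type"   -> VCS type of the origin, e.g. "git", "svn", "hg" (never "not_found")
// "status" -> outcome of the last visit, e.g. "full", "partial", "failed", "not_found"
if (lastVisit.getStatus() != null &&
	lastVisit.getStatus().equals(SWHConstants.VISIT_STATUS_NOT_FOUND)) {
	log.info("Avoid request -- previous archive request returned NOT_FOUND");
	return null; // skip re-archiving a repository URL that SWH could not resolve
}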

View File

@@ -40,7 +40,7 @@ public class CollectLastVisitRepositoryData {
private static SWHConnection swhConnection = null;
public static void main(final String[] args)
- throws IOException, ParseException, InterruptedException, URISyntaxException, CollectorException {
+ throws IOException, ParseException {
final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
IOUtils
.toString(

View File

@@ -3,7 +3,6 @@ package eu.dnetlib.dhp.swh;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
- import java.io.Serializable;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
@@ -23,7 +22,7 @@ import eu.dnetlib.dhp.schema.oaf.Result;
*
* @author Serafeim Chatzopoulos
*/
- public class CollectSoftwareRepositoryURLs implements Serializable {
+ public class CollectSoftwareRepositoryURLs {
private static final Logger log = LoggerFactory.getLogger(CollectSoftwareRepositoryURLs.class);
@@ -44,10 +43,10 @@ public class CollectSoftwareRepositoryURLs implements Serializable {
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String hiveDbName = parser.get("hiveDbName");
log.info("hiveDbName {}: ", hiveDbName);
log.info("hiveDbName: {}", hiveDbName);
final String outputPath = parser.get("softwareCodeRepositoryURLs");
log.info("softwareCodeRepositoryURLs {}: ", outputPath);
log.info("softwareCodeRepositoryURLs: {}", outputPath);
final String hiveMetastoreUris = parser.get("hiveMetastoreUris");
log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
@@ -70,7 +69,7 @@ public class CollectSoftwareRepositoryURLs implements Serializable {
"WHERE coderepositoryurl.value IS NOT NULL " +
"AND datainfo.deletedbyinference = FALSE " +
"AND datainfo.invisible = FALSE " +
"LIMIT 1000";
"LIMIT 5000";
String query = String.format(queryTemplate, hiveDbName);
log.info("Hive query to fetch software code URLs: {}", query);

View File

@@ -0,0 +1,177 @@
package eu.dnetlib.dhp.swh;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static org.apache.spark.sql.functions.col;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.swh.models.LastVisitData;
import eu.dnetlib.dhp.swh.utils.SWHConstants;
import scala.Tuple2;
/**
* Creates action sets for Software Heritage data
*
* @author Serafeim Chatzopoulos
*/
public class PrepareSWHActionsets {
private static final Logger log = LoggerFactory.getLogger(PrepareSWHActionsets.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static <I extends Result> void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareSWHActionsets.class
.getResourceAsStream(
"/eu/dnetlib/dhp/swh/input_prepare_swh_actionsets.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("lastVisitsPath");
log.info("inputPath: {}", inputPath);
final String softwareInputPath = parser.get("softwareInputPath");
log.info("softwareInputPath: {}", softwareInputPath);
final String outputPath = parser.get("actionsetsPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
JavaPairRDD<Text, Text> softwareRDD = prepareActionsets(spark, inputPath, softwareInputPath);
softwareRDD
.saveAsHadoopFile(
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
// , GzipCodec.class);
});
}
private static Dataset<Row> loadSWHData(SparkSession spark, String inputPath) {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
// read from file and transform to <origin, snapshotId> tuples
// Note: snapshot id is the SWH id for us
JavaRDD<Row> swhRDD = sc
.sequenceFile(inputPath, Text.class, Text.class)
.map(t -> t._2().toString())
.map(t -> OBJECT_MAPPER.readValue(t, LastVisitData.class))
.filter(t -> t.getOrigin() != null && t.getSnapshot() != null) // response from SWH API is empty if repo URL
// was not found
.map(item -> RowFactory.create(item.getOrigin(), item.getSnapshot()));
// convert RDD to 2-column DF
List<StructField> fields = Arrays
.asList(
DataTypes.createStructField("repoUrl", DataTypes.StringType, true),
DataTypes.createStructField("swhId", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
return spark.createDataFrame(swhRDD, schema);
}
private static Dataset<Row> loadGraphSoftwareData(SparkSession spark, String softwareInputPath) {
return spark
.read()
.textFile(softwareInputPath)
.map(
(MapFunction<String, Software>) t -> OBJECT_MAPPER.readValue(t, Software.class),
Encoders.bean(Software.class))
.filter(t -> t.getCodeRepositoryUrl() != null)
.select(col("id"), col("codeRepositoryUrl.value").as("repoUrl"));
}
private static <I extends Software> JavaPairRDD<Text, Text> prepareActionsets(SparkSession spark, String inputPath,
String softwareInputPath) {
Dataset<Row> swhDF = loadSWHData(spark, inputPath);
// swhDF.show(false);
Dataset<Row> graphSoftwareDF = loadGraphSoftwareData(spark, softwareInputPath);
// graphSoftwareDF.show(5);
Dataset<Row> joinedDF = graphSoftwareDF.join(swhDF, "repoUrl").select("id", "swhid");
// joinedDF.show(false);
return joinedDF.map((MapFunction<Row, Software>) row -> {
Software s = new Software();
// set openaire id
s.setId(row.getString(row.fieldIndex("id")));
// set swh id
Qualifier qualifier = OafMapperUtils
.qualifier(
SWHConstants.SWHID,
SWHConstants.SWHID_CLASSNAME,
ModelConstants.DNET_PID_TYPES,
ModelConstants.DNET_PID_TYPES);
DataInfo dataInfo = OafMapperUtils
.dataInfo(
false,
null,
false,
false,
ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER,
"");
s
.setPid(
Arrays
.asList(
OafMapperUtils
.structuredProperty(
row.getString(row.fieldIndex("swhid")),
qualifier,
dataInfo)));
return s;
}, Encoders.bean(Software.class))
.toJavaRDD()
.map(p -> new AtomicAction(Software.class, p))
.mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))));
}
}
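
For orientation, each pair written to the sequence file uses the payload class name as key and the JSON-serialized AtomicAction as value; a single record looks roughly like this (identifiers are placeholders, field layout assumed from the schema classes, dataInfo omitted):

// key   : eu.dnetlib.dhp.schema.oaf.Software
// value : {"clazz":"eu.dnetlib.dhp.schema.oaf.Software",
//          "payload":{"id":"50|<software id from the graph>",
//                     "pid":[{"value":"<snapshot id returned by SWH>",
//                             "qualifier":{"classid":"swhid",
//                                          "classname":"Software Heritage Identifier",
//                                          "schemeid":"dnet:pid_types",
//                                          "schemename":"dnet:pid_types"}}]}}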

View File

@@ -1,15 +1,15 @@
package eu.dnetlib.dhp.swh.models;
- import java.util.Date;
+ import java.io.Serializable;
import com.cloudera.com.fasterxml.jackson.annotation.JsonFormat;
import com.cloudera.com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@JsonIgnoreProperties(ignoreUnknown = true)
- public class LastVisitData {
+ public class LastVisitData implements Serializable {
private String origin;
private String type;
private String date;
@@ -49,4 +49,23 @@ public class LastVisitData {
public void setStatus(String status) {
this.status = status;
}
+ public String getOrigin() {
+ return origin;
+ }
+ public void setOrigin(String origin) {
+ this.origin = origin;
+ }
+ @Override
+ public String toString() {
+ return "LastVisitData{" +
+ "origin='" + origin + '\'' +
+ ", type='" + type + '\'' +
+ ", date='" + date + '\'' +
+ ", snapshotId='" + snapshotId + '\'' +
+ ", status='" + status + '\'' +
+ '}';
+ }
}
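
A quick sketch of how this model is exercised. The JSON below only approximates an SWH last-visit payload (field names as this class appears to read them, values invented), and getSnapshot() is assumed to expose the snapshotId field shown in toString(), presumably via a Jackson mapping not visible in this hunk.

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.swh.models.LastVisitData;

public class LastVisitDataExample {

	public static void main(String[] args) throws Exception {
		String json = "{\"origin\":\"https://github.com/example/repo\","
			+ "\"type\":\"git\",\"date\":\"2023-10-02T12:00:00+00:00\","
			+ "\"status\":\"full\","
			+ "\"snapshot\":\"1a8893e6a86f444e8be8e7bda6cb34fb1735a00e\"}";

		LastVisitData visit = new ObjectMapper().readValue(json, LastVisitData.class);

		// extra fields in the real payload are dropped, thanks to
		// @JsonIgnoreProperties(ignoreUnknown = true)
		System.out.println(visit.getStatus());   // full
		System.out.println(visit.getSnapshot()); // snapshot id, used as the SWH id downstream
	}
}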

View File

@@ -1,50 +1,21 @@
package eu.dnetlib.dhp.swh.utils;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.http.Header;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.common.Constants;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
import eu.dnetlib.dhp.common.collection.HttpConnector2;
public class SWHConnection {
private static final Logger log = LoggerFactory.getLogger(SWHConnection.class);
CloseableHttpClient httpClient;
HttpClientParams clientParams;
HttpConnector2 conn;
public SWHConnection(HttpClientParams clientParams) {
// // force http client to NOT transform double slashes (//) to a single slash (/)
// RequestConfig requestConfig = RequestConfig.custom().setNormalizeUri(false).build();
//
// // Create an HttpClient instance
// httpClient = HttpClientBuilder
// .create()
// .setDefaultRequestConfig(requestConfig)
// .build();
//
// this.clientParams = clientParams;
// set custom headers
Map<String, String> headers = new HashMap<String, String>() {
{
@@ -64,75 +35,4 @@ public class SWHConnection {
return conn.getInputSource(url);
}
public String getLib(String url) throws IOException, CollectorException {
// delay between requests
if (this.clientParams.getRequestDelay() > 0) {
log.info("Request delay: {}", this.clientParams.getRequestDelay());
this.backOff(this.clientParams.getRequestDelay());
}
// Create an HttpGet request with the URL
HttpGet httpGet = new HttpGet(url);
httpGet.setHeader("Accept", "application/json");
httpGet.setHeader("Authorization", String.format("Bearer %s", SWHConstants.ACCESS_TOKEN));
// Execute the request and get the response
try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
System.out.println(url);
int responseCode = response.getStatusLine().getStatusCode();
if (responseCode != HttpStatus.SC_OK) {
}
System.out.println(responseCode);
List<Header> httpHeaders = Arrays.asList(response.getAllHeaders());
for (Header header : httpHeaders) {
System.out.println(header.getName() + ":\t" + header.getValue());
}
String rateRemaining = this.getRateRemaining(response);
// back off when rate remaining limit is approaching
if (rateRemaining != null && (Integer.parseInt(rateRemaining) < 2)) {
int retryAfter = this.getRetryAfter(response);
log.info("Rate Limit: {} - Backing off: {}", rateRemaining, retryAfter);
this.backOff(retryAfter);
}
return EntityUtils.toString(response.getEntity());
}
}
private String getRateRemaining(CloseableHttpResponse response) {
Header header = response.getFirstHeader(Constants.HTTPHEADER_IETF_DRAFT_RATELIMIT_REMAINING);
if (header != null) {
return header.getValue();
}
return null;
}
private int getRetryAfter(CloseableHttpResponse response) {
Header header = response.getFirstHeader(HttpHeaders.RETRY_AFTER);
if (header != null) {
String retryAfter = header.getValue();
if (NumberUtils.isCreatable(retryAfter)) {
return Integer.parseInt(retryAfter) + 10;
}
}
return 1000;
}
private void backOff(int sleepTimeMs) throws CollectorException {
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException e) {
throw new CollectorException(e);
}
}
}

View File

@@ -12,4 +12,8 @@ public class SWHConstants {
public static final String VISIT_STATUS_NOT_FOUND = "not_found";
+ public static final String SWHID = "swhid";
+ public static final String SWHID_CLASSNAME = "Software Heritage Identifier";
}

View File

@@ -0,0 +1,26 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "lv",
"paramLongName": "lastVisitsPath",
"paramDescription": "the URL where to store last visits data",
"paramRequired": true
},
{
"paramName": "ap",
"paramLongName": "actionsetsPath",
"paramDescription": "the URL path where to store actionsets",
"paramRequired": true
},
{
"paramName": "sip",
"paramLongName": "softwareInputPath",
"paramDescription": "the URL path of the software in the graph",
"paramRequired": true
}
]

View File

@@ -7,6 +7,8 @@ sparkSqlWarehouseDir=/user/hive/warehouse
softwareCodeRepositoryURLs=${workingDir}/1_code_repo_urls.csv
lastVisitsPath=${workingDir}/2_last_visits.seq
archiveRequestsPath=${workingDir}/3_archive_requests.seq
+ actionsetsPath=${workingDir}/4_actionsets
+ graphPath=/tmp/prod_provision/graph/18_graph_blacklisted
maxNumberOfRetry=2
retryDelay=1

View File

@@ -57,6 +57,7 @@
<decision name="startFrom">
<switch>
<case to="collect-software-repository-urls">${wf:conf('startFrom') eq 'collect-software-repository-urls'}</case>
<case to="create-swh-actionsets">${wf:conf('startFrom') eq 'create-swh-actionsets'}</case>
<default to="collect-software-repository-urls"/>
</switch>
</decision>
@@ -120,6 +121,32 @@
<arg>--requestMethod</arg><arg>POST</arg>
</java>
<ok to="create-swh-actionsets"/>
<error to="Kill"/>
</action>
<action name="create-swh-actionsets">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Create actionsets for SWH data</name>
<class>eu.dnetlib.dhp.swh.PrepareSWHActionsets</class>
<jar>dhp-swh-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--lastVisitsPath</arg><arg>${lastVisitsPath}</arg>
<arg>--actionsetsPath</arg><arg>${actionsetsPath}</arg>
<arg>--softwareInputPath</arg><arg>${graphPath}/software</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
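
A quick cross-check of the wiring above: the long option names passed by the spark action have to match the paramLongName entries of input_prepare_swh_actionsets.json, and the workflow expressions resolve to the paths added to job.properties in this commit:

// --lastVisitsPath    <- ${lastVisitsPath}      = ${workingDir}/2_last_visits.seq
// --actionsetsPath    <- ${actionsetsPath}      = ${workingDir}/4_actionsets
// --softwareInputPath <- ${graphPath}/software  = /tmp/prod_provision/graph/18_graph_blacklisted/software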

View File

@@ -0,0 +1,97 @@
package eu.dnetlib.dhp.swh;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
public class PrepareSWHActionsetsTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory
.getLogger(PrepareSWHActionsetsTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(PrepareSWHActionsetsTest.class.getSimpleName());
log.info("Using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(PrepareSWHActionsetsTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(PrepareSWHActionsetsTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
void testRun() throws Exception {
String lastVisitsPath = getClass()
.getResource("/eu/dnetlib/dhp/swh/last_visits_data.seq")
.getPath();
String outputPath = workingDir.toString() + "/actionSet";
String softwareInputPath = getClass()
.getResource("/eu/dnetlib/dhp/swh/software.json.gz")
.getPath();
PrepareSWHActionsets
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-lastVisitsPath", lastVisitsPath,
"-softwareInputPath", softwareInputPath,
"-actionsetsPath", outputPath
});
}
}
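
The visible part of the test ends right after the main() call; if the remainder of the file (not shown in this hunk) does not already do so, the produced action set could be verified along these lines. This is a sketch only; it additionally needs imports for eu.dnetlib.dhp.schema.oaf.Software and eu.dnetlib.dhp.swh.utils.SWHConstants.

// ... inside testRun(), after PrepareSWHActionsets.main(...):
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

JavaRDD<Software> softwares = sc
	.sequenceFile(outputPath, Text.class, Text.class)
	.map(pair -> OBJECT_MAPPER.readValue(pair._2().toString(), AtomicAction.class))
	.map(aa -> (Software) aa.getPayload());

// at least one record is expected, and every payload should carry the SWH pid
Assertions.assertTrue(softwares.count() > 0);
softwares
	.collect()
	.forEach(s -> Assertions.assertEquals(
		SWHConstants.SWHID,
		s.getPid().get(0).getQualifier().getClassid()));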