From 9d44418d381b14ab7a6301f1979b2a3af5efbd2f Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Thu, 14 Sep 2023 18:43:25 +0300 Subject: [PATCH] Add collecting software code repository URLs --- dhp-workflows/dhp-swh/pom.xml | 104 +++++++++ .../swh/CollectSoftwareRepositoryURLs.java | 211 ++++++++++++++++++ .../dhp/swh/models/LastVisitResponse.java | 40 ++++ .../eu/dnetlib/dhp/swh/input_parameters.json | 26 +++ .../eu/dnetlib/dhp/swh/job.properties | 25 +++ .../eu/dnetlib/dhp/swh/oozie_app/workflow.xml | 101 +++++++++ dhp-workflows/pom.xml | 1 + 7 files changed, 508 insertions(+) create mode 100644 dhp-workflows/dhp-swh/pom.xml create mode 100644 dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/CollectSoftwareRepositoryURLs.java create mode 100644 dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/models/LastVisitResponse.java create mode 100644 dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_parameters.json create mode 100644 dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/job.properties create mode 100644 dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-swh/pom.xml b/dhp-workflows/dhp-swh/pom.xml new file mode 100644 index 000000000..501b2aef8 --- /dev/null +++ b/dhp-workflows/dhp-swh/pom.xml @@ -0,0 +1,104 @@ + + + 4.0.0 + + eu.dnetlib.dhp + dhp-workflows + 1.2.5-SNAPSHOT + + dhp-swh + + + + org.apache.spark + spark-core_${scala.binary.version} + + + + org.apache.spark + spark-sql_${scala.binary.version} + + + + eu.dnetlib.dhp + dhp-common + ${project.version} + + + net.sf.saxon + Saxon-HE + + + + + + dom4j + dom4j + + + + xml-apis + xml-apis + + + + jaxen + jaxen + + + + org.apache.hadoop + hadoop-distcp + + + + eu.dnetlib + dnet-actionmanager-api + + + eu.dnetlib + dnet-actionmanager-common + + + eu.dnetlib + dnet-openaireplus-mapping-utils + + + saxonica + saxon + + + saxonica + saxon-dom + + + jgrapht + jgrapht + + + net.sf.ehcache + ehcache + + + org.springframework + spring-test + + + org.apache.* + * + + + apache + * + + + + + + org.apache.httpcomponents + httpclient + 4.5.13 + + + + diff --git a/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/CollectSoftwareRepositoryURLs.java b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/CollectSoftwareRepositoryURLs.java new file mode 100644 index 000000000..c91f2bb8c --- /dev/null +++ b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/CollectSoftwareRepositoryURLs.java @@ -0,0 +1,211 @@ + +package eu.dnetlib.dhp.swh; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.io.IOUtils; +import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.*; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.*; + +/** + * Creates action sets for Crossref affiliation relations inferred by BIP! + */ +public class CollectSoftwareRepositoryURLs implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(CollectSoftwareRepositoryURLs.class); + // public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:bipinference"; +// public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by BIP!"; +// public static final String BIP_INFERENCE_PROVENANCE = "bip:affiliation:crossref"; + private static final String DEFAULT_VISIT_TYPE = "git"; + private static final int CONCURRENT_API_CALLS = 1; + + private static final String SWH_LATEST_VISIT_URL = "https://archive.softwareheritage.org/api/1/origin/%s/visit/latest/"; + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + CollectSoftwareRepositoryURLs.class + .getResourceAsStream("/eu/dnetlib/dhp/swh/input_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String hiveDbName = parser.get("hiveDbName"); + log.info("hiveDbName {}: ", hiveDbName); + + final String outputPath = parser.get("softwareCodeRepositoryURLs"); + log.info("softwareCodeRepositoryURLs {}: ", outputPath); + + final String hiveMetastoreUris = parser.get("hiveMetastoreUris"); + log.info("hiveMetastoreUris: {}", hiveMetastoreUris); + + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", hiveMetastoreUris); + + runWithSparkHiveSession( + conf, + isSparkSessionManaged, + spark -> { + doRun(spark, hiveDbName, outputPath); + }); + } + + private static void doRun(SparkSession spark, String hiveDbName, String outputPath) { + + String queryTemplate = "SELECT distinct coderepositoryurl.value " + + "FROM %s.software " + + "WHERE coderepositoryurl.value IS NOT NULL"; + String query = String.format(queryTemplate, hiveDbName); + + log.info("Hive query to fetch software code URLs: {}", query); + + Dataset df = spark.sql(query); + + // write distinct repository URLs + df + .write() + .mode(SaveMode.Overwrite) +// .option("compression", "gzip") + .csv(outputPath); + } + + private static Dataset readSoftware(SparkSession spark, String inputPath) { + return spark + .read() + .json(inputPath) + .select( + new Column("codeRepositoryUrl.value").as("codeRepositoryUrl"), + new Column("dataInfo.deletedbyinference"), + new Column("dataInfo.invisible")); + } + + private static Dataset filterSoftware(Dataset softwareDF, Integer limit) { + + Dataset df = softwareDF + .where(softwareDF.col("codeRepositoryUrl").isNotNull()) + .where("deletedbyinference = false") + .where("invisible = false") + .drop("deletedbyinference") + .drop("invisible"); + +// TODO remove when done + df = df.limit(limit); + + return df; + } + + public static Dataset makeParallelRequests(SparkSession spark, Dataset softwareDF) { + // TODO replace with coalesce ? + Dataset df = softwareDF.repartition(CONCURRENT_API_CALLS); + + log.info("Number of partitions: {}", df.rdd().getNumPartitions()); + + ObjectMapper objectMapper = new ObjectMapper(); + + List collectedRows = df + .javaRDD() + // max parallelism should be equal to the number of partitions here + .mapPartitions((FlatMapFunction, Row>) partition -> { + List resultRows = new ArrayList<>(); + while (partition.hasNext()) { + Row row = partition.next(); + String url = String.format(SWH_LATEST_VISIT_URL, row.getString(0)); + +// String snapshotId = null; +// String type = null; +// String date = null; + + String responseBody = makeAPICall(url); + TimeUnit.SECONDS.sleep(1); +// Thread.sleep(500); +// if (responseBody != null) { +// LastVisitResponse visitResponse = objectMapper.readValue(responseBody, LastVisitResponse.class); +// snapshotId = visitResponse.getSnapshot(); +// type = visitResponse.getType(); +// date = visitResponse.getDate(); +// } +// resultRows.add(RowFactory.create(url, snapshotId, type, date)); + + resultRows.add(RowFactory.create(url, responseBody)); + } + return resultRows.iterator(); + + }) + .collect(); + + StructType resultSchema = new StructType() + .add("codeRepositoryUrl", DataTypes.StringType) + .add("response", DataTypes.StringType); + +// .add("snapshotId", DataTypes.StringType) +// .add("type", DataTypes.StringType) +// .add("date", DataTypes.StringType); + + // create a DataFrame from the collected rows + return spark.createDataFrame(collectedRows, resultSchema); + } + + private static String makeAPICall(String url) throws IOException { + System.out.println(java.time.LocalDateTime.now()); + + try (CloseableHttpClient httpClient = HttpClients.createDefault()) { + HttpGet httpGet = new HttpGet(url); + httpGet + .setHeader( + "Authorization", + "Bearer eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJhMTMxYTQ1My1hM2IyLTQwMTUtODQ2Ny05MzAyZjk3MTFkOGEifQ.eyJpYXQiOjE2OTQ2MzYwMjAsImp0aSI6IjkwZjdkNTNjLTQ5YTktNGFiMy1hY2E0LTcwMTViMjEyZTNjNiIsImlzcyI6Imh0dHBzOi8vYXV0aC5zb2Z0d2FyZWhlcml0YWdlLm9yZy9hdXRoL3JlYWxtcy9Tb2Z0d2FyZUhlcml0YWdlIiwiYXVkIjoiaHR0cHM6Ly9hdXRoLnNvZnR3YXJlaGVyaXRhZ2Uub3JnL2F1dGgvcmVhbG1zL1NvZnR3YXJlSGVyaXRhZ2UiLCJzdWIiOiIzMTY5OWZkNC0xNmE0LTQxOWItYTdhMi00NjI5MDY4ZjI3OWEiLCJ0eXAiOiJPZmZsaW5lIiwiYXpwIjoic3doLXdlYiIsInNlc3Npb25fc3RhdGUiOiIzMjYzMzEwMS00ZDRkLTQwMjItODU2NC1iMzNlMTJiNTE3ZDkiLCJzY29wZSI6Im9wZW5pZCBvZmZsaW5lX2FjY2VzcyBwcm9maWxlIGVtYWlsIn0.XHj1VIZu1dZ4Ej32-oU84mFmaox9cLNjXosNxwZM0Xs"); + try (CloseableHttpResponse response = httpClient.execute(httpGet)) { + int statusCode = response.getStatusLine().getStatusCode(); +// if (statusCode != 200) +// return null; + Header[] headers = response.getHeaders("X-RateLimit-Remaining"); + for (Header header : headers) { + System.out + .println( + "Key : " + header.getName() + + " ,Value : " + header.getValue()); + } + HttpEntity entity = response.getEntity(); + if (entity != null) { + return EntityUtils.toString(entity); + } + } + } + return null; + } +} diff --git a/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/models/LastVisitResponse.java b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/models/LastVisitResponse.java new file mode 100644 index 000000000..435397590 --- /dev/null +++ b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/models/LastVisitResponse.java @@ -0,0 +1,40 @@ + +package eu.dnetlib.dhp.swh.models; + +import com.cloudera.com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class LastVisitResponse { + + private String type; + + private String date; + + @JsonProperty("snapshot") + private String snapshotId; + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getDate() { + return date; + } + + public void setDate(String date) { + this.date = date; + } + + public String getSnapshot() { + return snapshotId; + } + + public void setSnapshot(String snapshotId) { + this.snapshotId = snapshotId; + } +} diff --git a/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_parameters.json b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_parameters.json new file mode 100644 index 000000000..dd5432b93 --- /dev/null +++ b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_parameters.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "ip", + "paramLongName": "softwareCodeRepositoryURLs", + "paramDescription": "the URL where to store software repository URLs", + "paramRequired": true + }, + { + "paramName": "db", + "paramLongName": "hiveDbName", + "paramDescription": "the target hive database name", + "paramRequired": true + }, + { + "paramName": "hmu", + "paramLongName": "hiveMetastoreUris", + "paramDescription": "the hive metastore uris", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/job.properties b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/job.properties new file mode 100644 index 000000000..a63343aed --- /dev/null +++ b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/job.properties @@ -0,0 +1,25 @@ +# hive +hiveDbName=openaire_prod_20230914 +hiveMetastoreUris=thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + +# oozie +oozie.action.sharelib.for.spark=spark2 +oozie.use.system.libpath=true +oozie.wf.application.path=${oozieTopWfApplicationPath} +oozie.wf.application.path=${oozieTopWfApplicationPath} +oozieActionShareLibForSpark2=spark2 + +# spark +spark2EventLogDir=/user/spark/spark2ApplicationHistory +spark2ExtraListeners=com.cloudera.spark.lineage.NavigatorAppListener +spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener +spark2YarnHistoryServerAddress=http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 +sparkSqlWarehouseDir=/user/hive/warehouse + +# misc +wfAppPath=${oozieTopWfApplicationPath} +resourceManager=http://iis-cdh5-test-m2.ocean.icm.edu.pl:8088/cluster + +# custom params +softwareCodeRepositoryURLs=${workingDir}/code_repo_urls.csv +resume=collect-software-repository-urls diff --git a/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/oozie_app/workflow.xml b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/oozie_app/workflow.xml new file mode 100644 index 000000000..9832e5f26 --- /dev/null +++ b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/oozie_app/workflow.xml @@ -0,0 +1,101 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + ${jobTracker} + ${nameNode} + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${wf:conf('startFrom') eq 'collect-software-repository-urls'} + + + + + + + yarn + cluster + Collect software repository URLs + eu.dnetlib.dhp.swh.CollectSoftwareRepositoryURLs + dhp-swh-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + + --softwareCodeRepositoryURLs${softwareCodeRepositoryURLs} + --hiveDbName${hiveDbName} + --hiveMetastoreUris${hiveMetastoreUris} + + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml index d054ba39b..64f5f2d26 100644 --- a/dhp-workflows/pom.xml +++ b/dhp-workflows/pom.xml @@ -39,6 +39,7 @@ dhp-broker-events dhp-doiboost dhp-impact-indicators + dhp-swh