diff --git a/dhp-workflows/dhp-swh/pom.xml b/dhp-workflows/dhp-swh/pom.xml
new file mode 100644
index 000000000..501b2aef8
--- /dev/null
+++ b/dhp-workflows/dhp-swh/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>eu.dnetlib.dhp</groupId>
+        <artifactId>dhp-workflows</artifactId>
+        <version>1.2.5-SNAPSHOT</version>
+    </parent>
+    <artifactId>dhp-swh</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>eu.dnetlib.dhp</groupId>
+            <artifactId>dhp-common</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>net.sf.saxon</groupId>
+                    <artifactId>Saxon-HE</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>dom4j</groupId>
+            <artifactId>dom4j</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>xml-apis</groupId>
+            <artifactId>xml-apis</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>jaxen</groupId>
+            <artifactId>jaxen</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-distcp</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>eu.dnetlib</groupId>
+            <artifactId>dnet-actionmanager-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>eu.dnetlib</groupId>
+            <artifactId>dnet-actionmanager-common</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>eu.dnetlib</groupId>
+                    <artifactId>dnet-openaireplus-mapping-utils</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>saxonica</groupId>
+                    <artifactId>saxon</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>saxonica</groupId>
+                    <artifactId>saxon-dom</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jgrapht</groupId>
+                    <artifactId>jgrapht</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.sf.ehcache</groupId>
+                    <artifactId>ehcache</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.springframework</groupId>
+                    <artifactId>spring-test</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>apache</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.13</version>
+        </dependency>
+
+    </dependencies>
+
+</project>
diff --git a/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/CollectSoftwareRepositoryURLs.java b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/CollectSoftwareRepositoryURLs.java
new file mode 100644
index 000000000..c91f2bb8c
--- /dev/null
+++ b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/CollectSoftwareRepositoryURLs.java
@@ -0,0 +1,211 @@
+
+package eu.dnetlib.dhp.swh;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.*;
+
+/**
+ * Collects the distinct software code repository URLs from the OpenAIRE graph (Hive) and writes them to HDFS;
+ * it also provides helpers to query the Software Heritage "latest visit" API for each collected URL.
+ */
+public class CollectSoftwareRepositoryURLs implements Serializable {
+
+ private static final Logger log = LoggerFactory.getLogger(CollectSoftwareRepositoryURLs.class);
+ // public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:bipinference";
+// public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by BIP!";
+// public static final String BIP_INFERENCE_PROVENANCE = "bip:affiliation:crossref";
+ private static final String DEFAULT_VISIT_TYPE = "git";
+ private static final int CONCURRENT_API_CALLS = 1;
+
+ private static final String SWH_LATEST_VISIT_URL = "https://archive.softwareheritage.org/api/1/origin/%s/visit/latest/";
+
+ public static void main(String[] args) throws Exception {
+
+ String jsonConfiguration = IOUtils
+ .toString(
+ CollectSoftwareRepositoryURLs.class
+ .getResourceAsStream("/eu/dnetlib/dhp/swh/input_parameters.json"));
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+ parser.parseArgument(args);
+
+ final Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String hiveDbName = parser.get("hiveDbName");
+ log.info("hiveDbName {}: ", hiveDbName);
+
+ final String outputPath = parser.get("softwareCodeRepositoryURLs");
+ log.info("softwareCodeRepositoryURLs {}: ", outputPath);
+
+ final String hiveMetastoreUris = parser.get("hiveMetastoreUris");
+ log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
+
+ SparkConf conf = new SparkConf();
+ conf.set("hive.metastore.uris", hiveMetastoreUris);
+
+ runWithSparkHiveSession(
+ conf,
+ isSparkSessionManaged,
+ spark -> {
+ doRun(spark, hiveDbName, outputPath);
+ });
+ }
+
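+ // Queries the given Hive database for the distinct, non-null code repository URLs of software records
+ // and writes them as CSV to the given output path.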
+ private static void doRun(SparkSession spark, String hiveDbName, String outputPath) {
+
+ String queryTemplate = "SELECT distinct coderepositoryurl.value " +
+ "FROM %s.software " +
+ "WHERE coderepositoryurl.value IS NOT NULL";
+ String query = String.format(queryTemplate, hiveDbName);
+
+ log.info("Hive query to fetch software code URLs: {}", query);
+
+ Dataset<Row> df = spark.sql(query);
+
+ // write distinct repository URLs
+ df
+ .write()
+ .mode(SaveMode.Overwrite)
+// .option("compression", "gzip")
+ .csv(outputPath);
+ }
+
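+ // Reads software records from JSON, keeping only the code repository URL and the deletedbyinference/invisible flags.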
+ private static Dataset<Row> readSoftware(SparkSession spark, String inputPath) {
+ return spark
+ .read()
+ .json(inputPath)
+ .select(
+ new Column("codeRepositoryUrl.value").as("codeRepositoryUrl"),
+ new Column("dataInfo.deletedbyinference"),
+ new Column("dataInfo.invisible"));
+ }
+
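+ // Keeps only visible, non-deleted records that have a code repository URL; the limit is a temporary cap for testing.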
+ private static Dataset<Row> filterSoftware(Dataset<Row> softwareDF, Integer limit) {
+
+ Dataset<Row> df = softwareDF
+ .where(softwareDF.col("codeRepositoryUrl").isNotNull())
+ .where("deletedbyinference = false")
+ .where("invisible = false")
+ .drop("deletedbyinference")
+ .drop("invisible");
+
+// TODO remove when done
+ df = df.limit(limit);
+
+ return df;
+ }
+
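+ // Repartitions the URLs so that at most CONCURRENT_API_CALLS partitions call the SWH API in parallel,
+ // collects the raw responses on the driver and returns them as a two-column DataFrame (URL, response).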
+ public static Dataset<Row> makeParallelRequests(SparkSession spark, Dataset<Row> softwareDF) {
+ // TODO replace with coalesce ?
+ Dataset<Row> df = softwareDF.repartition(CONCURRENT_API_CALLS);
+
+ log.info("Number of partitions: {}", df.rdd().getNumPartitions());
+
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ List<Row> collectedRows = df
+ .javaRDD()
+ // max parallelism should be equal to the number of partitions here
+ .mapPartitions((FlatMapFunction<Iterator<Row>, Row>) partition -> {
+ List resultRows = new ArrayList<>();
+ while (partition.hasNext()) {
+ Row row = partition.next();
+ String url = String.format(SWH_LATEST_VISIT_URL, row.getString(0));
+
+// String snapshotId = null;
+// String type = null;
+// String date = null;
+
+ String responseBody = makeAPICall(url);
+ TimeUnit.SECONDS.sleep(1);
+// Thread.sleep(500);
+// if (responseBody != null) {
+// LastVisitResponse visitResponse = objectMapper.readValue(responseBody, LastVisitResponse.class);
+// snapshotId = visitResponse.getSnapshot();
+// type = visitResponse.getType();
+// date = visitResponse.getDate();
+// }
+// resultRows.add(RowFactory.create(url, snapshotId, type, date));
+
+ resultRows.add(RowFactory.create(url, responseBody));
+ }
+ return resultRows.iterator();
+
+ })
+ .collect();
+
+ StructType resultSchema = new StructType()
+ .add("codeRepositoryUrl", DataTypes.StringType)
+ .add("response", DataTypes.StringType);
+
+// .add("snapshotId", DataTypes.StringType)
+// .add("type", DataTypes.StringType)
+// .add("date", DataTypes.StringType);
+
+ // create a DataFrame from the collected rows
+ return spark.createDataFrame(collectedRows, resultSchema);
+ }
+
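+ // Performs an authenticated GET against the SWH "latest visit" endpoint, prints the remaining rate limit
+ // and returns the raw response body (or null if there is no entity).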
+ private static String makeAPICall(String url) throws IOException {
+ System.out.println(java.time.LocalDateTime.now());
+
+ try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+ HttpGet httpGet = new HttpGet(url);
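+ // NOTE: hardcoded SWH API bearer token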
+ httpGet
+ .setHeader(
+ "Authorization",
+ "Bearer eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJhMTMxYTQ1My1hM2IyLTQwMTUtODQ2Ny05MzAyZjk3MTFkOGEifQ.eyJpYXQiOjE2OTQ2MzYwMjAsImp0aSI6IjkwZjdkNTNjLTQ5YTktNGFiMy1hY2E0LTcwMTViMjEyZTNjNiIsImlzcyI6Imh0dHBzOi8vYXV0aC5zb2Z0d2FyZWhlcml0YWdlLm9yZy9hdXRoL3JlYWxtcy9Tb2Z0d2FyZUhlcml0YWdlIiwiYXVkIjoiaHR0cHM6Ly9hdXRoLnNvZnR3YXJlaGVyaXRhZ2Uub3JnL2F1dGgvcmVhbG1zL1NvZnR3YXJlSGVyaXRhZ2UiLCJzdWIiOiIzMTY5OWZkNC0xNmE0LTQxOWItYTdhMi00NjI5MDY4ZjI3OWEiLCJ0eXAiOiJPZmZsaW5lIiwiYXpwIjoic3doLXdlYiIsInNlc3Npb25fc3RhdGUiOiIzMjYzMzEwMS00ZDRkLTQwMjItODU2NC1iMzNlMTJiNTE3ZDkiLCJzY29wZSI6Im9wZW5pZCBvZmZsaW5lX2FjY2VzcyBwcm9maWxlIGVtYWlsIn0.XHj1VIZu1dZ4Ej32-oU84mFmaox9cLNjXosNxwZM0Xs");
+ try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
+ int statusCode = response.getStatusLine().getStatusCode();
+// if (statusCode != 200)
+// return null;
+ Header[] headers = response.getHeaders("X-RateLimit-Remaining");
+ for (Header header : headers) {
+ System.out
+ .println(
+ "Key : " + header.getName()
+ + " ,Value : " + header.getValue());
+ }
+ HttpEntity entity = response.getEntity();
+ if (entity != null) {
+ return EntityUtils.toString(entity);
+ }
+ }
+ }
+ return null;
+ }
+}
diff --git a/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/models/LastVisitResponse.java b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/models/LastVisitResponse.java
new file mode 100644
index 000000000..435397590
--- /dev/null
+++ b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/models/LastVisitResponse.java
@@ -0,0 +1,40 @@
+
+package eu.dnetlib.dhp.swh.models;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
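+/**
+ * Maps the subset of the SWH "latest visit" API response that is of interest here
+ * (the type, date and snapshot fields); all other fields are ignored.
+ */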
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class LastVisitResponse {
+
+ private String type;
+
+ private String date;
+
+ @JsonProperty("snapshot")
+ private String snapshotId;
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getDate() {
+ return date;
+ }
+
+ public void setDate(String date) {
+ this.date = date;
+ }
+
+ public String getSnapshot() {
+ return snapshotId;
+ }
+
+ public void setSnapshot(String snapshotId) {
+ this.snapshotId = snapshotId;
+ }
+}
diff --git a/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_parameters.json b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_parameters.json
new file mode 100644
index 000000000..dd5432b93
--- /dev/null
+++ b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_parameters.json
@@ -0,0 +1,26 @@
+[
+ {
+ "paramName": "issm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "when true will stop SparkSession after job execution",
+ "paramRequired": false
+ },
+ {
+ "paramName": "ip",
+ "paramLongName": "softwareCodeRepositoryURLs",
+ "paramDescription": "the URL where to store software repository URLs",
+ "paramRequired": true
+ },
+ {
+ "paramName": "db",
+ "paramLongName": "hiveDbName",
+ "paramDescription": "the target hive database name",
+ "paramRequired": true
+ },
+ {
+ "paramName": "hmu",
+ "paramLongName": "hiveMetastoreUris",
+ "paramDescription": "the hive metastore uris",
+ "paramRequired": true
+ }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/job.properties b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/job.properties
new file mode 100644
index 000000000..a63343aed
--- /dev/null
+++ b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/job.properties
@@ -0,0 +1,25 @@
+# hive
+hiveDbName=openaire_prod_20230914
+hiveMetastoreUris=thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083
+
+# oozie
+oozie.action.sharelib.for.spark=spark2
+oozie.use.system.libpath=true
+oozie.wf.application.path=${oozieTopWfApplicationPath}
+oozieActionShareLibForSpark2=spark2
+
+# spark
+spark2EventLogDir=/user/spark/spark2ApplicationHistory
+spark2ExtraListeners=com.cloudera.spark.lineage.NavigatorAppListener
+spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener
+spark2YarnHistoryServerAddress=http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089
+sparkSqlWarehouseDir=/user/hive/warehouse
+
+# misc
+wfAppPath=${oozieTopWfApplicationPath}
+resourceManager=http://iis-cdh5-test-m2.ocean.icm.edu.pl:8088/cluster
+
+# custom params
+softwareCodeRepositoryURLs=${workingDir}/code_repo_urls.csv
+startFrom=collect-software-repository-urls
diff --git a/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/oozie_app/workflow.xml b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/oozie_app/workflow.xml
new file mode 100644
index 000000000..9832e5f26
--- /dev/null
+++ b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/oozie_app/workflow.xml
@@ -0,0 +1,101 @@
+<workflow-app name="collect-software-repository-urls-wf" xmlns="uri:oozie:workflow:0.5">
+
+    <parameters>
+        <property>
+            <name>hiveDbName</name>
+            <description>the target hive database name</description>
+        </property>
+        <property>
+            <name>hiveMetastoreUris</name>
+            <description>the hive metastore uris</description>
+        </property>
+        <property>
+            <name>softwareCodeRepositoryURLs</name>
+            <description>the path where to store the software repository URLs</description>
+        </property>
+        <property>
+            <name>sparkDriverMemory</name>
+            <description>memory for driver process</description>
+        </property>
+        <property>
+            <name>sparkExecutorMemory</name>
+            <description>memory for individual executor</description>
+        </property>
+        <property>
+            <name>sparkExecutorCores</name>
+            <description>number of cores used by single executor</description>
+        </property>
+        <property>
+            <name>spark2ExtraListeners</name>
+            <description>spark 2.* extra listeners classname</description>
+        </property>
+        <property>
+            <name>spark2SqlQueryExecutionListeners</name>
+            <description>spark 2.* sql query execution listeners classname</description>
+        </property>
+        <property>
+            <name>spark2YarnHistoryServerAddress</name>
+            <description>spark 2.* yarn history server address</description>
+        </property>
+        <property>
+            <name>spark2EventLogDir</name>
+            <description>spark 2.* event log dir location</description>
+        </property>
+        <property>
+            <name>sparkSqlWarehouseDir</name>
+            <description>spark SQL warehouse dir</description>
+        </property>
+    </parameters>
+
+    <global>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+    </global>
+
+    <start to="startFrom"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <decision name="startFrom">
+        <switch>
+            <case to="collect-software-repository-urls">${wf:conf('startFrom') eq 'collect-software-repository-urls'}</case>
+            <default to="collect-software-repository-urls"/>
+        </switch>
+    </decision>
+
+    <action name="collect-software-repository-urls">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Collect software repository URLs</name>
+            <class>eu.dnetlib.dhp.swh.CollectSoftwareRepositoryURLs</class>
+            <jar>dhp-swh-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+            </spark-opts>
+            <arg>--softwareCodeRepositoryURLs</arg><arg>${softwareCodeRepositoryURLs}</arg>
+            <arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
+            <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+
+</workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml
index d054ba39b..64f5f2d26 100644
--- a/dhp-workflows/pom.xml
+++ b/dhp-workflows/pom.xml
@@ -39,6 +39,7 @@
dhp-broker-events
dhp-doiboost
dhp-impact-indicators
+ dhp-swh