diff --git a/.gitignore b/.gitignore index 73d9179fa..14cd4d345 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,4 @@ spark-warehouse /**/*.log /**/.factorypath /**/.scalafmt.conf +/.java-version diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/Constants.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/Constants.java index 4f2c6341e..0477d6399 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/Constants.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/Constants.java @@ -51,6 +51,7 @@ public class Constants { public static final String RETRY_DELAY = "retryDelay"; public static final String CONNECT_TIMEOUT = "connectTimeOut"; public static final String READ_TIMEOUT = "readTimeOut"; + public static final String REQUEST_METHOD = "requestMethod"; public static final String FROM_DATE_OVERRIDE = "fromDateOverride"; public static final String UNTIL_DATE_OVERRIDE = "untilDateOverride"; diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/HttpClientParams.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/HttpClientParams.java index 6fcec00dd..d26d9c0e9 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/HttpClientParams.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/HttpClientParams.java @@ -1,6 +1,9 @@ package eu.dnetlib.dhp.common.collection; +import java.util.HashMap; +import java.util.Map; + /** * Bundles the http connection parameters driving the client behaviour. */ @@ -13,6 +16,8 @@ public class HttpClientParams { public static int _connectTimeOut = 10; // seconds public static int _readTimeOut = 30; // seconds + public static String _requestMethod = "GET"; + /** * Maximum number of allowed retires before failing */ @@ -38,17 +43,30 @@ public class HttpClientParams { */ private int readTimeOut; + /** + * Custom http headers + */ + private Map headers; + + /** + * Request method (i.e., GET, POST etc) + */ + private String requestMethod; + public HttpClientParams() { - this(_maxNumberOfRetry, _requestDelay, _retryDelay, _connectTimeOut, _readTimeOut); + this(_maxNumberOfRetry, _requestDelay, _retryDelay, _connectTimeOut, _readTimeOut, new HashMap<>(), + _requestMethod); } public HttpClientParams(int maxNumberOfRetry, int requestDelay, int retryDelay, int connectTimeOut, - int readTimeOut) { + int readTimeOut, Map headers, String requestMethod) { this.maxNumberOfRetry = maxNumberOfRetry; this.requestDelay = requestDelay; this.retryDelay = retryDelay; this.connectTimeOut = connectTimeOut; this.readTimeOut = readTimeOut; + this.headers = headers; + this.requestMethod = requestMethod; } public int getMaxNumberOfRetry() { @@ -91,4 +109,19 @@ public class HttpClientParams { this.readTimeOut = readTimeOut; } + public Map getHeaders() { + return headers; + } + + public void setHeaders(Map headers) { + this.headers = headers; + } + + public String getRequestMethod() { + return requestMethod; + } + + public void setRequestMethod(String requestMethod) { + this.requestMethod = requestMethod; + } } diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/HttpConnector2.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/HttpConnector2.java index dd46ab1f4..905457bcd 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/HttpConnector2.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/collection/HttpConnector2.java @@ -107,7 +107,14 @@ public class HttpConnector2 { urlConn.setReadTimeout(getClientParams().getReadTimeOut() * 1000); urlConn.setConnectTimeout(getClientParams().getConnectTimeOut() * 1000); urlConn.addRequestProperty(HttpHeaders.USER_AGENT, userAgent); + urlConn.setRequestMethod(getClientParams().getRequestMethod()); + // if provided, add custom headers + if (!getClientParams().getHeaders().isEmpty()) { + for (Map.Entry headerEntry : getClientParams().getHeaders().entrySet()) { + urlConn.addRequestProperty(headerEntry.getKey(), headerEntry.getValue()); + } + } if (log.isDebugEnabled()) { logHeaderFields(urlConn); } diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java index 87510c108..99981bf6a 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java @@ -33,7 +33,7 @@ import scala.Tuple2; public class GroupEntitiesSparkJob { private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class); - private static final Encoder OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class); + private static final Encoder OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class); public static void main(String[] args) throws Exception { @@ -114,7 +114,7 @@ public class GroupEntitiesSparkJob { Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC)); // pivot on "_1" (classname of the entity) - // created columns containing only entities of the same class + // created columns containing only entities of the same class for (Map.Entry e : ModelSupport.entityTypes.entrySet()) { String entity = e.getKey().name(); Class entityClass = e.getValue(); diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/PmidCleaningRule.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/PmidCleaningRule.java index d0f5a3b27..c0c451b88 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/PmidCleaningRule.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/PmidCleaningRule.java @@ -7,7 +7,7 @@ import java.util.regex.Pattern; // https://researchguides.stevens.edu/c.php?g=442331&p=6577176 public class PmidCleaningRule { - public static final Pattern PATTERN = Pattern.compile("[1-9]{1,8}"); + public static final Pattern PATTERN = Pattern.compile("0*(\\d{1,8})"); public static String clean(String pmid) { String s = pmid @@ -17,7 +17,7 @@ public class PmidCleaningRule { final Matcher m = PATTERN.matcher(s); if (m.find()) { - return m.group(); + return m.group(1); } return ""; } diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/PmidCleaningRuleTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/PmidCleaningRuleTest.java index 9562adf7e..295eac85f 100644 --- a/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/PmidCleaningRuleTest.java +++ b/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/PmidCleaningRuleTest.java @@ -9,10 +9,16 @@ class PmidCleaningRuleTest { @Test void testCleaning() { + // leading zeros are removed assertEquals("1234", PmidCleaningRule.clean("01234")); + // tolerant to spaces in the middle assertEquals("1234567", PmidCleaningRule.clean("0123 4567")); + // stop parsing at first not numerical char assertEquals("123", PmidCleaningRule.clean("0123x4567")); + // invalid id leading to empty result assertEquals("", PmidCleaningRule.clean("abc")); + // valid id with zeroes in the number + assertEquals("20794075", PmidCleaningRule.clean("20794075")); } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java index e3a9833b3..2db756a94 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/CreateActionSetSparkJob.java @@ -32,18 +32,28 @@ import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import eu.dnetlib.dhp.schema.oaf.utils.PidType; import eu.dnetlib.dhp.utils.DHPUtils; import scala.Tuple2; public class CreateActionSetSparkJob implements Serializable { public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations"; public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations"; + + // DOI-to-DOI citations + public static final String COCI = "COCI"; + + // PMID-to-PMID citations + public static final String POCI = "POCI"; + private static final String DOI_PREFIX = "50|doi_________::"; private static final String PMID_PREFIX = "50|pmid________::"; + private static final String TRUST = "0.91"; private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static void main(final String[] args) throws IOException, ParseException { @@ -67,7 +77,7 @@ public class CreateActionSetSparkJob implements Serializable { log.info("isSparkSessionManaged: {}", isSparkSessionManaged); final String inputPath = parser.get("inputPath"); - log.info("inputPath {}", inputPath.toString()); + log.info("inputPath {}", inputPath); final String outputPath = parser.get("outputPath"); log.info("outputPath {}", outputPath); @@ -81,19 +91,16 @@ public class CreateActionSetSparkJob implements Serializable { runWithSparkSession( conf, isSparkSessionManaged, - spark -> { - extractContent(spark, inputPath, outputPath, shouldDuplicateRels); - }); + spark -> extractContent(spark, inputPath, outputPath, shouldDuplicateRels)); } private static void extractContent(SparkSession spark, String inputPath, String outputPath, boolean shouldDuplicateRels) { - getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, "COCI") - .union(getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, "POCI")) + getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, COCI) + .union(getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, POCI)) .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); - } private static JavaPairRDD getTextTextJavaPairRDD(SparkSession spark, String inputPath, @@ -109,7 +116,7 @@ public class CreateActionSetSparkJob implements Serializable { value, shouldDuplicateRels, prefix) .iterator(), Encoders.bean(Relation.class)) - .filter((FilterFunction) value -> value != null) + .filter((FilterFunction) Objects::nonNull) .toJavaRDD() .map(p -> new AtomicAction(p.getClass(), p)) .mapToPair( @@ -123,20 +130,28 @@ public class CreateActionSetSparkJob implements Serializable { String prefix; String citing; String cited; - if (p.equals("COCI")) { - prefix = DOI_PREFIX; - citing = prefix - + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCiting())); - cited = prefix - + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCited())); - - } else { - prefix = PMID_PREFIX; - citing = prefix - + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("pmid", value.getCiting())); - cited = prefix - + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("pmid", value.getCited())); + switch (p) { + case COCI: + prefix = DOI_PREFIX; + citing = prefix + + IdentifierFactory + .md5(CleaningFunctions.normalizePidValue(PidType.doi.toString(), value.getCiting())); + cited = prefix + + IdentifierFactory + .md5(CleaningFunctions.normalizePidValue(PidType.doi.toString(), value.getCited())); + break; + case POCI: + prefix = PMID_PREFIX; + citing = prefix + + IdentifierFactory + .md5(CleaningFunctions.normalizePidValue(PidType.pmid.toString(), value.getCiting())); + cited = prefix + + IdentifierFactory + .md5(CleaningFunctions.normalizePidValue(PidType.pmid.toString(), value.getCited())); + break; + default: + throw new IllegalStateException("Invalid prefix: " + p); } if (!citing.equals(cited)) { @@ -162,7 +177,7 @@ public class CreateActionSetSparkJob implements Serializable { public static Relation getRelation( String source, String target, - String relclass) { + String relClass) { return OafMapperUtils .getRelation( @@ -170,7 +185,7 @@ public class CreateActionSetSparkJob implements Serializable { target, ModelConstants.RESULT_RESULT, ModelConstants.CITATION, - relclass, + relClass, Arrays .asList( OafMapperUtils.keyValue(ModelConstants.OPENOCITATIONS_ID, ModelConstants.OPENOCITATIONS_NAME)), @@ -183,6 +198,6 @@ public class CreateActionSetSparkJob implements Serializable { ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), TRUST), null); - } + } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java index 60dc998ef..600cf7df1 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/GetOpenCitationsRefs.java @@ -3,6 +3,7 @@ package eu.dnetlib.dhp.actionmanager.opencitations; import java.io.*; import java.io.Serializable; +import java.util.Arrays; import java.util.Objects; import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; @@ -37,7 +38,7 @@ public class GetOpenCitationsRefs implements Serializable { parser.parseArgument(args); final String[] inputFile = parser.get("inputFile").split(";"); - log.info("inputFile {}", inputFile.toString()); + log.info("inputFile {}", Arrays.asList(inputFile)); final String workingPath = parser.get("workingPath"); log.info("workingPath {}", workingPath); diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java index 3d384de9d..b9c24df3b 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/opencitations/ReadCOCI.java @@ -7,6 +7,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.IOException; import java.io.Serializable; +import java.util.Arrays; import java.util.Optional; import org.apache.commons.io.IOUtils; @@ -42,7 +43,7 @@ public class ReadCOCI implements Serializable { log.info("outputPath: {}", outputPath); final String[] inputFile = parser.get("inputFile").split(";"); - log.info("inputFile {}", inputFile.toString()); + log.info("inputFile {}", Arrays.asList(inputFile)); Boolean isSparkSessionManaged = isSparkSessionManaged(parser); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); @@ -74,10 +75,10 @@ public class ReadCOCI implements Serializable { private static void doRead(SparkSession spark, String workingPath, String[] inputFiles, String outputPath, - String delimiter, String format) throws IOException { + String delimiter, String format) { for (String inputFile : inputFiles) { - String p_string = workingPath + "/" + inputFile + ".gz"; + String pString = workingPath + "/" + inputFile + ".gz"; Dataset cociData = spark .read() @@ -86,7 +87,7 @@ public class ReadCOCI implements Serializable { .option("inferSchema", "true") .option("header", "true") .option("quotes", "\"") - .load(p_string) + .load(pString) .repartition(100); cociData.map((MapFunction) row -> { diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json index e25d1f4b8..5244a6fe4 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/as_parameters.json @@ -16,15 +16,11 @@ "paramLongName": "isSparkSessionManaged", "paramDescription": "the hdfs name node", "paramRequired": false - }, { - "paramName": "sdr", - "paramLongName": "shouldDuplicateRels", - "paramDescription": "the hdfs name node", - "paramRequired": false -},{ - "paramName": "p", - "paramLongName": "prefix", - "paramDescription": "the hdfs name node", - "paramRequired": true -} + }, + { + "paramName": "sdr", + "paramLongName": "shouldDuplicateRels", + "paramDescription": "activates/deactivates the construction of bidirectional relations Cites/IsCitedBy", + "paramRequired": false + } ] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml index d87dfa2ba..deb32459b 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/opencitations/oozie_app/workflow.xml @@ -34,6 +34,7 @@ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + ${jobTracker} @@ -54,6 +55,7 @@ + eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs @@ -112,7 +114,6 @@ --inputPath${workingPath} --outputPath${outputPath} - --prefix${prefix} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index 739295c91..cb1c70059 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -67,60 +67,60 @@ public class SparkPropagateRelation extends AbstractSparkAction { log.info("graphOutputPath: '{}'", graphOutputPath); Dataset mergeRels = spark - .read() - .load(DedupUtility.createMergeRelPath(workingPath, "*", "*")) - .as(REL_BEAN_ENC); + .read() + .load(DedupUtility.createMergeRelPath(workingPath, "*", "*")) + .as(REL_BEAN_ENC); // Dataset idsToMerge = mergeRels - .where(col("relClass").equalTo(ModelConstants.MERGES)) - .select(col("source").as("dedupID"), col("target").as("mergedObjectID")) - .distinct(); + .where(col("relClass").equalTo(ModelConstants.MERGES)) + .select(col("source").as("dedupID"), col("target").as("mergedObjectID")) + .distinct(); Dataset allRels = spark - .read() - .schema(REL_BEAN_ENC.schema()) - .json(graphBasePath + "/relation"); + .read() + .schema(REL_BEAN_ENC.schema()) + .json(graphBasePath + "/relation"); Dataset dedupedRels = allRels - .joinWith(idsToMerge, allRels.col("source").equalTo(idsToMerge.col("mergedObjectID")), "left_outer") - .joinWith(idsToMerge, col("_1.target").equalTo(idsToMerge.col("mergedObjectID")), "left_outer") - .select("_1._1", "_1._2.dedupID", "_2.dedupID") - .as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING())) - .map((MapFunction, Relation>) t -> { - Relation rel = t._1(); - String newSource = t._2(); - String newTarget = t._3(); + .joinWith(idsToMerge, allRels.col("source").equalTo(idsToMerge.col("mergedObjectID")), "left_outer") + .joinWith(idsToMerge, col("_1.target").equalTo(idsToMerge.col("mergedObjectID")), "left_outer") + .select("_1._1", "_1._2.dedupID", "_2.dedupID") + .as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING())) + .map((MapFunction, Relation>) t -> { + Relation rel = t._1(); + String newSource = t._2(); + String newTarget = t._3(); - if (rel.getDataInfo() == null) { - rel.setDataInfo(new DataInfo()); - } + if (rel.getDataInfo() == null) { + rel.setDataInfo(new DataInfo()); + } - if (newSource != null || newTarget != null) { - rel.getDataInfo().setDeletedbyinference(false); + if (newSource != null || newTarget != null) { + rel.getDataInfo().setDeletedbyinference(false); - if (newSource != null) - rel.setSource(newSource); + if (newSource != null) + rel.setSource(newSource); - if (newTarget != null) - rel.setTarget(newTarget); - } + if (newTarget != null) + rel.setTarget(newTarget); + } - return rel; - }, REL_BEAN_ENC); + return rel; + }, REL_BEAN_ENC); // ids of records that are both not deletedbyinference and not invisible Dataset ids = validIds(spark, graphBasePath); // filter relations that point to valid records, can force them to be visible Dataset cleanedRels = dedupedRels - .join(ids, col("source").equalTo(ids.col("id")), "leftsemi") - .join(ids, col("target").equalTo(ids.col("id")), "leftsemi") - .as(REL_BEAN_ENC) - .map((MapFunction) r -> { - r.getDataInfo().setInvisible(false); - return r; - }, REL_KRYO_ENC); + .join(ids, col("source").equalTo(ids.col("id")), "leftsemi") + .join(ids, col("target").equalTo(ids.col("id")), "leftsemi") + .as(REL_BEAN_ENC) + .map((MapFunction) r -> { + r.getDataInfo().setInvisible(false); + return r; + }, REL_KRYO_ENC); Dataset distinctRels = cleanedRels .groupByKey( diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java index b878e778e..0887adf45 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java @@ -1,14 +1,14 @@ package eu.dnetlib.dhp.oa.graph.group; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.OafEntity; -import eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.utils.DHPUtils; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + import org.apache.commons.io.FileUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; @@ -18,108 +18,108 @@ import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.*; -import java.io.IOException; -import java.net.URISyntaxException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; -import static org.junit.jupiter.api.Assertions.assertEquals; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.utils.DHPUtils; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class GroupEntitiesSparkJobTest { - private static SparkSession spark; + private static SparkSession spark; - private static ObjectMapper mapper = new ObjectMapper() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + private static ObjectMapper mapper = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - private static Path workingDir; - private Path dataInputPath; + private static Path workingDir; + private Path dataInputPath; - private Path checkpointPath; + private Path checkpointPath; - private Path outputPath; + private Path outputPath; - @BeforeAll - public static void beforeAll() throws IOException { - workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName()); + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName()); - SparkConf conf = new SparkConf(); - conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName()); - conf.setMaster("local"); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ModelSupport.getOafModelClasses()); - spark = SparkSession.builder().config(conf).getOrCreate(); - } + SparkConf conf = new SparkConf(); + conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName()); + conf.setMaster("local"); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + spark = SparkSession.builder().config(conf).getOrCreate(); + } - @BeforeEach - public void beforeEach() throws IOException, URISyntaxException { - dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI()); - checkpointPath = workingDir.resolve("grouped_entity"); - outputPath = workingDir.resolve("dispatched_entity"); - } + @BeforeEach + public void beforeEach() throws IOException, URISyntaxException { + dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI()); + checkpointPath = workingDir.resolve("grouped_entity"); + outputPath = workingDir.resolve("dispatched_entity"); + } - @AfterAll - public static void afterAll() throws IOException { - spark.stop(); - FileUtils.deleteDirectory(workingDir.toFile()); - } + @AfterAll + public static void afterAll() throws IOException { + spark.stop(); + FileUtils.deleteDirectory(workingDir.toFile()); + } - @Test - @Order(1) - void testGroupEntities() throws Exception { - GroupEntitiesSparkJob.main(new String[]{ - "-isSparkSessionManaged", - Boolean.FALSE.toString(), - "-graphInputPath", - dataInputPath.toString(), - "-checkpointPath", - checkpointPath.toString(), - "-outputPath", - outputPath.toString(), - "-filterInvisible", - Boolean.FALSE.toString() - }); + @Test + @Order(1) + void testGroupEntities() throws Exception { + GroupEntitiesSparkJob.main(new String[] { + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-graphInputPath", + dataInputPath.toString(), + "-checkpointPath", + checkpointPath.toString(), + "-outputPath", + outputPath.toString(), + "-filterInvisible", + Boolean.FALSE.toString() + }); - Dataset checkpointTable = spark - .read() - .load(checkpointPath.toString()) - .selectExpr("COALESCE(*)") - .as(Encoders.kryo(OafEntity.class)); + Dataset checkpointTable = spark + .read() + .load(checkpointPath.toString()) + .selectExpr("COALESCE(*)") + .as(Encoders.kryo(OafEntity.class)); + assertEquals( + 1, + checkpointTable + .filter( + (FilterFunction) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9" + .equals(r.getId()) && + r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo"))) + .count()); - assertEquals( - 1, - checkpointTable - .filter( - (FilterFunction) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9" - .equals(r.getId()) && - r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo"))) - .count()); + Dataset output = spark + .read() + .textFile( + DHPUtils + .toSeq( + HdfsSupport + .listFiles(outputPath.toString(), spark.sparkContext().hadoopConfiguration()))) + .map((MapFunction) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class)); - - Dataset output = spark - .read() - .textFile( - DHPUtils - .toSeq( - HdfsSupport - .listFiles(outputPath.toString(), spark.sparkContext().hadoopConfiguration()))) - .map((MapFunction) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class)); - - assertEquals(3, output.count()); - assertEquals( - 2, - output - .map((MapFunction) r -> r.getResulttype().getClassid(), Encoders.STRING()) - .filter((FilterFunction) s -> s.equals("publication")) - .count()); - assertEquals( - 1, - output - .map((MapFunction) r -> r.getResulttype().getClassid(), Encoders.STRING()) - .filter((FilterFunction) s -> s.equals("dataset")) - .count()); - } -} \ No newline at end of file + assertEquals(3, output.count()); + assertEquals( + 2, + output + .map((MapFunction) r -> r.getResulttype().getClassid(), Encoders.STRING()) + .filter((FilterFunction) s -> s.equals("publication")) + .count()); + assertEquals( + 1, + output + .map((MapFunction) r -> r.getResulttype().getClassid(), Encoders.STRING()) + .filter((FilterFunction) s -> s.equals("dataset")) + .count()); + } +} diff --git a/dhp-workflows/dhp-swh/pom.xml b/dhp-workflows/dhp-swh/pom.xml new file mode 100644 index 000000000..80fff4587 --- /dev/null +++ b/dhp-workflows/dhp-swh/pom.xml @@ -0,0 +1,110 @@ + + + 4.0.0 + + eu.dnetlib.dhp + dhp-workflows + 1.2.5-SNAPSHOT + + dhp-swh + + + + org.apache.spark + spark-core_${scala.binary.version} + + + + org.apache.spark + spark-sql_${scala.binary.version} + + + + eu.dnetlib.dhp + dhp-common + ${project.version} + + + net.sf.saxon + Saxon-HE + + + + + + dom4j + dom4j + + + + xml-apis + xml-apis + + + + jaxen + jaxen + + + + org.apache.hadoop + hadoop-distcp + + + + eu.dnetlib + dnet-actionmanager-api + + + eu.dnetlib + dnet-actionmanager-common + + + eu.dnetlib + dnet-openaireplus-mapping-utils + + + saxonica + saxon + + + saxonica + saxon-dom + + + jgrapht + jgrapht + + + net.sf.ehcache + ehcache + + + org.springframework + spring-test + + + org.apache.* + * + + + apache + * + + + + + + org.apache.httpcomponents + httpclient + 4.5.13 + + + org.datanucleus + datanucleus-core + 3.2.10 + compile + + + + diff --git a/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/ArchiveRepositoryURLs.java b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/ArchiveRepositoryURLs.java new file mode 100644 index 000000000..baa510346 --- /dev/null +++ b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/ArchiveRepositoryURLs.java @@ -0,0 +1,176 @@ + +package eu.dnetlib.dhp.swh; + +import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration; + +import java.io.IOException; +import java.net.URL; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.collection.CollectorException; +import eu.dnetlib.dhp.common.collection.HttpClientParams; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions; +import eu.dnetlib.dhp.swh.models.LastVisitData; +import eu.dnetlib.dhp.swh.utils.SWHConnection; +import eu.dnetlib.dhp.swh.utils.SWHConstants; +import eu.dnetlib.dhp.swh.utils.SWHUtils; + +/** + * Sends archive requests to the SWH API for those software repository URLs that are missing from them + * + * @author Serafeim Chatzopoulos + */ +public class ArchiveRepositoryURLs { + + private static final Logger log = LoggerFactory.getLogger(ArchiveRepositoryURLs.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SWHConnection swhConnection = null; + + public static void main(final String[] args) throws IOException, ParseException { + final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser( + IOUtils + .toString( + CollectLastVisitRepositoryData.class + .getResourceAsStream( + "/eu/dnetlib/dhp/swh/input_archive_repository_urls.json"))); + argumentParser.parseArgument(args); + + final String hdfsuri = argumentParser.get("namenode"); + log.info("hdfsURI: {}", hdfsuri); + + final String inputPath = argumentParser.get("lastVisitsPath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = argumentParser.get("archiveRequestsPath"); + log.info("outputPath: {}", outputPath); + + final Integer archiveThresholdInDays = Integer.parseInt(argumentParser.get("archiveThresholdInDays")); + log.info("archiveThresholdInDays: {}", archiveThresholdInDays); + + final String apiAccessToken = argumentParser.get("apiAccessToken"); + log.info("apiAccessToken: {}", apiAccessToken); + + final HttpClientParams clientParams = SWHUtils.getClientParams(argumentParser); + + swhConnection = new SWHConnection(clientParams, apiAccessToken); + + final FileSystem fs = FileSystem.get(getHadoopConfiguration(hdfsuri)); + + archive(fs, inputPath, outputPath, archiveThresholdInDays); + + } + + private static void archive(FileSystem fs, String inputPath, String outputPath, Integer archiveThresholdInDays) + throws IOException { + + SequenceFile.Reader fr = SWHUtils.getSequenceFileReader(fs, inputPath); + SequenceFile.Writer fw = SWHUtils.getSequenceFileWriter(fs, outputPath); + + // Create key and value objects to hold data + Text repoUrl = new Text(); + Text lastVisitData = new Text(); + + // Read key-value pairs from the SequenceFile and handle appropriately + while (fr.next(repoUrl, lastVisitData)) { + + String response = null; + try { + response = handleRecord(repoUrl.toString(), lastVisitData.toString(), archiveThresholdInDays); + } catch (java.text.ParseException e) { + log.error("Could not handle record with repo Url: {}", repoUrl.toString()); + throw new RuntimeException(e); + } + + // response is equal to null when no need for request + if (response != null) { + SWHUtils.appendToSequenceFile(fw, repoUrl.toString(), response); + } + + } + + // Close readers + fw.close(); + fr.close(); + } + + public static String handleRecord(String repoUrl, String lastVisitData, Integer archiveThresholdInDays) + throws IOException, java.text.ParseException { + + log.info("{ Key: {}, Value: {} }", repoUrl, lastVisitData); + + LastVisitData lastVisit = OBJECT_MAPPER.readValue(lastVisitData, LastVisitData.class); + + // a previous attempt for archival has been made, and repository URL was not found + // avoid performing the same archive request again + if (lastVisit.getStatus() != null && + lastVisit.getStatus().equals(SWHConstants.VISIT_STATUS_NOT_FOUND)) { + + log.info("Avoid request -- previous archive request returned NOT_FOUND"); + return null; + } + + // if we have last visit data + if (lastVisit.getSnapshot() != null) { + + String cleanDate = GraphCleaningFunctions.cleanDate(lastVisit.getDate()); + + // and the last visit date can be parsed + if (cleanDate != null) { + + SimpleDateFormat formatter = new SimpleDateFormat(ModelSupport.DATE_FORMAT); + Date lastVisitDate = formatter.parse(cleanDate); + + // OR last visit time < (now() - archiveThresholdInDays) + long diffInMillies = Math.abs((new Date()).getTime() - lastVisitDate.getTime()); + long diffInDays = TimeUnit.DAYS.convert(diffInMillies, TimeUnit.MILLISECONDS); + log.info("Date diff from now (in days): {}", diffInDays); + + // do not perform a request, if the last visit date is no older than $archiveThresholdInDays + if (archiveThresholdInDays >= diffInDays) { + log.info("Avoid request -- no older than {} days", archiveThresholdInDays); + return null; + } + } + } + + // ELSE perform an archive request + log.info("Perform archive request for: {}", repoUrl); + + // if last visit data are available, re-use version control type, + // else use the default one (i.e., git) + String visitType = Optional + .ofNullable(lastVisit.getType()) + .orElse(SWHConstants.DEFAULT_VISIT_TYPE); + + URL url = new URL(String.format(SWHConstants.SWH_ARCHIVE_URL, visitType, repoUrl.trim())); + + log.info("Sending archive request: {}", url); + + String response; + try { + response = swhConnection.call(url.toString()); + } catch (CollectorException e) { + log.error("Error in request: {}", url); + response = "{}"; + } + + return response; + } + +} diff --git a/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/CollectLastVisitRepositoryData.java b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/CollectLastVisitRepositoryData.java new file mode 100644 index 000000000..ebb9176ff --- /dev/null +++ b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/CollectLastVisitRepositoryData.java @@ -0,0 +1,119 @@ + +package eu.dnetlib.dhp.swh; + +import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration; + +import java.io.BufferedReader; +import java.io.IOException; +import java.net.URL; + +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.collection.CollectorException; +import eu.dnetlib.dhp.common.collection.HttpClientParams; +import eu.dnetlib.dhp.swh.utils.SWHConnection; +import eu.dnetlib.dhp.swh.utils.SWHConstants; +import eu.dnetlib.dhp.swh.utils.SWHUtils; + +/** + * Given a file with software repository URLs, this class + * collects last visit data from the Software Heritage API. + * + * @author Serafeim Chatzopoulos + */ +public class CollectLastVisitRepositoryData { + + private static final Logger log = LoggerFactory.getLogger(CollectLastVisitRepositoryData.class); + private static SWHConnection swhConnection = null; + + public static void main(final String[] args) + throws IOException, ParseException { + final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser( + IOUtils + .toString( + CollectLastVisitRepositoryData.class + .getResourceAsStream( + "/eu/dnetlib/dhp/swh/input_collect_last_visit_repository_data.json"))); + argumentParser.parseArgument(args); + + log.info("Java Xmx: {}m", Runtime.getRuntime().maxMemory() / (1024 * 1024)); + + final String hdfsuri = argumentParser.get("namenode"); + log.info("hdfsURI: {}", hdfsuri); + + final String inputPath = argumentParser.get("softwareCodeRepositoryURLs"); + log.info("inputPath: {}", inputPath); + + final String outputPath = argumentParser.get("lastVisitsPath"); + log.info("outputPath: {}", outputPath); + + final String apiAccessToken = argumentParser.get("apiAccessToken"); + log.info("apiAccessToken: {}", apiAccessToken); + + final HttpClientParams clientParams = SWHUtils.getClientParams(argumentParser); + + swhConnection = new SWHConnection(clientParams, apiAccessToken); + + final FileSystem fs = FileSystem.get(getHadoopConfiguration(hdfsuri)); + + collect(fs, inputPath, outputPath); + + fs.close(); + } + + private static void collect(FileSystem fs, String inputPath, String outputPath) + throws IOException { + + SequenceFile.Writer fw = SWHUtils.getSequenceFileWriter(fs, outputPath); + + // Specify the HDFS directory path you want to read + Path directoryPath = new Path(inputPath); + + // List all files in the directory + FileStatus[] partStatuses = fs.listStatus(directoryPath); + + for (FileStatus partStatus : partStatuses) { + + // Check if it's a file (not a directory) + if (partStatus.isFile()) { + handleFile(fs, partStatus.getPath(), fw); + } + + } + + fw.close(); + } + + private static void handleFile(FileSystem fs, Path partInputPath, SequenceFile.Writer fw) + throws IOException { + + BufferedReader br = SWHUtils.getFileReader(fs, partInputPath); + + String repoUrl; + while ((repoUrl = br.readLine()) != null) { + + URL url = new URL(String.format(SWHConstants.SWH_LATEST_VISIT_URL, repoUrl.trim())); + + String response; + try { + response = swhConnection.call(url.toString()); + } catch (CollectorException e) { + log.error("Error in request: {}", url); + response = "{}"; + } + + SWHUtils.appendToSequenceFile(fw, repoUrl, response); + } + + br.close(); + } + +} diff --git a/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/CollectSoftwareRepositoryURLs.java b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/CollectSoftwareRepositoryURLs.java new file mode 100644 index 000000000..abd51bc5b --- /dev/null +++ b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/CollectSoftwareRepositoryURLs.java @@ -0,0 +1,93 @@ + +package eu.dnetlib.dhp.swh; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.Result; + +/** + * Collects unique software repository URLs in the Graph using Hive + * + * @author Serafeim Chatzopoulos + */ +public class CollectSoftwareRepositoryURLs { + + private static final Logger log = LoggerFactory.getLogger(CollectSoftwareRepositoryURLs.class); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + CollectSoftwareRepositoryURLs.class + .getResourceAsStream("/eu/dnetlib/dhp/swh/input_collect_software_repository_urls.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String hiveDbName = parser.get("hiveDbName"); + log.info("hiveDbName: {}", hiveDbName); + + final String outputPath = parser.get("softwareCodeRepositoryURLs"); + log.info("softwareCodeRepositoryURLs: {}", outputPath); + + final String hiveMetastoreUris = parser.get("hiveMetastoreUris"); + log.info("hiveMetastoreUris: {}", hiveMetastoreUris); + + final Integer softwareLimit = Integer.parseInt(parser.get("softwareLimit")); + log.info("softwareLimit: {}", softwareLimit); + + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", hiveMetastoreUris); + + runWithSparkHiveSession( + conf, + isSparkSessionManaged, + spark -> { + doRun(spark, hiveDbName, softwareLimit, outputPath); + }); + } + + private static void doRun(SparkSession spark, String hiveDbName, Integer limit, + String outputPath) { + + String queryTemplate = "SELECT distinct coderepositoryurl.value " + + "FROM %s.software " + + "WHERE coderepositoryurl.value IS NOT NULL " + + "AND datainfo.deletedbyinference = FALSE " + + "AND datainfo.invisible = FALSE "; + + if (limit != null) { + queryTemplate += String.format("LIMIT %s", limit); + } + + String query = String.format(queryTemplate, hiveDbName); + + log.info("Hive query to fetch software code URLs: {}", query); + + Dataset df = spark.sql(query); + + // write distinct repository URLs + df + .write() + .mode(SaveMode.Overwrite) + .csv(outputPath); + } +} diff --git a/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/PrepareSWHActionsets.java b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/PrepareSWHActionsets.java new file mode 100644 index 000000000..2691d4b7e --- /dev/null +++ b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/PrepareSWHActionsets.java @@ -0,0 +1,185 @@ + +package eu.dnetlib.dhp.swh; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static org.apache.spark.sql.functions.col; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import eu.dnetlib.dhp.swh.models.LastVisitData; +import eu.dnetlib.dhp.swh.utils.SWHConstants; +import scala.Tuple2; + +/** + * Creates action sets for Software Heritage data + * + * @author Serafeim Chatzopoulos + */ +public class PrepareSWHActionsets { + + private static final Logger log = LoggerFactory.getLogger(PrepareSWHActionsets.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + PrepareSWHActionsets.class + .getResourceAsStream( + "/eu/dnetlib/dhp/swh/input_prepare_swh_actionsets.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("lastVisitsPath"); + log.info("inputPath: {}", inputPath); + + final String softwareInputPath = parser.get("softwareInputPath"); + log.info("softwareInputPath: {}", softwareInputPath); + + final String outputPath = parser.get("actionsetsPath"); + log.info("outputPath: {}", outputPath); + + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + JavaPairRDD softwareRDD = prepareActionsets(spark, inputPath, softwareInputPath); + softwareRDD + .saveAsHadoopFile( + outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); + }); + } + + private static Dataset loadSWHData(SparkSession spark, String inputPath) { + + JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + // read from file and transform to tuples + // Note: snapshot id is the SWH id for us + JavaRDD swhRDD = sc + .sequenceFile(inputPath, Text.class, Text.class) + .map(t -> t._2().toString()) + .map(t -> OBJECT_MAPPER.readValue(t, LastVisitData.class)) + .filter(t -> t.getOrigin() != null && t.getSnapshot() != null) // response from SWH API is empty if repo URL + // was not found + .map(item -> RowFactory.create(item.getOrigin(), item.getSnapshot())); + + // convert RDD to 2-column DF + List fields = Arrays + .asList( + DataTypes.createStructField("repoUrl", DataTypes.StringType, true), + DataTypes.createStructField("swhId", DataTypes.StringType, true)); + StructType schema = DataTypes.createStructType(fields); + + return spark.createDataFrame(swhRDD, schema); + } + + private static Dataset loadGraphSoftwareData(SparkSession spark, String softwareInputPath) { + return spark + .read() + .textFile(softwareInputPath) + .map( + (MapFunction) t -> OBJECT_MAPPER.readValue(t, Software.class), + Encoders.bean(Software.class)) + .filter(t -> t.getCodeRepositoryUrl() != null) + .select(col("id"), col("codeRepositoryUrl.value").as("repoUrl")); + } + + private static JavaPairRDD prepareActionsets(SparkSession spark, String inputPath, + String softwareInputPath) { + + Dataset swhDF = loadSWHData(spark, inputPath); +// swhDF.show(false); + + Dataset graphSoftwareDF = loadGraphSoftwareData(spark, softwareInputPath); +// graphSoftwareDF.show(5); + + Dataset joinedDF = graphSoftwareDF.join(swhDF, "repoUrl").select("id", "swhid"); +// joinedDF.show(false); + + return joinedDF.map((MapFunction) row -> { + + Software s = new Software(); + + // set openaire id + s.setId(row.getString(row.fieldIndex("id"))); + + // set swh id + Qualifier qualifier = OafMapperUtils + .qualifier( + SWHConstants.SWHID, + SWHConstants.SWHID_CLASSNAME, + ModelConstants.DNET_PID_TYPES, + ModelConstants.DNET_PID_TYPES); + + DataInfo dataInfo = OafMapperUtils + .dataInfo( + false, + null, + false, + false, + ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER, + ""); + + s + .setPid( + Arrays + .asList( + OafMapperUtils + .structuredProperty( + String.format("swh:1:snp:%s", row.getString(row.fieldIndex("swhid"))), + qualifier, + dataInfo))); + + // add SWH in the `collectedFrom` field + KeyValue kv = new KeyValue(); + kv.setKey(SWHConstants.SWH_ID); + kv.setValue(SWHConstants.SWH_NAME); + + s.setCollectedfrom(Arrays.asList(kv)); + + return s; + }, Encoders.bean(Software.class)) + .toJavaRDD() + .map(p -> new AtomicAction(Software.class, p)) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))); + } +} diff --git a/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/models/LastVisitData.java b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/models/LastVisitData.java new file mode 100644 index 000000000..5e705716c --- /dev/null +++ b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/models/LastVisitData.java @@ -0,0 +1,71 @@ + +package eu.dnetlib.dhp.swh.models; + +import java.io.Serializable; + +import com.cloudera.com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class LastVisitData implements Serializable { + + private String origin; + private String type; + private String date; + + @JsonProperty("snapshot") + private String snapshotId; + + private String status; + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getDate() { + return date; + } + + public void setDate(String date) { + this.date = date; + } + + public String getSnapshot() { + return snapshotId; + } + + public void setSnapshot(String snapshotId) { + this.snapshotId = snapshotId; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public String getOrigin() { + return origin; + } + + public void setOrigin(String origin) { + this.origin = origin; + } + + @Override + public String toString() { + return "LastVisitData{" + + "origin='" + origin + '\'' + + ", type='" + type + '\'' + + ", date='" + date + '\'' + + ", snapshotId='" + snapshotId + '\'' + + ", status='" + status + '\'' + + '}'; + } +} diff --git a/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/utils/SWHConnection.java b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/utils/SWHConnection.java new file mode 100644 index 000000000..80249e816 --- /dev/null +++ b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/utils/SWHConnection.java @@ -0,0 +1,40 @@ + +package eu.dnetlib.dhp.swh.utils; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.http.HttpHeaders; + +import eu.dnetlib.dhp.common.collection.CollectorException; +import eu.dnetlib.dhp.common.collection.HttpClientParams; +import eu.dnetlib.dhp.common.collection.HttpConnector2; + +public class SWHConnection { + + HttpConnector2 conn; + + public SWHConnection(HttpClientParams clientParams, String accessToken) { + + // set custom headers + Map headers = new HashMap() { + { + put(HttpHeaders.ACCEPT, "application/json"); + if (accessToken != null) { + put(HttpHeaders.AUTHORIZATION, String.format("Bearer %s", accessToken)); + } + } + }; + + clientParams.setHeaders(headers); + + // create http connector + conn = new HttpConnector2(clientParams); + + } + + public String call(String url) throws CollectorException { + return conn.getInputSource(url); + } + +} diff --git a/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/utils/SWHConstants.java b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/utils/SWHConstants.java new file mode 100644 index 000000000..eae839cfd --- /dev/null +++ b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/utils/SWHConstants.java @@ -0,0 +1,21 @@ + +package eu.dnetlib.dhp.swh.utils; + +public class SWHConstants { + public static final String SWH_LATEST_VISIT_URL = "https://archive.softwareheritage.org/api/1/origin/%s/visit/latest/"; + + public static final String SWH_ARCHIVE_URL = "https://archive.softwareheritage.org/api/1/origin/save/%s/url/%s/"; + + public static final String DEFAULT_VISIT_TYPE = "git"; + + public static final String VISIT_STATUS_NOT_FOUND = "not_found"; + + public static final String SWHID = "swhid"; + + public static final String SWHID_CLASSNAME = "Software Heritage Identifier"; + + public static final String SWH_ID = "10|openaire____::dbfd07503aaa1ed31beed7dec942f3f4"; + + public static final String SWH_NAME = "Software Heritage"; + +} diff --git a/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/utils/SWHUtils.java b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/utils/SWHUtils.java new file mode 100644 index 000000000..405ce51e4 --- /dev/null +++ b/dhp-workflows/dhp-swh/src/main/java/eu/dnetlib/dhp/swh/utils/SWHUtils.java @@ -0,0 +1,95 @@ + +package eu.dnetlib.dhp.swh.utils; + +import static eu.dnetlib.dhp.common.Constants.*; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.Optional; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.collection.HttpClientParams; + +public class SWHUtils { + + private static final Logger log = LoggerFactory.getLogger(SWHUtils.class); + + public static HttpClientParams getClientParams(ArgumentApplicationParser argumentParser) { + + final HttpClientParams clientParams = new HttpClientParams(); + clientParams + .setMaxNumberOfRetry( + Optional + .ofNullable(argumentParser.get(MAX_NUMBER_OF_RETRY)) + .map(Integer::parseInt) + .orElse(HttpClientParams._maxNumberOfRetry)); + log.info("maxNumberOfRetry is {}", clientParams.getMaxNumberOfRetry()); + + clientParams + .setRequestDelay( + Optional + .ofNullable(argumentParser.get(REQUEST_DELAY)) + .map(Integer::parseInt) + .orElse(HttpClientParams._requestDelay)); + log.info("requestDelay is {}", clientParams.getRequestDelay()); + + clientParams + .setRetryDelay( + Optional + .ofNullable(argumentParser.get(RETRY_DELAY)) + .map(Integer::parseInt) + .orElse(HttpClientParams._retryDelay)); + log.info("retryDelay is {}", clientParams.getRetryDelay()); + + clientParams + .setRequestMethod( + Optional + .ofNullable(argumentParser.get(REQUEST_METHOD)) + .orElse(HttpClientParams._requestMethod)); + log.info("requestMethod is {}", clientParams.getRequestMethod()); + + return clientParams; + } + + public static BufferedReader getFileReader(FileSystem fs, Path inputPath) throws IOException { + FSDataInputStream inputStream = fs.open(inputPath); + return new BufferedReader( + new InputStreamReader(inputStream, StandardCharsets.UTF_8)); + } + + public static SequenceFile.Writer getSequenceFileWriter(FileSystem fs, String outputPath) throws IOException { + return SequenceFile + .createWriter( + fs.getConf(), + SequenceFile.Writer.file(new Path(outputPath)), + SequenceFile.Writer.keyClass(Text.class), + SequenceFile.Writer.valueClass(Text.class)); + } + + public static SequenceFile.Reader getSequenceFileReader(FileSystem fs, String inputPath) throws IOException { + Path filePath = new Path(inputPath); + SequenceFile.Reader.Option fileOption = SequenceFile.Reader.file(filePath); + + return new SequenceFile.Reader(fs.getConf(), fileOption); + } + + public static void appendToSequenceFile(SequenceFile.Writer fw, String keyStr, String valueStr) throws IOException { + Text key = new Text(); + key.set(keyStr); + + Text value = new Text(); + value.set(valueStr); + + fw.append(key, value); + } +} diff --git a/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_archive_repository_urls.json b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_archive_repository_urls.json new file mode 100644 index 000000000..e8671f71b --- /dev/null +++ b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_archive_repository_urls.json @@ -0,0 +1,56 @@ +[ + { + "paramName": "n", + "paramLongName": "namenode", + "paramDescription": "the Name Node URI", + "paramRequired": true + }, + { + "paramName": "lv", + "paramLongName": "lastVisitsPath", + "paramDescription": "the URL where to store last visits data", + "paramRequired": true + }, + { + "paramName": "arp", + "paramLongName": "archiveRequestsPath", + "paramDescription": "the URL where to store the responses of the archive requests", + "paramRequired": true + }, + { + "paramName": "mnr", + "paramLongName": "maxNumberOfRetry", + "paramDescription": "the maximum number of admitted connection retries", + "paramRequired": false + }, + { + "paramName": "rqd", + "paramLongName": "requestDelay", + "paramDescription": "the delay (ms) between requests", + "paramRequired": false + }, + { + "paramName": "rtd", + "paramLongName": "retryDelay", + "paramDescription": "the delay (ms) between retries", + "paramRequired": false + }, + { + "paramName": "rm", + "paramLongName": "requestMethod", + "paramDescription": "the method of the requests to perform", + "paramRequired": false + }, + { + "paramName": "atid", + "paramLongName": "archiveThresholdInDays", + "paramDescription": "the thershold (in days) required to issue an archive request", + "paramRequired": false + }, + { + "paramName": "aat", + "paramLongName": "apiAccessToken", + "paramDescription": "the API access token of the SWH API", + "paramRequired": false + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_collect_last_visit_repository_data.json b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_collect_last_visit_repository_data.json new file mode 100644 index 000000000..662582dfe --- /dev/null +++ b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_collect_last_visit_repository_data.json @@ -0,0 +1,50 @@ +[ + { + "paramName": "n", + "paramLongName": "namenode", + "paramDescription": "the Name Node URI", + "paramRequired": true + }, + { + "paramName": "scr", + "paramLongName": "softwareCodeRepositoryURLs", + "paramDescription": "the URL from where to read software repository URLs", + "paramRequired": true + }, + { + "paramName": "lv", + "paramLongName": "lastVisitsPath", + "paramDescription": "the URL where to store last visits data", + "paramRequired": true + }, + { + "paramName": "mnr", + "paramLongName": "maxNumberOfRetry", + "paramDescription": "the maximum number of admitted connection retries", + "paramRequired": false + }, + { + "paramName": "rqd", + "paramLongName": "requestDelay", + "paramDescription": "the delay (ms) between requests", + "paramRequired": false + }, + { + "paramName": "rtd", + "paramLongName": "retryDelay", + "paramDescription": "the delay (ms) between retries", + "paramRequired": false + }, + { + "paramName": "rm", + "paramLongName": "requestMethod", + "paramDescription": "the method of the requests to perform", + "paramRequired": false + }, + { + "paramName": "aat", + "paramLongName": "apiAccessToken", + "paramDescription": "the API access token of the SWH API", + "paramRequired": false + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_collect_software_repository_urls.json b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_collect_software_repository_urls.json new file mode 100644 index 000000000..4459fe9df --- /dev/null +++ b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_collect_software_repository_urls.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "scr", + "paramLongName": "softwareCodeRepositoryURLs", + "paramDescription": "the URL where to store software repository URLs", + "paramRequired": true + }, + { + "paramName": "db", + "paramLongName": "hiveDbName", + "paramDescription": "the target hive database name", + "paramRequired": true + }, + { + "paramName": "hmu", + "paramLongName": "hiveMetastoreUris", + "paramDescription": "the hive metastore uris", + "paramRequired": true + }, + { + "paramName": "slim", + "paramLongName": "softwareLimit", + "paramDescription": "limit on the number of software repo URL to fetch", + "paramRequired": false + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_prepare_swh_actionsets.json b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_prepare_swh_actionsets.json new file mode 100644 index 000000000..07ab0b1f4 --- /dev/null +++ b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/input_prepare_swh_actionsets.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "lv", + "paramLongName": "lastVisitsPath", + "paramDescription": "the URL where to store last visits data", + "paramRequired": true + }, + { + "paramName": "ap", + "paramLongName": "actionsetsPath", + "paramDescription": "the URL path where to store actionsets", + "paramRequired": true + }, + { + "paramName": "sip", + "paramLongName": "softwareInputPath", + "paramDescription": "the URL path of the software in the graph", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/job.properties b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/job.properties new file mode 100644 index 000000000..35c068286 --- /dev/null +++ b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/job.properties @@ -0,0 +1,19 @@ +# hive +hiveDbName=openaire_prod_20230914 + +# input/output files +softwareCodeRepositoryURLs=${workingDir}/1_code_repo_urls.csv +lastVisitsPath=${workingDir}/2_last_visits.seq +archiveRequestsPath=${workingDir}/3_archive_requests.seq +actionsetsPath=${workingDir}/4_actionsets +graphPath=/tmp/prod_provision/graph/18_graph_blacklisted + +apiAccessToken=eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJhMTMxYTQ1My1hM2IyLTQwMTUtODQ2Ny05MzAyZjk3MTFkOGEifQ.eyJpYXQiOjE2OTQ2MzYwMjAsImp0aSI6IjkwZjdkNTNjLTQ5YTktNGFiMy1hY2E0LTcwMTViMjEyZTNjNiIsImlzcyI6Imh0dHBzOi8vYXV0aC5zb2Z0d2FyZWhlcml0YWdlLm9yZy9hdXRoL3JlYWxtcy9Tb2Z0d2FyZUhlcml0YWdlIiwiYXVkIjoiaHR0cHM6Ly9hdXRoLnNvZnR3YXJlaGVyaXRhZ2Uub3JnL2F1dGgvcmVhbG1zL1NvZnR3YXJlSGVyaXRhZ2UiLCJzdWIiOiIzMTY5OWZkNC0xNmE0LTQxOWItYTdhMi00NjI5MDY4ZjI3OWEiLCJ0eXAiOiJPZmZsaW5lIiwiYXpwIjoic3doLXdlYiIsInNlc3Npb25fc3RhdGUiOiIzMjYzMzEwMS00ZDRkLTQwMjItODU2NC1iMzNlMTJiNTE3ZDkiLCJzY29wZSI6Im9wZW5pZCBvZmZsaW5lX2FjY2VzcyBwcm9maWxlIGVtYWlsIn0.XHj1VIZu1dZ4Ej32-oU84mFmaox9cLNjXosNxwZM0Xs + +maxNumberOfRetry=2 +retryDelay=1 +requestDelay=100 + +softwareLimit=500 + +resume=collect-software-repository-urls diff --git a/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/oozie_app/config-default.xml b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/oozie_app/config-default.xml new file mode 100644 index 000000000..3e45a53fa --- /dev/null +++ b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/oozie_app/config-default.xml @@ -0,0 +1,54 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + spark2YarnHistoryServerAddress + http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 + + + spark2EventLogDir + /user/spark/spark2ApplicationHistory + + + spark2ExtraListeners + "com.cloudera.spark.lineage.NavigatorAppListener" + + + spark2SqlQueryExecutionListeners + "com.cloudera.spark.lineage.NavigatorQueryListener" + + + oozieActionShareLibForSpark2 + spark2 + + + resourceManager + http://iis-cdh5-test-m2.ocean.icm.edu.pl:8088/cluster + + + oozie.launcher.mapreduce.user.classpath.first + true + + + sparkSqlWarehouseDir + /user/hive/warehouse + + \ No newline at end of file diff --git a/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/oozie_app/workflow.xml b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/oozie_app/workflow.xml new file mode 100644 index 000000000..c625fcb5b --- /dev/null +++ b/dhp-workflows/dhp-swh/src/main/resources/eu/dnetlib/dhp/swh/oozie_app/workflow.xml @@ -0,0 +1,183 @@ + + + + + + hiveDbName + The name of the Hive DB to be used + + + softwareCodeRepositoryURLs + The path in the HDFS to save the software repository URLs + + + lastVisitsPath + The path in the HDFS to save the responses of the last visit requests + + + archiveRequestsPath + The path in the HDFS to save the responses of the archive requests + + + actionsetsPath + The path in the HDFS to save the action sets + + + graphPath + The path in the HDFS to the base folder of the graph + + + maxNumberOfRetry + Max number of retries for failed API calls + + + retryDelay + Retry delay for failed requests (in sec) + + + requestDelay + Delay between API requests (in ms) + + + apiAccessToken + The API Key of the SWH API + + + softwareLimit + Limit on the number of repo URLs to use (Optional); for debug purposes + + + resumeFrom + Variable that indicates the step to start from + + + + + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + actionsetsPath + ${actionsetsPath} + + + apiAccessToken + ${apiAccessToken} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + ${wf:conf('resumeFrom') eq 'collect-software-repository-urls'} + ${wf:conf('resumeFrom') eq 'create-swh-actionsets'} + + + + + + + yarn + cluster + Collect software repository URLs + eu.dnetlib.dhp.swh.CollectSoftwareRepositoryURLs + dhp-swh-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + + --softwareCodeRepositoryURLs${softwareCodeRepositoryURLs} + --hiveDbName${hiveDbName} + --hiveMetastoreUris${hiveMetastoreUris} + --softwareLimit${softwareLimit} + + + + + + + + eu.dnetlib.dhp.swh.CollectLastVisitRepositoryData + + --namenode${nameNode} + --softwareCodeRepositoryURLs${softwareCodeRepositoryURLs} + --lastVisitsPath${lastVisitsPath} + + --maxNumberOfRetry${maxNumberOfRetry} + --requestDelay${requestDelay} + --retryDelay${retryDelay} + --requestMethodGET + --apiAccessToken${apiAccessToken} + + + + + + + + + eu.dnetlib.dhp.swh.ArchiveRepositoryURLs + + --namenode${nameNode} + --lastVisitsPath${lastVisitsPath} + --archiveRequestsPath${archiveRequestsPath} + --archiveThresholdInDays365 + + --maxNumberOfRetry${maxNumberOfRetry} + --requestDelay${requestDelay} + --retryDelay${retryDelay} + --requestMethodPOST + --apiAccessToken${apiAccessToken} + + + + + + + + + yarn + cluster + Create actionsets for SWH data + eu.dnetlib.dhp.swh.PrepareSWHActionsets + dhp-swh-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + + --lastVisitsPath${lastVisitsPath} + --actionsetsPath${actionsetsPath} + --softwareInputPath${graphPath}/software + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-swh/src/test/java/eu/dnetlib/dhp/swh/ArchiveRepositoryURLsTest.java b/dhp-workflows/dhp-swh/src/test/java/eu/dnetlib/dhp/swh/ArchiveRepositoryURLsTest.java new file mode 100644 index 000000000..e069e9655 --- /dev/null +++ b/dhp-workflows/dhp-swh/src/test/java/eu/dnetlib/dhp/swh/ArchiveRepositoryURLsTest.java @@ -0,0 +1,38 @@ + +package eu.dnetlib.dhp.swh; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.text.ParseException; +import java.util.Arrays; + +import org.apache.hadoop.fs.FileSystem; +import org.junit.jupiter.api.Test; + +import eu.dnetlib.dhp.swh.utils.SWHUtils; + +public class ArchiveRepositoryURLsTest { + + @Test + void testArchive() throws IOException, ParseException { + String inputPath = getClass() + .getResource("/eu/dnetlib/dhp/swh/lastVisitDataToArchive.csv") + .getPath(); + + File file = new File(inputPath); + FileReader fr = new FileReader(file); + BufferedReader br = new BufferedReader(fr); // creates a buffering character input stream + + String line; + while ((line = br.readLine()) != null) { + String[] tokens = line.split("\t"); + + String response = ArchiveRepositoryURLs.handleRecord(tokens[0], tokens[1], 365); + System.out.println(tokens[0] + "\t" + response); + System.out.println(); + } + fr.close(); + } +} diff --git a/dhp-workflows/dhp-swh/src/test/java/eu/dnetlib/dhp/swh/PrepareSWHActionsetsTest.java b/dhp-workflows/dhp-swh/src/test/java/eu/dnetlib/dhp/swh/PrepareSWHActionsetsTest.java new file mode 100644 index 000000000..ffcb7aaee --- /dev/null +++ b/dhp-workflows/dhp-swh/src/test/java/eu/dnetlib/dhp/swh/PrepareSWHActionsetsTest.java @@ -0,0 +1,97 @@ + +package eu.dnetlib.dhp.swh; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.io.Text; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; + +public class PrepareSWHActionsetsTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + + private static final Logger log = LoggerFactory + .getLogger(PrepareSWHActionsetsTest.class); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(PrepareSWHActionsetsTest.class.getSimpleName()); + + log.info("Using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(PrepareSWHActionsetsTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(PrepareSWHActionsetsTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + void testRun() throws Exception { + + String lastVisitsPath = getClass() + .getResource("/eu/dnetlib/dhp/swh/last_visits_data.seq") + .getPath(); + + String outputPath = workingDir.toString() + "/actionSet"; + + String softwareInputPath = getClass() + .getResource("/eu/dnetlib/dhp/swh/software.json.gz") + .getPath(); + + PrepareSWHActionsets + .main( + new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-lastVisitsPath", lastVisitsPath, + "-softwareInputPath", softwareInputPath, + "-actionsetsPath", outputPath + }); + + } +} diff --git a/dhp-workflows/dhp-swh/src/test/java/eu/dnetlib/dhp/swh/SWHConnectionTest.java b/dhp-workflows/dhp-swh/src/test/java/eu/dnetlib/dhp/swh/SWHConnectionTest.java new file mode 100644 index 000000000..b19e0e7ac --- /dev/null +++ b/dhp-workflows/dhp-swh/src/test/java/eu/dnetlib/dhp/swh/SWHConnectionTest.java @@ -0,0 +1,58 @@ + +package eu.dnetlib.dhp.swh; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; + +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.common.collection.CollectorException; +import eu.dnetlib.dhp.common.collection.HttpClientParams; +import eu.dnetlib.dhp.swh.utils.SWHConnection; +import eu.dnetlib.dhp.swh.utils.SWHConstants; + +//import org.apache.hadoop.hdfs.MiniDFSCluster; + +public class SWHConnectionTest { + private static final Logger log = LoggerFactory.getLogger(SWHConnectionTest.class); + + @Test + void testGetCall() throws IOException { + + HttpClientParams clientParams = new HttpClientParams(); + clientParams.setRequestMethod("GET"); + + SWHConnection swhConnection = new SWHConnection(clientParams, null); + + String repoUrl = "https://github.com/stanford-futuredata/FAST"; + URL url = new URL(String.format(SWHConstants.SWH_LATEST_VISIT_URL, repoUrl)); + String response = null; + try { + response = swhConnection.call(url.toString()); + } catch (CollectorException e) { + System.out.println("Error in request: " + url); + } + System.out.println(response); + } + + @Test + void testPostCall() throws MalformedURLException { + HttpClientParams clientParams = new HttpClientParams(); + clientParams.setRequestMethod("POST"); + + SWHConnection swhConnection = new SWHConnection(clientParams, null); + + String repoUrl = "https://github.com/stanford-futuredata/FAST"; + URL url = new URL(String.format(SWHConstants.SWH_ARCHIVE_URL, SWHConstants.DEFAULT_VISIT_TYPE, repoUrl)); + String response = null; + try { + response = swhConnection.call(url.toString()); + } catch (CollectorException e) { + System.out.println("Error in request: " + url); + } + System.out.println(response); + } +} diff --git a/dhp-workflows/dhp-swh/src/test/resources/eu/dnetlib/dhp/swh/lastVisitDataToArchive.csv b/dhp-workflows/dhp-swh/src/test/resources/eu/dnetlib/dhp/swh/lastVisitDataToArchive.csv new file mode 100644 index 000000000..568ccf482 --- /dev/null +++ b/dhp-workflows/dhp-swh/src/test/resources/eu/dnetlib/dhp/swh/lastVisitDataToArchive.csv @@ -0,0 +1,7 @@ +https://bitbucket.org/samskillman/yt-stokes {"origin":"https://bitbucket.org/samskillman/yt-stokes","visit":43,"date":"2021-09-13T21:59:27.125171+00:00","status":"failed","snapshot":null,"type":"hg","metadata":{},"origin_url":"https://archive.softwareheritage.org/api/1/origin/https://bitbucket.org/samskillman/yt-stokes/get/","snapshot_url":null} +https://github.com/bioinsilico/BIPSPI {"origin":"https://github.com/bioinsilico/BIPSPI","visit":1,"date":"2020-03-18T14:50:21.541822+00:00","status":"full","snapshot":"c6c69d2cd73ce89811448da5f031611df6f63bdb","type":"git","metadata":{},"origin_url":"https://archive.softwareheritage.org/api/1/origin/https://github.com/bioinsilico/BIPSPI/get/","snapshot_url":"https://archive.softwareheritage.org/api/1/snapshot/c6c69d2cd73ce89811448da5f031611df6f63bdb/"} +https://github.com/mloop/kdiff-type1-error-rate/blob/master/analysis/simulation.R {} +https://github.com/schwanbeck/YSMR {"origin":"https://github.com/schwanbeck/YSMR","visit":6,"date":"2023-08-02T15:25:02.650676+00:00","status":"full","snapshot":"a9d1c5f0bca2def198b89f65bc9f7da3be8439ed","type":"git","metadata":{},"origin_url":"https://archive.softwareheritage.org/api/1/origin/https://github.com/schwanbeck/YSMR/get/","snapshot_url":"https://archive.softwareheritage.org/api/1/snapshot/a9d1c5f0bca2def198b89f65bc9f7da3be8439ed/"} +https://github.com/lvclark/TASSELGBS_combine {"origin":"https://github.com/lvclark/TASSELGBS_combine","visit":1,"date":"2020-04-12T20:44:09.405589+00:00","status":"full","snapshot":"ffa6fefd3f5becefbea9fe0e6d5d93859c95c071","type":"git","metadata":{},"origin_url":"https://archive.softwareheritage.org/api/1/origin/https://github.com/lvclark/TASSELGBS_combine/get/","snapshot_url":"https://archive.softwareheritage.org/api/1/snapshot/ffa6fefd3f5becefbea9fe0e6d5d93859c95c071/"} +https://github.com/PRIDE-Toolsuite/inspector-example-files {"origin":"https://github.com/PRIDE-Toolsuite/inspector-example-files","visit":12,"date":"2021-01-25T08:54:13.394674+00:00","status":"full","snapshot":"0b56eb0ad07cf778df6dabefc4b73636e0ae8b37","type":"git","metadata":{},"origin_url":"https://archive.softwareheritage.org/api/1/origin/https://github.com/PRIDE-Toolsuite/inspector-example-files/get/","snapshot_url":"https://archive.softwareheritage.org/api/1/snapshot/0b56eb0ad07cf778df6dabefc4b73636e0ae8b37/"} +https://bitbucket.org/matwey/chelyabinsk {"origin":"https://bitbucket.org/matwey/chelyabinsk","visit":6,"date":"2021-09-24T19:32:43.322909+00:00","status":"full","snapshot":"215913858c3ee0e61e1aaea18241c5ee006da1b0","type":"hg","metadata":{},"origin_url":"https://archive.softwareheritage.org/api/1/origin/https://bitbucket.org/matwey/chelyabinsk/get/","snapshot_url":"https://archive.softwareheritage.org/api/1/snapshot/215913858c3ee0e61e1aaea18241c5ee006da1b0/"} \ No newline at end of file diff --git a/dhp-workflows/dhp-swh/src/test/resources/eu/dnetlib/dhp/swh/last_visits_data.seq b/dhp-workflows/dhp-swh/src/test/resources/eu/dnetlib/dhp/swh/last_visits_data.seq new file mode 100644 index 000000000..683fc0e69 Binary files /dev/null and b/dhp-workflows/dhp-swh/src/test/resources/eu/dnetlib/dhp/swh/last_visits_data.seq differ diff --git a/dhp-workflows/dhp-swh/src/test/resources/eu/dnetlib/dhp/swh/software.json.gz b/dhp-workflows/dhp-swh/src/test/resources/eu/dnetlib/dhp/swh/software.json.gz new file mode 100644 index 000000000..3a62c0615 Binary files /dev/null and b/dhp-workflows/dhp-swh/src/test/resources/eu/dnetlib/dhp/swh/software.json.gz differ diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml index d054ba39b..64f5f2d26 100644 --- a/dhp-workflows/pom.xml +++ b/dhp-workflows/pom.xml @@ -39,6 +39,7 @@ dhp-broker-events dhp-doiboost dhp-impact-indicators + dhp-swh