Merge branch 'beta' into irish_funder

This commit is contained in:
Claudio Atzori 2023-10-06 14:26:41 +02:00
commit 3bc44fbf1d
37 changed files with 1805 additions and 178 deletions

View File

@ -51,6 +51,7 @@ public class Constants {
public static final String RETRY_DELAY = "retryDelay"; public static final String RETRY_DELAY = "retryDelay";
public static final String CONNECT_TIMEOUT = "connectTimeOut"; public static final String CONNECT_TIMEOUT = "connectTimeOut";
public static final String READ_TIMEOUT = "readTimeOut"; public static final String READ_TIMEOUT = "readTimeOut";
public static final String REQUEST_METHOD = "requestMethod";
public static final String FROM_DATE_OVERRIDE = "fromDateOverride"; public static final String FROM_DATE_OVERRIDE = "fromDateOverride";
public static final String UNTIL_DATE_OVERRIDE = "untilDateOverride"; public static final String UNTIL_DATE_OVERRIDE = "untilDateOverride";

View File

@ -1,6 +1,9 @@
package eu.dnetlib.dhp.common.collection; package eu.dnetlib.dhp.common.collection;
import java.util.HashMap;
import java.util.Map;
/** /**
* Bundles the http connection parameters driving the client behaviour. * Bundles the http connection parameters driving the client behaviour.
*/ */
@ -13,6 +16,8 @@ public class HttpClientParams {
public static int _connectTimeOut = 10; // seconds public static int _connectTimeOut = 10; // seconds
public static int _readTimeOut = 30; // seconds public static int _readTimeOut = 30; // seconds
public static String _requestMethod = "GET";
/** /**
* Maximum number of allowed retires before failing * Maximum number of allowed retires before failing
*/ */
@ -38,17 +43,30 @@ public class HttpClientParams {
*/ */
private int readTimeOut; private int readTimeOut;
/**
* Custom http headers
*/
private Map<String, String> headers;
/**
* Request method (i.e., GET, POST etc)
*/
private String requestMethod;
public HttpClientParams() { public HttpClientParams() {
this(_maxNumberOfRetry, _requestDelay, _retryDelay, _connectTimeOut, _readTimeOut); this(_maxNumberOfRetry, _requestDelay, _retryDelay, _connectTimeOut, _readTimeOut, new HashMap<>(),
_requestMethod);
} }
public HttpClientParams(int maxNumberOfRetry, int requestDelay, int retryDelay, int connectTimeOut, public HttpClientParams(int maxNumberOfRetry, int requestDelay, int retryDelay, int connectTimeOut,
int readTimeOut) { int readTimeOut, Map<String, String> headers, String requestMethod) {
this.maxNumberOfRetry = maxNumberOfRetry; this.maxNumberOfRetry = maxNumberOfRetry;
this.requestDelay = requestDelay; this.requestDelay = requestDelay;
this.retryDelay = retryDelay; this.retryDelay = retryDelay;
this.connectTimeOut = connectTimeOut; this.connectTimeOut = connectTimeOut;
this.readTimeOut = readTimeOut; this.readTimeOut = readTimeOut;
this.headers = headers;
this.requestMethod = requestMethod;
} }
public int getMaxNumberOfRetry() { public int getMaxNumberOfRetry() {
@ -91,4 +109,19 @@ public class HttpClientParams {
this.readTimeOut = readTimeOut; this.readTimeOut = readTimeOut;
} }
public Map<String, String> getHeaders() {
return headers;
}
public void setHeaders(Map<String, String> headers) {
this.headers = headers;
}
public String getRequestMethod() {
return requestMethod;
}
public void setRequestMethod(String requestMethod) {
this.requestMethod = requestMethod;
}
} }

View File

@ -107,7 +107,14 @@ public class HttpConnector2 {
urlConn.setReadTimeout(getClientParams().getReadTimeOut() * 1000); urlConn.setReadTimeout(getClientParams().getReadTimeOut() * 1000);
urlConn.setConnectTimeout(getClientParams().getConnectTimeOut() * 1000); urlConn.setConnectTimeout(getClientParams().getConnectTimeOut() * 1000);
urlConn.addRequestProperty(HttpHeaders.USER_AGENT, userAgent); urlConn.addRequestProperty(HttpHeaders.USER_AGENT, userAgent);
urlConn.setRequestMethod(getClientParams().getRequestMethod());
// if provided, add custom headers
if (!getClientParams().getHeaders().isEmpty()) {
for (Map.Entry<String, String> headerEntry : getClientParams().getHeaders().entrySet()) {
urlConn.addRequestProperty(headerEntry.getKey(), headerEntry.getValue());
}
}
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
logHeaderFields(urlConn); logHeaderFields(urlConn);
} }

View File

@ -33,7 +33,7 @@ import scala.Tuple2;
public class GroupEntitiesSparkJob { public class GroupEntitiesSparkJob {
private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class); private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);
private static final Encoder<OafEntity> OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class); private static final Encoder<OafEntity> OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class);
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
@ -114,7 +114,7 @@ public class GroupEntitiesSparkJob {
Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC)); Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC));
// pivot on "_1" (classname of the entity) // pivot on "_1" (classname of the entity)
// created columns containing only entities of the same class // created columns containing only entities of the same class
for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) { for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
String entity = e.getKey().name(); String entity = e.getKey().name();
Class<? extends OafEntity> entityClass = e.getValue(); Class<? extends OafEntity> entityClass = e.getValue();

View File

@ -36,6 +36,19 @@ public class GraphCleaningFunctions extends CleaningFunctions {
public static final int TITLE_FILTER_RESIDUAL_LENGTH = 5; public static final int TITLE_FILTER_RESIDUAL_LENGTH = 5;
private static final String NAME_CLEANING_REGEX = "[\\r\\n\\t\\s]+"; private static final String NAME_CLEANING_REGEX = "[\\r\\n\\t\\s]+";
private static final HashSet<String> PEER_REVIEWED_TYPES = new HashSet<>();
static {
PEER_REVIEWED_TYPES.add("Article");
PEER_REVIEWED_TYPES.add("Part of book or chapter of book");
PEER_REVIEWED_TYPES.add("Book");
PEER_REVIEWED_TYPES.add("Doctoral thesis");
PEER_REVIEWED_TYPES.add("Master thesis");
PEER_REVIEWED_TYPES.add("Data Paper");
PEER_REVIEWED_TYPES.add("Thesis");
PEER_REVIEWED_TYPES.add("Bachelor thesis");
PEER_REVIEWED_TYPES.add("Conference object");
}
public static <T extends Oaf> T cleanContext(T value, String contextId, String verifyParam) { public static <T extends Oaf> T cleanContext(T value, String contextId, String verifyParam) {
if (ModelSupport.isSubClass(value, Result.class)) { if (ModelSupport.isSubClass(value, Result.class)) {
@ -493,6 +506,28 @@ public class GraphCleaningFunctions extends CleaningFunctions {
if (Objects.isNull(i.getRefereed()) || StringUtils.isBlank(i.getRefereed().getClassid())) { if (Objects.isNull(i.getRefereed()) || StringUtils.isBlank(i.getRefereed().getClassid())) {
i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS)); i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS));
} }
// from the script from Dimitris
if ("0000".equals(i.getRefereed().getClassid())) {
final boolean isFromCrossref = ModelConstants.CROSSREF_ID
.equals(i.getCollectedfrom().getKey());
final boolean hasDoi = i
.getPid()
.stream()
.anyMatch(pid -> PidType.doi.toString().equals(pid.getQualifier().getClassid()));
final boolean isPeerReviewedType = PEER_REVIEWED_TYPES
.contains(i.getInstancetype().getClassname());
final boolean noOtherLitType = r
.getInstance()
.stream()
.noneMatch(ii -> "Other literature type".equals(ii.getInstancetype().getClassname()));
if (isFromCrossref && hasDoi && isPeerReviewedType && noOtherLitType) {
i.setRefereed(qualifier("0001", "peerReviewed", ModelConstants.DNET_REVIEW_LEVELS));
} else {
i.setRefereed(qualifier("0002", "nonPeerReviewed", ModelConstants.DNET_REVIEW_LEVELS));
}
}
if (Objects.nonNull(i.getDateofacceptance())) { if (Objects.nonNull(i.getDateofacceptance())) {
Optional<String> date = cleanDateField(i.getDateofacceptance()); Optional<String> date = cleanDateField(i.getDateofacceptance());
if (date.isPresent()) { if (date.isPresent()) {

View File

@ -7,7 +7,7 @@ import java.util.regex.Pattern;
// https://researchguides.stevens.edu/c.php?g=442331&p=6577176 // https://researchguides.stevens.edu/c.php?g=442331&p=6577176
public class PmidCleaningRule { public class PmidCleaningRule {
public static final Pattern PATTERN = Pattern.compile("[1-9]{1,8}"); public static final Pattern PATTERN = Pattern.compile("0*(\\d{1,8})");
public static String clean(String pmid) { public static String clean(String pmid) {
String s = pmid String s = pmid
@ -17,7 +17,7 @@ public class PmidCleaningRule {
final Matcher m = PATTERN.matcher(s); final Matcher m = PATTERN.matcher(s);
if (m.find()) { if (m.find()) {
return m.group(); return m.group(1);
} }
return ""; return "";
} }

View File

@ -9,10 +9,16 @@ class PmidCleaningRuleTest {
@Test @Test
void testCleaning() { void testCleaning() {
// leading zeros are removed
assertEquals("1234", PmidCleaningRule.clean("01234")); assertEquals("1234", PmidCleaningRule.clean("01234"));
// tolerant to spaces in the middle
assertEquals("1234567", PmidCleaningRule.clean("0123 4567")); assertEquals("1234567", PmidCleaningRule.clean("0123 4567"));
// stop parsing at first not numerical char
assertEquals("123", PmidCleaningRule.clean("0123x4567")); assertEquals("123", PmidCleaningRule.clean("0123x4567"));
// invalid id leading to empty result
assertEquals("", PmidCleaningRule.clean("abc")); assertEquals("", PmidCleaningRule.clean("abc"));
// valid id with zeroes in the number
assertEquals("20794075", PmidCleaningRule.clean("20794075"));
} }
} }

View File

@ -32,18 +32,28 @@ import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
import eu.dnetlib.dhp.utils.DHPUtils; import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2; import scala.Tuple2;
public class CreateActionSetSparkJob implements Serializable { public class CreateActionSetSparkJob implements Serializable {
public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations"; public static final String OPENCITATIONS_CLASSID = "sysimport:crosswalk:opencitations";
public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations"; public static final String OPENCITATIONS_CLASSNAME = "Imported from OpenCitations";
// DOI-to-DOI citations
public static final String COCI = "COCI";
// PMID-to-PMID citations
public static final String POCI = "POCI";
private static final String DOI_PREFIX = "50|doi_________::"; private static final String DOI_PREFIX = "50|doi_________::";
private static final String PMID_PREFIX = "50|pmid________::"; private static final String PMID_PREFIX = "50|pmid________::";
private static final String TRUST = "0.91"; private static final String TRUST = "0.91";
private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class); private static final Logger log = LoggerFactory.getLogger(CreateActionSetSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws IOException, ParseException { public static void main(final String[] args) throws IOException, ParseException {
@ -67,7 +77,7 @@ public class CreateActionSetSparkJob implements Serializable {
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("inputPath"); final String inputPath = parser.get("inputPath");
log.info("inputPath {}", inputPath.toString()); log.info("inputPath {}", inputPath);
final String outputPath = parser.get("outputPath"); final String outputPath = parser.get("outputPath");
log.info("outputPath {}", outputPath); log.info("outputPath {}", outputPath);
@ -81,19 +91,16 @@ public class CreateActionSetSparkJob implements Serializable {
runWithSparkSession( runWithSparkSession(
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> extractContent(spark, inputPath, outputPath, shouldDuplicateRels));
extractContent(spark, inputPath, outputPath, shouldDuplicateRels);
});
} }
private static void extractContent(SparkSession spark, String inputPath, String outputPath, private static void extractContent(SparkSession spark, String inputPath, String outputPath,
boolean shouldDuplicateRels) { boolean shouldDuplicateRels) {
getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, "COCI") getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, COCI)
.union(getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, "POCI")) .union(getTextTextJavaPairRDD(spark, inputPath, shouldDuplicateRels, POCI))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
} }
private static JavaPairRDD<Text, Text> getTextTextJavaPairRDD(SparkSession spark, String inputPath, private static JavaPairRDD<Text, Text> getTextTextJavaPairRDD(SparkSession spark, String inputPath,
@ -109,7 +116,7 @@ public class CreateActionSetSparkJob implements Serializable {
value, shouldDuplicateRels, prefix) value, shouldDuplicateRels, prefix)
.iterator(), .iterator(),
Encoders.bean(Relation.class)) Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) value -> value != null) .filter((FilterFunction<Relation>) Objects::nonNull)
.toJavaRDD() .toJavaRDD()
.map(p -> new AtomicAction(p.getClass(), p)) .map(p -> new AtomicAction(p.getClass(), p))
.mapToPair( .mapToPair(
@ -123,20 +130,28 @@ public class CreateActionSetSparkJob implements Serializable {
String prefix; String prefix;
String citing; String citing;
String cited; String cited;
if (p.equals("COCI")) {
prefix = DOI_PREFIX;
citing = prefix
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCiting()));
cited = prefix
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", value.getCited()));
} else {
prefix = PMID_PREFIX;
citing = prefix
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("pmid", value.getCiting()));
cited = prefix
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("pmid", value.getCited()));
switch (p) {
case COCI:
prefix = DOI_PREFIX;
citing = prefix
+ IdentifierFactory
.md5(CleaningFunctions.normalizePidValue(PidType.doi.toString(), value.getCiting()));
cited = prefix
+ IdentifierFactory
.md5(CleaningFunctions.normalizePidValue(PidType.doi.toString(), value.getCited()));
break;
case POCI:
prefix = PMID_PREFIX;
citing = prefix
+ IdentifierFactory
.md5(CleaningFunctions.normalizePidValue(PidType.pmid.toString(), value.getCiting()));
cited = prefix
+ IdentifierFactory
.md5(CleaningFunctions.normalizePidValue(PidType.pmid.toString(), value.getCited()));
break;
default:
throw new IllegalStateException("Invalid prefix: " + p);
} }
if (!citing.equals(cited)) { if (!citing.equals(cited)) {
@ -162,7 +177,7 @@ public class CreateActionSetSparkJob implements Serializable {
public static Relation getRelation( public static Relation getRelation(
String source, String source,
String target, String target,
String relclass) { String relClass) {
return OafMapperUtils return OafMapperUtils
.getRelation( .getRelation(
@ -170,7 +185,7 @@ public class CreateActionSetSparkJob implements Serializable {
target, target,
ModelConstants.RESULT_RESULT, ModelConstants.RESULT_RESULT,
ModelConstants.CITATION, ModelConstants.CITATION,
relclass, relClass,
Arrays Arrays
.asList( .asList(
OafMapperUtils.keyValue(ModelConstants.OPENOCITATIONS_ID, ModelConstants.OPENOCITATIONS_NAME)), OafMapperUtils.keyValue(ModelConstants.OPENOCITATIONS_ID, ModelConstants.OPENOCITATIONS_NAME)),
@ -183,6 +198,6 @@ public class CreateActionSetSparkJob implements Serializable {
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
TRUST), TRUST),
null); null);
} }
} }

View File

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.actionmanager.opencitations;
import java.io.*; import java.io.*;
import java.io.Serializable; import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects; import java.util.Objects;
import java.util.zip.GZIPOutputStream; import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
@ -37,7 +38,7 @@ public class GetOpenCitationsRefs implements Serializable {
parser.parseArgument(args); parser.parseArgument(args);
final String[] inputFile = parser.get("inputFile").split(";"); final String[] inputFile = parser.get("inputFile").split(";");
log.info("inputFile {}", inputFile.toString()); log.info("inputFile {}", Arrays.asList(inputFile));
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
log.info("workingPath {}", workingPath); log.info("workingPath {}", workingPath);

View File

@ -7,6 +7,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.Arrays;
import java.util.Optional; import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
@ -42,7 +43,7 @@ public class ReadCOCI implements Serializable {
log.info("outputPath: {}", outputPath); log.info("outputPath: {}", outputPath);
final String[] inputFile = parser.get("inputFile").split(";"); final String[] inputFile = parser.get("inputFile").split(";");
log.info("inputFile {}", inputFile.toString()); log.info("inputFile {}", Arrays.asList(inputFile));
Boolean isSparkSessionManaged = isSparkSessionManaged(parser); Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
@ -74,10 +75,10 @@ public class ReadCOCI implements Serializable {
private static void doRead(SparkSession spark, String workingPath, String[] inputFiles, private static void doRead(SparkSession spark, String workingPath, String[] inputFiles,
String outputPath, String outputPath,
String delimiter, String format) throws IOException { String delimiter, String format) {
for (String inputFile : inputFiles) { for (String inputFile : inputFiles) {
String p_string = workingPath + "/" + inputFile + ".gz"; String pString = workingPath + "/" + inputFile + ".gz";
Dataset<Row> cociData = spark Dataset<Row> cociData = spark
.read() .read()
@ -86,7 +87,7 @@ public class ReadCOCI implements Serializable {
.option("inferSchema", "true") .option("inferSchema", "true")
.option("header", "true") .option("header", "true")
.option("quotes", "\"") .option("quotes", "\"")
.load(p_string) .load(pString)
.repartition(100); .repartition(100);
cociData.map((MapFunction<Row, COCI>) row -> { cociData.map((MapFunction<Row, COCI>) row -> {

View File

@ -16,15 +16,11 @@
"paramLongName": "isSparkSessionManaged", "paramLongName": "isSparkSessionManaged",
"paramDescription": "the hdfs name node", "paramDescription": "the hdfs name node",
"paramRequired": false "paramRequired": false
}, { },
"paramName": "sdr", {
"paramLongName": "shouldDuplicateRels", "paramName": "sdr",
"paramDescription": "the hdfs name node", "paramLongName": "shouldDuplicateRels",
"paramRequired": false "paramDescription": "activates/deactivates the construction of bidirectional relations Cites/IsCitedBy",
},{ "paramRequired": false
"paramName": "p", }
"paramLongName": "prefix",
"paramDescription": "the hdfs name node",
"paramRequired": true
}
] ]

View File

@ -34,6 +34,7 @@
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="download"> <action name="download">
<shell xmlns="uri:oozie:shell-action:0.2"> <shell xmlns="uri:oozie:shell-action:0.2">
<job-tracker>${jobTracker}</job-tracker> <job-tracker>${jobTracker}</job-tracker>
@ -54,6 +55,7 @@
<ok to="extract"/> <ok to="extract"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="extract"> <action name="extract">
<java> <java>
<main-class>eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs</main-class> <main-class>eu.dnetlib.dhp.actionmanager.opencitations.GetOpenCitationsRefs</main-class>
@ -112,7 +114,6 @@
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${workingPath}</arg> <arg>--inputPath</arg><arg>${workingPath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg> <arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--prefix</arg><arg>${prefix}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -67,60 +67,60 @@ public class SparkPropagateRelation extends AbstractSparkAction {
log.info("graphOutputPath: '{}'", graphOutputPath); log.info("graphOutputPath: '{}'", graphOutputPath);
Dataset<Relation> mergeRels = spark Dataset<Relation> mergeRels = spark
.read() .read()
.load(DedupUtility.createMergeRelPath(workingPath, "*", "*")) .load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
.as(REL_BEAN_ENC); .as(REL_BEAN_ENC);
// <mergedObjectID, dedupID> // <mergedObjectID, dedupID>
Dataset<Row> idsToMerge = mergeRels Dataset<Row> idsToMerge = mergeRels
.where(col("relClass").equalTo(ModelConstants.MERGES)) .where(col("relClass").equalTo(ModelConstants.MERGES))
.select(col("source").as("dedupID"), col("target").as("mergedObjectID")) .select(col("source").as("dedupID"), col("target").as("mergedObjectID"))
.distinct(); .distinct();
Dataset<Row> allRels = spark Dataset<Row> allRels = spark
.read() .read()
.schema(REL_BEAN_ENC.schema()) .schema(REL_BEAN_ENC.schema())
.json(graphBasePath + "/relation"); .json(graphBasePath + "/relation");
Dataset<Relation> dedupedRels = allRels Dataset<Relation> dedupedRels = allRels
.joinWith(idsToMerge, allRels.col("source").equalTo(idsToMerge.col("mergedObjectID")), "left_outer") .joinWith(idsToMerge, allRels.col("source").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
.joinWith(idsToMerge, col("_1.target").equalTo(idsToMerge.col("mergedObjectID")), "left_outer") .joinWith(idsToMerge, col("_1.target").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
.select("_1._1", "_1._2.dedupID", "_2.dedupID") .select("_1._1", "_1._2.dedupID", "_2.dedupID")
.as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING())) .as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING()))
.map((MapFunction<Tuple3<Relation, String, String>, Relation>) t -> { .map((MapFunction<Tuple3<Relation, String, String>, Relation>) t -> {
Relation rel = t._1(); Relation rel = t._1();
String newSource = t._2(); String newSource = t._2();
String newTarget = t._3(); String newTarget = t._3();
if (rel.getDataInfo() == null) { if (rel.getDataInfo() == null) {
rel.setDataInfo(new DataInfo()); rel.setDataInfo(new DataInfo());
} }
if (newSource != null || newTarget != null) { if (newSource != null || newTarget != null) {
rel.getDataInfo().setDeletedbyinference(false); rel.getDataInfo().setDeletedbyinference(false);
if (newSource != null) if (newSource != null)
rel.setSource(newSource); rel.setSource(newSource);
if (newTarget != null) if (newTarget != null)
rel.setTarget(newTarget); rel.setTarget(newTarget);
} }
return rel; return rel;
}, REL_BEAN_ENC); }, REL_BEAN_ENC);
// ids of records that are both not deletedbyinference and not invisible // ids of records that are both not deletedbyinference and not invisible
Dataset<Row> ids = validIds(spark, graphBasePath); Dataset<Row> ids = validIds(spark, graphBasePath);
// filter relations that point to valid records, can force them to be visible // filter relations that point to valid records, can force them to be visible
Dataset<Relation> cleanedRels = dedupedRels Dataset<Relation> cleanedRels = dedupedRels
.join(ids, col("source").equalTo(ids.col("id")), "leftsemi") .join(ids, col("source").equalTo(ids.col("id")), "leftsemi")
.join(ids, col("target").equalTo(ids.col("id")), "leftsemi") .join(ids, col("target").equalTo(ids.col("id")), "leftsemi")
.as(REL_BEAN_ENC) .as(REL_BEAN_ENC)
.map((MapFunction<Relation, Relation>) r -> { .map((MapFunction<Relation, Relation>) r -> {
r.getDataInfo().setInvisible(false); r.getDataInfo().setInvisible(false);
return r; return r;
}, REL_KRYO_ENC); }, REL_KRYO_ENC);
Dataset<Relation> distinctRels = cleanedRels Dataset<Relation> distinctRels = cleanedRels
.groupByKey( .groupByKey(

View File

@ -1,14 +1,14 @@
package eu.dnetlib.dhp.oa.graph.group; package eu.dnetlib.dhp.oa.graph.group;
import com.fasterxml.jackson.databind.DeserializationFeature; import static org.junit.jupiter.api.Assertions.assertEquals;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.HdfsSupport; import java.io.IOException;
import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob; import java.net.URISyntaxException;
import eu.dnetlib.dhp.schema.common.ModelSupport; import java.nio.file.Files;
import eu.dnetlib.dhp.schema.oaf.OafEntity; import java.nio.file.Path;
import eu.dnetlib.dhp.schema.oaf.Result; import java.nio.file.Paths;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FilterFunction;
@ -18,108 +18,108 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*; import org.junit.jupiter.api.*;
import java.io.IOException; import com.fasterxml.jackson.databind.DeserializationFeature;
import java.net.URISyntaxException; import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import static org.junit.jupiter.api.Assertions.assertEquals; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.DHPUtils;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class GroupEntitiesSparkJobTest { public class GroupEntitiesSparkJobTest {
private static SparkSession spark; private static SparkSession spark;
private static ObjectMapper mapper = new ObjectMapper() private static ObjectMapper mapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
private static Path workingDir; private static Path workingDir;
private Path dataInputPath; private Path dataInputPath;
private Path checkpointPath; private Path checkpointPath;
private Path outputPath; private Path outputPath;
@BeforeAll @BeforeAll
public static void beforeAll() throws IOException { public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName()); workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName());
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName()); conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName());
conf.setMaster("local"); conf.setMaster("local");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses()); conf.registerKryoClasses(ModelSupport.getOafModelClasses());
spark = SparkSession.builder().config(conf).getOrCreate(); spark = SparkSession.builder().config(conf).getOrCreate();
} }
@BeforeEach @BeforeEach
public void beforeEach() throws IOException, URISyntaxException { public void beforeEach() throws IOException, URISyntaxException {
dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI()); dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI());
checkpointPath = workingDir.resolve("grouped_entity"); checkpointPath = workingDir.resolve("grouped_entity");
outputPath = workingDir.resolve("dispatched_entity"); outputPath = workingDir.resolve("dispatched_entity");
} }
@AfterAll @AfterAll
public static void afterAll() throws IOException { public static void afterAll() throws IOException {
spark.stop(); spark.stop();
FileUtils.deleteDirectory(workingDir.toFile()); FileUtils.deleteDirectory(workingDir.toFile());
} }
@Test @Test
@Order(1) @Order(1)
void testGroupEntities() throws Exception { void testGroupEntities() throws Exception {
GroupEntitiesSparkJob.main(new String[]{ GroupEntitiesSparkJob.main(new String[] {
"-isSparkSessionManaged", "-isSparkSessionManaged",
Boolean.FALSE.toString(), Boolean.FALSE.toString(),
"-graphInputPath", "-graphInputPath",
dataInputPath.toString(), dataInputPath.toString(),
"-checkpointPath", "-checkpointPath",
checkpointPath.toString(), checkpointPath.toString(),
"-outputPath", "-outputPath",
outputPath.toString(), outputPath.toString(),
"-filterInvisible", "-filterInvisible",
Boolean.FALSE.toString() Boolean.FALSE.toString()
}); });
Dataset<OafEntity> checkpointTable = spark Dataset<OafEntity> checkpointTable = spark
.read() .read()
.load(checkpointPath.toString()) .load(checkpointPath.toString())
.selectExpr("COALESCE(*)") .selectExpr("COALESCE(*)")
.as(Encoders.kryo(OafEntity.class)); .as(Encoders.kryo(OafEntity.class));
assertEquals(
1,
checkpointTable
.filter(
(FilterFunction<OafEntity>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9"
.equals(r.getId()) &&
r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo")))
.count());
assertEquals( Dataset<Result> output = spark
1, .read()
checkpointTable .textFile(
.filter( DHPUtils
(FilterFunction<OafEntity>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9" .toSeq(
.equals(r.getId()) && HdfsSupport
r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo"))) .listFiles(outputPath.toString(), spark.sparkContext().hadoopConfiguration())))
.count()); .map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));
assertEquals(3, output.count());
Dataset<Result> output = spark assertEquals(
.read() 2,
.textFile( output
DHPUtils .map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
.toSeq( .filter((FilterFunction<String>) s -> s.equals("publication"))
HdfsSupport .count());
.listFiles(outputPath.toString(), spark.sparkContext().hadoopConfiguration()))) assertEquals(
.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class)); 1,
output
assertEquals(3, output.count()); .map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
assertEquals( .filter((FilterFunction<String>) s -> s.equals("dataset"))
2, .count());
output }
.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
.filter((FilterFunction<String>) s -> s.equals("publication"))
.count());
assertEquals(
1,
output
.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
.filter((FilterFunction<String>) s -> s.equals("dataset"))
.count());
}
} }

View File

@ -0,0 +1,110 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-workflows</artifactId>
<version>1.2.5-SNAPSHOT</version>
</parent>
<artifactId>dhp-swh</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>net.sf.saxon</groupId>
<artifactId>Saxon-HE</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
</dependency>
<dependency>
<groupId>xml-apis</groupId>
<artifactId>xml-apis</artifactId>
</dependency>
<dependency>
<groupId>jaxen</groupId>
<artifactId>jaxen</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>dnet-actionmanager-api</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>dnet-actionmanager-common</artifactId>
<exclusions>
<exclusion>
<groupId>eu.dnetlib</groupId>
<artifactId>dnet-openaireplus-mapping-utils</artifactId>
</exclusion>
<exclusion>
<groupId>saxonica</groupId>
<artifactId>saxon</artifactId>
</exclusion>
<exclusion>
<groupId>saxonica</groupId>
<artifactId>saxon-dom</artifactId>
</exclusion>
<exclusion>
<groupId>jgrapht</groupId>
<artifactId>jgrapht</artifactId>
</exclusion>
<exclusion>
<groupId>net.sf.ehcache</groupId>
<artifactId>ehcache</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.*</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>apache</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>org.datanucleus</groupId>
<artifactId>datanucleus-core</artifactId>
<version>3.2.10</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,176 @@
package eu.dnetlib.dhp.swh;
import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration;
import java.io.IOException;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions;
import eu.dnetlib.dhp.swh.models.LastVisitData;
import eu.dnetlib.dhp.swh.utils.SWHConnection;
import eu.dnetlib.dhp.swh.utils.SWHConstants;
import eu.dnetlib.dhp.swh.utils.SWHUtils;
/**
* Sends archive requests to the SWH API for those software repository URLs that are missing from them
*
* @author Serafeim Chatzopoulos
*/
public class ArchiveRepositoryURLs {
private static final Logger log = LoggerFactory.getLogger(ArchiveRepositoryURLs.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SWHConnection swhConnection = null;
public static void main(final String[] args) throws IOException, ParseException {
final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
IOUtils
.toString(
CollectLastVisitRepositoryData.class
.getResourceAsStream(
"/eu/dnetlib/dhp/swh/input_archive_repository_urls.json")));
argumentParser.parseArgument(args);
final String hdfsuri = argumentParser.get("namenode");
log.info("hdfsURI: {}", hdfsuri);
final String inputPath = argumentParser.get("lastVisitsPath");
log.info("inputPath: {}", inputPath);
final String outputPath = argumentParser.get("archiveRequestsPath");
log.info("outputPath: {}", outputPath);
final Integer archiveThresholdInDays = Integer.parseInt(argumentParser.get("archiveThresholdInDays"));
log.info("archiveThresholdInDays: {}", archiveThresholdInDays);
final String apiAccessToken = argumentParser.get("apiAccessToken");
log.info("apiAccessToken: {}", apiAccessToken);
final HttpClientParams clientParams = SWHUtils.getClientParams(argumentParser);
swhConnection = new SWHConnection(clientParams, apiAccessToken);
final FileSystem fs = FileSystem.get(getHadoopConfiguration(hdfsuri));
archive(fs, inputPath, outputPath, archiveThresholdInDays);
}
private static void archive(FileSystem fs, String inputPath, String outputPath, Integer archiveThresholdInDays)
throws IOException {
SequenceFile.Reader fr = SWHUtils.getSequenceFileReader(fs, inputPath);
SequenceFile.Writer fw = SWHUtils.getSequenceFileWriter(fs, outputPath);
// Create key and value objects to hold data
Text repoUrl = new Text();
Text lastVisitData = new Text();
// Read key-value pairs from the SequenceFile and handle appropriately
while (fr.next(repoUrl, lastVisitData)) {
String response = null;
try {
response = handleRecord(repoUrl.toString(), lastVisitData.toString(), archiveThresholdInDays);
} catch (java.text.ParseException e) {
log.error("Could not handle record with repo Url: {}", repoUrl.toString());
throw new RuntimeException(e);
}
// response is equal to null when no need for request
if (response != null) {
SWHUtils.appendToSequenceFile(fw, repoUrl.toString(), response);
}
}
// Close readers
fw.close();
fr.close();
}
public static String handleRecord(String repoUrl, String lastVisitData, Integer archiveThresholdInDays)
throws IOException, java.text.ParseException {
log.info("{ Key: {}, Value: {} }", repoUrl, lastVisitData);
LastVisitData lastVisit = OBJECT_MAPPER.readValue(lastVisitData, LastVisitData.class);
// a previous attempt for archival has been made, and repository URL was not found
// avoid performing the same archive request again
if (lastVisit.getStatus() != null &&
lastVisit.getStatus().equals(SWHConstants.VISIT_STATUS_NOT_FOUND)) {
log.info("Avoid request -- previous archive request returned NOT_FOUND");
return null;
}
// if we have last visit data
if (lastVisit.getSnapshot() != null) {
String cleanDate = GraphCleaningFunctions.cleanDate(lastVisit.getDate());
// and the last visit date can be parsed
if (cleanDate != null) {
SimpleDateFormat formatter = new SimpleDateFormat(ModelSupport.DATE_FORMAT);
Date lastVisitDate = formatter.parse(cleanDate);
// OR last visit time < (now() - archiveThresholdInDays)
long diffInMillies = Math.abs((new Date()).getTime() - lastVisitDate.getTime());
long diffInDays = TimeUnit.DAYS.convert(diffInMillies, TimeUnit.MILLISECONDS);
log.info("Date diff from now (in days): {}", diffInDays);
// do not perform a request, if the last visit date is no older than $archiveThresholdInDays
if (archiveThresholdInDays >= diffInDays) {
log.info("Avoid request -- no older than {} days", archiveThresholdInDays);
return null;
}
}
}
// ELSE perform an archive request
log.info("Perform archive request for: {}", repoUrl);
// if last visit data are available, re-use version control type,
// else use the default one (i.e., git)
String visitType = Optional
.ofNullable(lastVisit.getType())
.orElse(SWHConstants.DEFAULT_VISIT_TYPE);
URL url = new URL(String.format(SWHConstants.SWH_ARCHIVE_URL, visitType, repoUrl.trim()));
log.info("Sending archive request: {}", url);
String response;
try {
response = swhConnection.call(url.toString());
} catch (CollectorException e) {
log.error("Error in request: {}", url);
response = "{}";
}
return response;
}
}

View File

@ -0,0 +1,119 @@
package eu.dnetlib.dhp.swh;
import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration;
import java.io.BufferedReader;
import java.io.IOException;
import java.net.URL;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
import eu.dnetlib.dhp.swh.utils.SWHConnection;
import eu.dnetlib.dhp.swh.utils.SWHConstants;
import eu.dnetlib.dhp.swh.utils.SWHUtils;
/**
* Given a file with software repository URLs, this class
* collects last visit data from the Software Heritage API.
*
* @author Serafeim Chatzopoulos
*/
public class CollectLastVisitRepositoryData {
private static final Logger log = LoggerFactory.getLogger(CollectLastVisitRepositoryData.class);
private static SWHConnection swhConnection = null;
public static void main(final String[] args)
throws IOException, ParseException {
final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
IOUtils
.toString(
CollectLastVisitRepositoryData.class
.getResourceAsStream(
"/eu/dnetlib/dhp/swh/input_collect_last_visit_repository_data.json")));
argumentParser.parseArgument(args);
log.info("Java Xmx: {}m", Runtime.getRuntime().maxMemory() / (1024 * 1024));
final String hdfsuri = argumentParser.get("namenode");
log.info("hdfsURI: {}", hdfsuri);
final String inputPath = argumentParser.get("softwareCodeRepositoryURLs");
log.info("inputPath: {}", inputPath);
final String outputPath = argumentParser.get("lastVisitsPath");
log.info("outputPath: {}", outputPath);
final String apiAccessToken = argumentParser.get("apiAccessToken");
log.info("apiAccessToken: {}", apiAccessToken);
final HttpClientParams clientParams = SWHUtils.getClientParams(argumentParser);
swhConnection = new SWHConnection(clientParams, apiAccessToken);
final FileSystem fs = FileSystem.get(getHadoopConfiguration(hdfsuri));
collect(fs, inputPath, outputPath);
fs.close();
}
private static void collect(FileSystem fs, String inputPath, String outputPath)
throws IOException {
SequenceFile.Writer fw = SWHUtils.getSequenceFileWriter(fs, outputPath);
// Specify the HDFS directory path you want to read
Path directoryPath = new Path(inputPath);
// List all files in the directory
FileStatus[] partStatuses = fs.listStatus(directoryPath);
for (FileStatus partStatus : partStatuses) {
// Check if it's a file (not a directory)
if (partStatus.isFile()) {
handleFile(fs, partStatus.getPath(), fw);
}
}
fw.close();
}
private static void handleFile(FileSystem fs, Path partInputPath, SequenceFile.Writer fw)
throws IOException {
BufferedReader br = SWHUtils.getFileReader(fs, partInputPath);
String repoUrl;
while ((repoUrl = br.readLine()) != null) {
URL url = new URL(String.format(SWHConstants.SWH_LATEST_VISIT_URL, repoUrl.trim()));
String response;
try {
response = swhConnection.call(url.toString());
} catch (CollectorException e) {
log.error("Error in request: {}", url);
response = "{}";
}
SWHUtils.appendToSequenceFile(fw, repoUrl, response);
}
br.close();
}
}

View File

@ -0,0 +1,93 @@
package eu.dnetlib.dhp.swh;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Result;
/**
* Collects unique software repository URLs in the Graph using Hive
*
* @author Serafeim Chatzopoulos
*/
public class CollectSoftwareRepositoryURLs {
private static final Logger log = LoggerFactory.getLogger(CollectSoftwareRepositoryURLs.class);
public static <I extends Result> void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
CollectSoftwareRepositoryURLs.class
.getResourceAsStream("/eu/dnetlib/dhp/swh/input_collect_software_repository_urls.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String hiveDbName = parser.get("hiveDbName");
log.info("hiveDbName: {}", hiveDbName);
final String outputPath = parser.get("softwareCodeRepositoryURLs");
log.info("softwareCodeRepositoryURLs: {}", outputPath);
final String hiveMetastoreUris = parser.get("hiveMetastoreUris");
log.info("hiveMetastoreUris: {}", hiveMetastoreUris);
final Integer softwareLimit = Integer.parseInt(parser.get("softwareLimit"));
log.info("softwareLimit: {}", softwareLimit);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", hiveMetastoreUris);
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
doRun(spark, hiveDbName, softwareLimit, outputPath);
});
}
private static <I extends Result> void doRun(SparkSession spark, String hiveDbName, Integer limit,
String outputPath) {
String queryTemplate = "SELECT distinct coderepositoryurl.value " +
"FROM %s.software " +
"WHERE coderepositoryurl.value IS NOT NULL " +
"AND datainfo.deletedbyinference = FALSE " +
"AND datainfo.invisible = FALSE ";
if (limit != null) {
queryTemplate += String.format("LIMIT %s", limit);
}
String query = String.format(queryTemplate, hiveDbName);
log.info("Hive query to fetch software code URLs: {}", query);
Dataset<Row> df = spark.sql(query);
// write distinct repository URLs
df
.write()
.mode(SaveMode.Overwrite)
.csv(outputPath);
}
}

View File

@ -0,0 +1,185 @@
package eu.dnetlib.dhp.swh;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static org.apache.spark.sql.functions.col;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.swh.models.LastVisitData;
import eu.dnetlib.dhp.swh.utils.SWHConstants;
import scala.Tuple2;
/**
* Creates action sets for Software Heritage data
*
* @author Serafeim Chatzopoulos
*/
public class PrepareSWHActionsets {
private static final Logger log = LoggerFactory.getLogger(PrepareSWHActionsets.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static <I extends Result> void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareSWHActionsets.class
.getResourceAsStream(
"/eu/dnetlib/dhp/swh/input_prepare_swh_actionsets.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("lastVisitsPath");
log.info("inputPath: {}", inputPath);
final String softwareInputPath = parser.get("softwareInputPath");
log.info("softwareInputPath: {}", softwareInputPath);
final String outputPath = parser.get("actionsetsPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
JavaPairRDD<Text, Text> softwareRDD = prepareActionsets(spark, inputPath, softwareInputPath);
softwareRDD
.saveAsHadoopFile(
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
});
}
private static Dataset<Row> loadSWHData(SparkSession spark, String inputPath) {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
// read from file and transform to <origin, snapshotId> tuples
// Note: snapshot id is the SWH id for us
JavaRDD<Row> swhRDD = sc
.sequenceFile(inputPath, Text.class, Text.class)
.map(t -> t._2().toString())
.map(t -> OBJECT_MAPPER.readValue(t, LastVisitData.class))
.filter(t -> t.getOrigin() != null && t.getSnapshot() != null) // response from SWH API is empty if repo URL
// was not found
.map(item -> RowFactory.create(item.getOrigin(), item.getSnapshot()));
// convert RDD to 2-column DF
List<StructField> fields = Arrays
.asList(
DataTypes.createStructField("repoUrl", DataTypes.StringType, true),
DataTypes.createStructField("swhId", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
return spark.createDataFrame(swhRDD, schema);
}
private static Dataset<Row> loadGraphSoftwareData(SparkSession spark, String softwareInputPath) {
return spark
.read()
.textFile(softwareInputPath)
.map(
(MapFunction<String, Software>) t -> OBJECT_MAPPER.readValue(t, Software.class),
Encoders.bean(Software.class))
.filter(t -> t.getCodeRepositoryUrl() != null)
.select(col("id"), col("codeRepositoryUrl.value").as("repoUrl"));
}
private static <I extends Software> JavaPairRDD<Text, Text> prepareActionsets(SparkSession spark, String inputPath,
String softwareInputPath) {
Dataset<Row> swhDF = loadSWHData(spark, inputPath);
// swhDF.show(false);
Dataset<Row> graphSoftwareDF = loadGraphSoftwareData(spark, softwareInputPath);
// graphSoftwareDF.show(5);
Dataset<Row> joinedDF = graphSoftwareDF.join(swhDF, "repoUrl").select("id", "swhid");
// joinedDF.show(false);
return joinedDF.map((MapFunction<Row, Software>) row -> {
Software s = new Software();
// set openaire id
s.setId(row.getString(row.fieldIndex("id")));
// set swh id
Qualifier qualifier = OafMapperUtils
.qualifier(
SWHConstants.SWHID,
SWHConstants.SWHID_CLASSNAME,
ModelConstants.DNET_PID_TYPES,
ModelConstants.DNET_PID_TYPES);
DataInfo dataInfo = OafMapperUtils
.dataInfo(
false,
null,
false,
false,
ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER,
"");
s
.setPid(
Arrays
.asList(
OafMapperUtils
.structuredProperty(
String.format("swh:1:snp:%s", row.getString(row.fieldIndex("swhid"))),
qualifier,
dataInfo)));
// add SWH in the `collectedFrom` field
KeyValue kv = new KeyValue();
kv.setKey(SWHConstants.SWH_ID);
kv.setValue(SWHConstants.SWH_NAME);
s.setCollectedfrom(Arrays.asList(kv));
return s;
}, Encoders.bean(Software.class))
.toJavaRDD()
.map(p -> new AtomicAction(Software.class, p))
.mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))));
}
}

View File

@ -0,0 +1,71 @@
package eu.dnetlib.dhp.swh.models;
import java.io.Serializable;
import com.cloudera.com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@JsonIgnoreProperties(ignoreUnknown = true)
public class LastVisitData implements Serializable {
private String origin;
private String type;
private String date;
@JsonProperty("snapshot")
private String snapshotId;
private String status;
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getSnapshot() {
return snapshotId;
}
public void setSnapshot(String snapshotId) {
this.snapshotId = snapshotId;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getOrigin() {
return origin;
}
public void setOrigin(String origin) {
this.origin = origin;
}
@Override
public String toString() {
return "LastVisitData{" +
"origin='" + origin + '\'' +
", type='" + type + '\'' +
", date='" + date + '\'' +
", snapshotId='" + snapshotId + '\'' +
", status='" + status + '\'' +
'}';
}
}

View File

@ -0,0 +1,40 @@
package eu.dnetlib.dhp.swh.utils;
import java.util.HashMap;
import java.util.Map;
import org.apache.http.HttpHeaders;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
import eu.dnetlib.dhp.common.collection.HttpConnector2;
public class SWHConnection {
HttpConnector2 conn;
public SWHConnection(HttpClientParams clientParams, String accessToken) {
// set custom headers
Map<String, String> headers = new HashMap<String, String>() {
{
put(HttpHeaders.ACCEPT, "application/json");
if (accessToken != null) {
put(HttpHeaders.AUTHORIZATION, String.format("Bearer %s", accessToken));
}
}
};
clientParams.setHeaders(headers);
// create http connector
conn = new HttpConnector2(clientParams);
}
public String call(String url) throws CollectorException {
return conn.getInputSource(url);
}
}

View File

@ -0,0 +1,21 @@
package eu.dnetlib.dhp.swh.utils;
public class SWHConstants {
public static final String SWH_LATEST_VISIT_URL = "https://archive.softwareheritage.org/api/1/origin/%s/visit/latest/";
public static final String SWH_ARCHIVE_URL = "https://archive.softwareheritage.org/api/1/origin/save/%s/url/%s/";
public static final String DEFAULT_VISIT_TYPE = "git";
public static final String VISIT_STATUS_NOT_FOUND = "not_found";
public static final String SWHID = "swhid";
public static final String SWHID_CLASSNAME = "Software Heritage Identifier";
public static final String SWH_ID = "10|openaire____::dbfd07503aaa1ed31beed7dec942f3f4";
public static final String SWH_NAME = "Software Heritage";
}

View File

@ -0,0 +1,95 @@
package eu.dnetlib.dhp.swh.utils;
import static eu.dnetlib.dhp.common.Constants.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
public class SWHUtils {
private static final Logger log = LoggerFactory.getLogger(SWHUtils.class);
public static HttpClientParams getClientParams(ArgumentApplicationParser argumentParser) {
final HttpClientParams clientParams = new HttpClientParams();
clientParams
.setMaxNumberOfRetry(
Optional
.ofNullable(argumentParser.get(MAX_NUMBER_OF_RETRY))
.map(Integer::parseInt)
.orElse(HttpClientParams._maxNumberOfRetry));
log.info("maxNumberOfRetry is {}", clientParams.getMaxNumberOfRetry());
clientParams
.setRequestDelay(
Optional
.ofNullable(argumentParser.get(REQUEST_DELAY))
.map(Integer::parseInt)
.orElse(HttpClientParams._requestDelay));
log.info("requestDelay is {}", clientParams.getRequestDelay());
clientParams
.setRetryDelay(
Optional
.ofNullable(argumentParser.get(RETRY_DELAY))
.map(Integer::parseInt)
.orElse(HttpClientParams._retryDelay));
log.info("retryDelay is {}", clientParams.getRetryDelay());
clientParams
.setRequestMethod(
Optional
.ofNullable(argumentParser.get(REQUEST_METHOD))
.orElse(HttpClientParams._requestMethod));
log.info("requestMethod is {}", clientParams.getRequestMethod());
return clientParams;
}
public static BufferedReader getFileReader(FileSystem fs, Path inputPath) throws IOException {
FSDataInputStream inputStream = fs.open(inputPath);
return new BufferedReader(
new InputStreamReader(inputStream, StandardCharsets.UTF_8));
}
public static SequenceFile.Writer getSequenceFileWriter(FileSystem fs, String outputPath) throws IOException {
return SequenceFile
.createWriter(
fs.getConf(),
SequenceFile.Writer.file(new Path(outputPath)),
SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(Text.class));
}
public static SequenceFile.Reader getSequenceFileReader(FileSystem fs, String inputPath) throws IOException {
Path filePath = new Path(inputPath);
SequenceFile.Reader.Option fileOption = SequenceFile.Reader.file(filePath);
return new SequenceFile.Reader(fs.getConf(), fileOption);
}
public static void appendToSequenceFile(SequenceFile.Writer fw, String keyStr, String valueStr) throws IOException {
Text key = new Text();
key.set(keyStr);
Text value = new Text();
value.set(valueStr);
fw.append(key, value);
}
}

View File

@ -0,0 +1,56 @@
[
{
"paramName": "n",
"paramLongName": "namenode",
"paramDescription": "the Name Node URI",
"paramRequired": true
},
{
"paramName": "lv",
"paramLongName": "lastVisitsPath",
"paramDescription": "the URL where to store last visits data",
"paramRequired": true
},
{
"paramName": "arp",
"paramLongName": "archiveRequestsPath",
"paramDescription": "the URL where to store the responses of the archive requests",
"paramRequired": true
},
{
"paramName": "mnr",
"paramLongName": "maxNumberOfRetry",
"paramDescription": "the maximum number of admitted connection retries",
"paramRequired": false
},
{
"paramName": "rqd",
"paramLongName": "requestDelay",
"paramDescription": "the delay (ms) between requests",
"paramRequired": false
},
{
"paramName": "rtd",
"paramLongName": "retryDelay",
"paramDescription": "the delay (ms) between retries",
"paramRequired": false
},
{
"paramName": "rm",
"paramLongName": "requestMethod",
"paramDescription": "the method of the requests to perform",
"paramRequired": false
},
{
"paramName": "atid",
"paramLongName": "archiveThresholdInDays",
"paramDescription": "the thershold (in days) required to issue an archive request",
"paramRequired": false
},
{
"paramName": "aat",
"paramLongName": "apiAccessToken",
"paramDescription": "the API access token of the SWH API",
"paramRequired": false
}
]

View File

@ -0,0 +1,50 @@
[
{
"paramName": "n",
"paramLongName": "namenode",
"paramDescription": "the Name Node URI",
"paramRequired": true
},
{
"paramName": "scr",
"paramLongName": "softwareCodeRepositoryURLs",
"paramDescription": "the URL from where to read software repository URLs",
"paramRequired": true
},
{
"paramName": "lv",
"paramLongName": "lastVisitsPath",
"paramDescription": "the URL where to store last visits data",
"paramRequired": true
},
{
"paramName": "mnr",
"paramLongName": "maxNumberOfRetry",
"paramDescription": "the maximum number of admitted connection retries",
"paramRequired": false
},
{
"paramName": "rqd",
"paramLongName": "requestDelay",
"paramDescription": "the delay (ms) between requests",
"paramRequired": false
},
{
"paramName": "rtd",
"paramLongName": "retryDelay",
"paramDescription": "the delay (ms) between retries",
"paramRequired": false
},
{
"paramName": "rm",
"paramLongName": "requestMethod",
"paramDescription": "the method of the requests to perform",
"paramRequired": false
},
{
"paramName": "aat",
"paramLongName": "apiAccessToken",
"paramDescription": "the API access token of the SWH API",
"paramRequired": false
}
]

View File

@ -0,0 +1,32 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "scr",
"paramLongName": "softwareCodeRepositoryURLs",
"paramDescription": "the URL where to store software repository URLs",
"paramRequired": true
},
{
"paramName": "db",
"paramLongName": "hiveDbName",
"paramDescription": "the target hive database name",
"paramRequired": true
},
{
"paramName": "hmu",
"paramLongName": "hiveMetastoreUris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
},
{
"paramName": "slim",
"paramLongName": "softwareLimit",
"paramDescription": "limit on the number of software repo URL to fetch",
"paramRequired": false
}
]

View File

@ -0,0 +1,26 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "lv",
"paramLongName": "lastVisitsPath",
"paramDescription": "the URL where to store last visits data",
"paramRequired": true
},
{
"paramName": "ap",
"paramLongName": "actionsetsPath",
"paramDescription": "the URL path where to store actionsets",
"paramRequired": true
},
{
"paramName": "sip",
"paramLongName": "softwareInputPath",
"paramDescription": "the URL path of the software in the graph",
"paramRequired": true
}
]

View File

@ -0,0 +1,19 @@
# hive
hiveDbName=openaire_prod_20230914
# input/output files
softwareCodeRepositoryURLs=${workingDir}/1_code_repo_urls.csv
lastVisitsPath=${workingDir}/2_last_visits.seq
archiveRequestsPath=${workingDir}/3_archive_requests.seq
actionsetsPath=${workingDir}/4_actionsets
graphPath=/tmp/prod_provision/graph/18_graph_blacklisted
apiAccessToken=eyJhbGciOiJIUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJhMTMxYTQ1My1hM2IyLTQwMTUtODQ2Ny05MzAyZjk3MTFkOGEifQ.eyJpYXQiOjE2OTQ2MzYwMjAsImp0aSI6IjkwZjdkNTNjLTQ5YTktNGFiMy1hY2E0LTcwMTViMjEyZTNjNiIsImlzcyI6Imh0dHBzOi8vYXV0aC5zb2Z0d2FyZWhlcml0YWdlLm9yZy9hdXRoL3JlYWxtcy9Tb2Z0d2FyZUhlcml0YWdlIiwiYXVkIjoiaHR0cHM6Ly9hdXRoLnNvZnR3YXJlaGVyaXRhZ2Uub3JnL2F1dGgvcmVhbG1zL1NvZnR3YXJlSGVyaXRhZ2UiLCJzdWIiOiIzMTY5OWZkNC0xNmE0LTQxOWItYTdhMi00NjI5MDY4ZjI3OWEiLCJ0eXAiOiJPZmZsaW5lIiwiYXpwIjoic3doLXdlYiIsInNlc3Npb25fc3RhdGUiOiIzMjYzMzEwMS00ZDRkLTQwMjItODU2NC1iMzNlMTJiNTE3ZDkiLCJzY29wZSI6Im9wZW5pZCBvZmZsaW5lX2FjY2VzcyBwcm9maWxlIGVtYWlsIn0.XHj1VIZu1dZ4Ej32-oU84mFmaox9cLNjXosNxwZM0Xs
maxNumberOfRetry=2
retryDelay=1
requestDelay=100
softwareLimit=500
resume=collect-software-repository-urls

View File

@ -0,0 +1,54 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<value>spark2</value>
</property>
<property>
<name>resourceManager</name>
<value>http://iis-cdh5-test-m2.ocean.icm.edu.pl:8088/cluster</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
<property>
<name>sparkSqlWarehouseDir</name>
<value>/user/hive/warehouse</value>
</property>
</configuration>

View File

@ -0,0 +1,183 @@
<workflow-app name="Software-Heritage-Integration-Workflow" xmlns="uri:oozie:workflow:0.5">
<!-- Custom parameters -->
<parameters>
<property>
<name>hiveDbName</name>
<description>The name of the Hive DB to be used</description>
</property>
<property>
<name>softwareCodeRepositoryURLs</name>
<description>The path in the HDFS to save the software repository URLs</description>
</property>
<property>
<name>lastVisitsPath</name>
<description>The path in the HDFS to save the responses of the last visit requests</description>
</property>
<property>
<name>archiveRequestsPath</name>
<description>The path in the HDFS to save the responses of the archive requests</description>
</property>
<property>
<name>actionsetsPath</name>
<description>The path in the HDFS to save the action sets</description>
</property>
<property>
<name>graphPath</name>
<description>The path in the HDFS to the base folder of the graph</description>
</property>
<property>
<name>maxNumberOfRetry</name>
<description>Max number of retries for failed API calls</description>
</property>
<property>
<name>retryDelay</name>
<description>Retry delay for failed requests (in sec)</description>
</property>
<property>
<name>requestDelay</name>
<description>Delay between API requests (in ms)</description>
</property>
<property>
<name>apiAccessToken</name>
<description>The API Key of the SWH API</description>
</property>
<property>
<name>softwareLimit</name>
<description>Limit on the number of repo URLs to use (Optional); for debug purposes</description>
</property>
<property>
<name>resumeFrom</name>
<description>Variable that indicates the step to start from</description>
</property>
</parameters>
<!-- Global variables -->
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
<property>
<name>actionsetsPath</name>
<value>${actionsetsPath}</value>
</property>
<property>
<name>apiAccessToken</name>
<value>${apiAccessToken}</value>
</property>
</configuration>
</global>
<start to="startFrom"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<decision name="startFrom">
<switch>
<case to="collect-software-repository-urls">${wf:conf('resumeFrom') eq 'collect-software-repository-urls'}</case>
<case to="create-swh-actionsets">${wf:conf('resumeFrom') eq 'create-swh-actionsets'}</case>
<default to="collect-software-repository-urls"/>
</switch>
</decision>
<action name="collect-software-repository-urls">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Collect software repository URLs</name>
<class>eu.dnetlib.dhp.swh.CollectSoftwareRepositoryURLs</class>
<jar>dhp-swh-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--softwareCodeRepositoryURLs</arg><arg>${softwareCodeRepositoryURLs}</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--softwareLimit</arg><arg>${softwareLimit}</arg>
</spark>
<ok to="collect-repository-last-visit-data"/>
<error to="Kill"/>
</action>
<action name="collect-repository-last-visit-data">
<java>
<main-class>eu.dnetlib.dhp.swh.CollectLastVisitRepositoryData</main-class>
<arg>--namenode</arg><arg>${nameNode}</arg>
<arg>--softwareCodeRepositoryURLs</arg><arg>${softwareCodeRepositoryURLs}</arg>
<arg>--lastVisitsPath</arg><arg>${lastVisitsPath}</arg>
<arg>--maxNumberOfRetry</arg><arg>${maxNumberOfRetry}</arg>
<arg>--requestDelay</arg><arg>${requestDelay}</arg>
<arg>--retryDelay</arg><arg>${retryDelay}</arg>
<arg>--requestMethod</arg><arg>GET</arg>
<arg>--apiAccessToken</arg><arg>${apiAccessToken}</arg>
</java>
<ok to="archive-repository-urls"/>
<error to="Kill"/>
</action>
<action name="archive-repository-urls">
<java>
<main-class>eu.dnetlib.dhp.swh.ArchiveRepositoryURLs</main-class>
<arg>--namenode</arg><arg>${nameNode}</arg>
<arg>--lastVisitsPath</arg><arg>${lastVisitsPath}</arg>
<arg>--archiveRequestsPath</arg><arg>${archiveRequestsPath}</arg>
<arg>--archiveThresholdInDays</arg><arg>365</arg>
<arg>--maxNumberOfRetry</arg><arg>${maxNumberOfRetry}</arg>
<arg>--requestDelay</arg><arg>${requestDelay}</arg>
<arg>--retryDelay</arg><arg>${retryDelay}</arg>
<arg>--requestMethod</arg><arg>POST</arg>
<arg>--apiAccessToken</arg><arg>${apiAccessToken}</arg>
</java>
<ok to="create-swh-actionsets"/>
<error to="Kill"/>
</action>
<action name="create-swh-actionsets">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Create actionsets for SWH data</name>
<class>eu.dnetlib.dhp.swh.PrepareSWHActionsets</class>
<jar>dhp-swh-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--lastVisitsPath</arg><arg>${lastVisitsPath}</arg>
<arg>--actionsetsPath</arg><arg>${actionsetsPath}</arg>
<arg>--softwareInputPath</arg><arg>${graphPath}/software</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,38 @@
package eu.dnetlib.dhp.swh;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.text.ParseException;
import java.util.Arrays;
import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.Test;
import eu.dnetlib.dhp.swh.utils.SWHUtils;
public class ArchiveRepositoryURLsTest {
@Test
void testArchive() throws IOException, ParseException {
String inputPath = getClass()
.getResource("/eu/dnetlib/dhp/swh/lastVisitDataToArchive.csv")
.getPath();
File file = new File(inputPath);
FileReader fr = new FileReader(file);
BufferedReader br = new BufferedReader(fr); // creates a buffering character input stream
String line;
while ((line = br.readLine()) != null) {
String[] tokens = line.split("\t");
String response = ArchiveRepositoryURLs.handleRecord(tokens[0], tokens[1], 365);
System.out.println(tokens[0] + "\t" + response);
System.out.println();
}
fr.close();
}
}

View File

@ -0,0 +1,97 @@
package eu.dnetlib.dhp.swh;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
public class PrepareSWHActionsetsTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final Logger log = LoggerFactory
.getLogger(PrepareSWHActionsetsTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(PrepareSWHActionsetsTest.class.getSimpleName());
log.info("Using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(PrepareSWHActionsetsTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(PrepareSWHActionsetsTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
void testRun() throws Exception {
String lastVisitsPath = getClass()
.getResource("/eu/dnetlib/dhp/swh/last_visits_data.seq")
.getPath();
String outputPath = workingDir.toString() + "/actionSet";
String softwareInputPath = getClass()
.getResource("/eu/dnetlib/dhp/swh/software.json.gz")
.getPath();
PrepareSWHActionsets
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-lastVisitsPath", lastVisitsPath,
"-softwareInputPath", softwareInputPath,
"-actionsetsPath", outputPath
});
}
}

View File

@ -0,0 +1,58 @@
package eu.dnetlib.dhp.swh;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
import eu.dnetlib.dhp.swh.utils.SWHConnection;
import eu.dnetlib.dhp.swh.utils.SWHConstants;
//import org.apache.hadoop.hdfs.MiniDFSCluster;
public class SWHConnectionTest {
private static final Logger log = LoggerFactory.getLogger(SWHConnectionTest.class);
@Test
void testGetCall() throws IOException {
HttpClientParams clientParams = new HttpClientParams();
clientParams.setRequestMethod("GET");
SWHConnection swhConnection = new SWHConnection(clientParams, null);
String repoUrl = "https://github.com/stanford-futuredata/FAST";
URL url = new URL(String.format(SWHConstants.SWH_LATEST_VISIT_URL, repoUrl));
String response = null;
try {
response = swhConnection.call(url.toString());
} catch (CollectorException e) {
System.out.println("Error in request: " + url);
}
System.out.println(response);
}
@Test
void testPostCall() throws MalformedURLException {
HttpClientParams clientParams = new HttpClientParams();
clientParams.setRequestMethod("POST");
SWHConnection swhConnection = new SWHConnection(clientParams, null);
String repoUrl = "https://github.com/stanford-futuredata/FAST";
URL url = new URL(String.format(SWHConstants.SWH_ARCHIVE_URL, SWHConstants.DEFAULT_VISIT_TYPE, repoUrl));
String response = null;
try {
response = swhConnection.call(url.toString());
} catch (CollectorException e) {
System.out.println("Error in request: " + url);
}
System.out.println(response);
}
}

View File

@ -0,0 +1,7 @@
https://bitbucket.org/samskillman/yt-stokes {"origin":"https://bitbucket.org/samskillman/yt-stokes","visit":43,"date":"2021-09-13T21:59:27.125171+00:00","status":"failed","snapshot":null,"type":"hg","metadata":{},"origin_url":"https://archive.softwareheritage.org/api/1/origin/https://bitbucket.org/samskillman/yt-stokes/get/","snapshot_url":null}
https://github.com/bioinsilico/BIPSPI {"origin":"https://github.com/bioinsilico/BIPSPI","visit":1,"date":"2020-03-18T14:50:21.541822+00:00","status":"full","snapshot":"c6c69d2cd73ce89811448da5f031611df6f63bdb","type":"git","metadata":{},"origin_url":"https://archive.softwareheritage.org/api/1/origin/https://github.com/bioinsilico/BIPSPI/get/","snapshot_url":"https://archive.softwareheritage.org/api/1/snapshot/c6c69d2cd73ce89811448da5f031611df6f63bdb/"}
https://github.com/mloop/kdiff-type1-error-rate/blob/master/analysis/simulation.R {}
https://github.com/schwanbeck/YSMR {"origin":"https://github.com/schwanbeck/YSMR","visit":6,"date":"2023-08-02T15:25:02.650676+00:00","status":"full","snapshot":"a9d1c5f0bca2def198b89f65bc9f7da3be8439ed","type":"git","metadata":{},"origin_url":"https://archive.softwareheritage.org/api/1/origin/https://github.com/schwanbeck/YSMR/get/","snapshot_url":"https://archive.softwareheritage.org/api/1/snapshot/a9d1c5f0bca2def198b89f65bc9f7da3be8439ed/"}
https://github.com/lvclark/TASSELGBS_combine {"origin":"https://github.com/lvclark/TASSELGBS_combine","visit":1,"date":"2020-04-12T20:44:09.405589+00:00","status":"full","snapshot":"ffa6fefd3f5becefbea9fe0e6d5d93859c95c071","type":"git","metadata":{},"origin_url":"https://archive.softwareheritage.org/api/1/origin/https://github.com/lvclark/TASSELGBS_combine/get/","snapshot_url":"https://archive.softwareheritage.org/api/1/snapshot/ffa6fefd3f5becefbea9fe0e6d5d93859c95c071/"}
https://github.com/PRIDE-Toolsuite/inspector-example-files {"origin":"https://github.com/PRIDE-Toolsuite/inspector-example-files","visit":12,"date":"2021-01-25T08:54:13.394674+00:00","status":"full","snapshot":"0b56eb0ad07cf778df6dabefc4b73636e0ae8b37","type":"git","metadata":{},"origin_url":"https://archive.softwareheritage.org/api/1/origin/https://github.com/PRIDE-Toolsuite/inspector-example-files/get/","snapshot_url":"https://archive.softwareheritage.org/api/1/snapshot/0b56eb0ad07cf778df6dabefc4b73636e0ae8b37/"}
https://bitbucket.org/matwey/chelyabinsk {"origin":"https://bitbucket.org/matwey/chelyabinsk","visit":6,"date":"2021-09-24T19:32:43.322909+00:00","status":"full","snapshot":"215913858c3ee0e61e1aaea18241c5ee006da1b0","type":"hg","metadata":{},"origin_url":"https://archive.softwareheritage.org/api/1/origin/https://bitbucket.org/matwey/chelyabinsk/get/","snapshot_url":"https://archive.softwareheritage.org/api/1/snapshot/215913858c3ee0e61e1aaea18241c5ee006da1b0/"}
Can't render this file because it contains an unexpected character in line 1 and column 46.

View File

@ -39,6 +39,7 @@
<module>dhp-broker-events</module> <module>dhp-broker-events</module>
<module>dhp-doiboost</module> <module>dhp-doiboost</module>
<module>dhp-impact-indicators</module> <module>dhp-impact-indicators</module>
<module>dhp-swh</module>
</modules> </modules>
<pluginRepositories> <pluginRepositories>