Merge branch 'beta' into crossref_missing_author_fix

This commit is contained in:
Claudio Atzori 2024-01-26 15:57:04 +01:00
commit ce3200263e
23 changed files with 766 additions and 166 deletions

43
CODE_OF_CONDUCT.md Normal file
View File

@ -0,0 +1,43 @@
# Contributor Code of Conduct
Openness, transparency and our community-driven participatory approach guide us in our day-to-day interactions and decision-making. Our open source projects are no exception. Trust, respect, collaboration and transparency are core values we believe should live and breathe within our projects. Our community welcomes participants from around the world with different experiences, unique perspectives, and great ideas to share.
## Our Pledge
In the interest of fostering an open and welcoming environment, we as contributors and maintainers pledge to making participation in our project and our community a harassment-free experience for everyone, regardless of age, body size, disability, ethnicity, sex characteristics, gender identity and expression, level of experience, education, socio-economic status, nationality, personal appearance, race, religion, or sexual identity and orientation.
## Our Standards
Examples of behavior that contributes to creating a positive environment include:
- Using welcoming and inclusive language
- Being respectful of differing viewpoints and experiences
- Gracefully accepting constructive criticism
- Attempting collaboration before conflict
- Focusing on what is best for the community
- Showing empathy towards other community members
Examples of unacceptable behavior by participants include:
- Violence, threats of violence, or inciting others to commit self-harm
- The use of sexualized language or imagery and unwelcome sexual attention or advances
- Trolling, intentionally spreading misinformation, insulting/derogatory comments, and personal or political attacks
- Public or private harassment
- Publishing others' private information, such as a physical or electronic address, without explicit permission
- Abuse of the reporting process to intentionally harass or exclude others
- Advocating for, or encouraging, any of the above behavior
- Other conduct which could reasonably be considered inappropriate in a professional setting
## Our Responsibilities
Project maintainers are responsible for clarifying the standards of acceptable behavior and are expected to take appropriate and fair corrective action in response to any instances of unacceptable behavior.
Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct, or to ban temporarily or permanently any contributor for other behaviors that they deem inappropriate, threatening, offensive, or harmful.
## Scope
This Code of Conduct applies both within project spaces and in public spaces when an individual is representing the project or its community. Examples of representing a project or community include using an official project e-mail address, posting via an official social media account, or acting as an appointed representative at an online or offline event. Representation of a project may be further defined and clarified by project maintainers.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant](https://www.contributor-covenant.org/), [version 1.4](https://www.contributor-covenant.org/version/1/4/code-of-conduct.html).

10
CONTRIBUTING.md Normal file
View File

@ -0,0 +1,10 @@
# Contributing to D-Net Hadoop
:+1::tada: First off, thanks for taking the time to contribute! :tada::+1:
This project and everyone participating in it is governed by our [Code of Conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to [dnet-team@isti.cnr.it](mailto:dnet-team@isti.cnr.it).
The following is a set of guidelines for contributing to this project and its packages. These are mostly guidelines, not rules, which applies to this project as a while, including all its sub-modules.
Use your best judgment, and feel free to propose changes to this document in a pull request.
All contributions are welcome, all contributions will be considered to be contributed under the [project license](LICENSE.md).

View File

View File

@ -2,6 +2,11 @@
Dnet-hadoop is the project that defined all the [OOZIE workflows](https://oozie.apache.org/) for the OpenAIRE Graph construction, processing, provisioning. Dnet-hadoop is the project that defined all the [OOZIE workflows](https://oozie.apache.org/) for the OpenAIRE Graph construction, processing, provisioning.
This project adheres to the Contributor Covenant [code of conduct](CODE_OF_CONDUCT.md).
By participating, you are expected to uphold this code. Please report unacceptable behavior to [dnet-team@isti.cnr.it](mailto:dnet-team@isti.cnr.it).
This project is licensed under the [AGPL v3 or later version](#LICENSE.md).
How to build, package and run oozie workflows How to build, package and run oozie workflows
==================== ====================

View File

@ -0,0 +1,39 @@
package eu.dnetlib.dhp.common.api.context;
public class CategorySummary {
private String id;
private String label;
private boolean hasConcept;
public String getId() {
return id;
}
public String getLabel() {
return label;
}
public boolean isHasConcept() {
return hasConcept;
}
public CategorySummary setId(final String id) {
this.id = id;
return this;
}
public CategorySummary setLabel(final String label) {
this.label = label;
return this;
}
public CategorySummary setHasConcept(final boolean hasConcept) {
this.hasConcept = hasConcept;
return this;
}
}

View File

@ -0,0 +1,7 @@
package eu.dnetlib.dhp.common.api.context;
import java.util.ArrayList;
public class CategorySummaryList extends ArrayList<CategorySummary> {
}

View File

@ -0,0 +1,52 @@
package eu.dnetlib.dhp.common.api.context;
import java.util.List;
public class ConceptSummary {
private String id;
private String label;
public boolean hasSubConcept;
private List<ConceptSummary> concepts;
public String getId() {
return id;
}
public String getLabel() {
return label;
}
public List<ConceptSummary> getConcepts() {
return concepts;
}
public ConceptSummary setId(final String id) {
this.id = id;
return this;
}
public ConceptSummary setLabel(final String label) {
this.label = label;
return this;
}
public boolean isHasSubConcept() {
return hasSubConcept;
}
public ConceptSummary setHasSubConcept(final boolean hasSubConcept) {
this.hasSubConcept = hasSubConcept;
return this;
}
public ConceptSummary setConcept(final List<ConceptSummary> concepts) {
this.concepts = concepts;
return this;
}
}

View File

@ -0,0 +1,7 @@
package eu.dnetlib.dhp.common.api.context;
import java.util.ArrayList;
public class ConceptSummaryList extends ArrayList<ConceptSummary> {
}

View File

@ -0,0 +1,50 @@
package eu.dnetlib.dhp.common.api.context;
public class ContextSummary {
private String id;
private String label;
private String type;
private String status;
public String getId() {
return id;
}
public String getLabel() {
return label;
}
public String getType() {
return type;
}
public String getStatus() {
return status;
}
public ContextSummary setId(final String id) {
this.id = id;
return this;
}
public ContextSummary setLabel(final String label) {
this.label = label;
return this;
}
public ContextSummary setType(final String type) {
this.type = type;
return this;
}
public ContextSummary setStatus(final String status) {
this.status = status;
return this;
}
}

View File

@ -0,0 +1,7 @@
package eu.dnetlib.dhp.common.api.context;
import java.util.ArrayList;
public class ContextSummaryList extends ArrayList<ContextSummary> {
}

View File

@ -8,10 +8,13 @@ import java.io.InputStream;
import java.net.*; import java.net.*;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.math.NumberUtils; import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.http.HttpHeaders; import org.apache.http.HttpHeaders;
import org.joda.time.Instant;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -94,14 +97,16 @@ public class HttpConnector2 {
throw new CollectorException(msg); throw new CollectorException(msg);
} }
log.info("Request attempt {} [{}]", retryNumber, requestUrl);
InputStream input = null; InputStream input = null;
long start = System.currentTimeMillis();
try { try {
if (getClientParams().getRequestDelay() > 0) { if (getClientParams().getRequestDelay() > 0) {
backoffAndSleep(getClientParams().getRequestDelay()); backoffAndSleep(getClientParams().getRequestDelay());
} }
log.info("Request attempt {} [{}]", retryNumber, requestUrl);
final HttpURLConnection urlConn = (HttpURLConnection) new URL(requestUrl).openConnection(); final HttpURLConnection urlConn = (HttpURLConnection) new URL(requestUrl).openConnection();
urlConn.setInstanceFollowRedirects(false); urlConn.setInstanceFollowRedirects(false);
urlConn.setReadTimeout(getClientParams().getReadTimeOut() * 1000); urlConn.setReadTimeout(getClientParams().getReadTimeOut() * 1000);
@ -115,9 +120,8 @@ public class HttpConnector2 {
urlConn.addRequestProperty(headerEntry.getKey(), headerEntry.getValue()); urlConn.addRequestProperty(headerEntry.getKey(), headerEntry.getValue());
} }
} }
if (log.isDebugEnabled()) {
logHeaderFields(urlConn); logHeaderFields(urlConn);
}
int retryAfter = obtainRetryAfter(urlConn.getHeaderFields()); int retryAfter = obtainRetryAfter(urlConn.getHeaderFields());
String rateLimit = urlConn.getHeaderField(Constants.HTTPHEADER_IETF_DRAFT_RATELIMIT_LIMIT); String rateLimit = urlConn.getHeaderField(Constants.HTTPHEADER_IETF_DRAFT_RATELIMIT_LIMIT);
@ -132,9 +136,7 @@ public class HttpConnector2 {
} }
if (is2xx(urlConn.getResponseCode())) { if (is2xx(urlConn.getResponseCode())) {
input = urlConn.getInputStream(); return getInputStream(urlConn, start);
responseType = urlConn.getContentType();
return input;
} }
if (is3xx(urlConn.getResponseCode())) { if (is3xx(urlConn.getResponseCode())) {
// REDIRECTS // REDIRECTS
@ -144,6 +146,7 @@ public class HttpConnector2 {
.put( .put(
REPORT_PREFIX + urlConn.getResponseCode(), REPORT_PREFIX + urlConn.getResponseCode(),
String.format("Moved to: %s", newUrl)); String.format("Moved to: %s", newUrl));
logRequestTime(start);
urlConn.disconnect(); urlConn.disconnect();
if (retryAfter > 0) { if (retryAfter > 0) {
backoffAndSleep(retryAfter); backoffAndSleep(retryAfter);
@ -159,26 +162,50 @@ public class HttpConnector2 {
if (retryAfter > 0) { if (retryAfter > 0) {
log log
.warn( .warn(
"{} - waiting and repeating request after suggested retry-after {} sec.", "waiting and repeating request after suggested retry-after {} sec for URL {}",
requestUrl, retryAfter); retryAfter, requestUrl);
backoffAndSleep(retryAfter * 1000); backoffAndSleep(retryAfter * 1000);
} else { } else {
log log
.warn( .warn(
"{} - waiting and repeating request after default delay of {} sec.", "waiting and repeating request after default delay of {} sec for URL {}",
requestUrl, getClientParams().getRetryDelay()); getClientParams().getRetryDelay(), requestUrl);
backoffAndSleep(retryNumber * getClientParams().getRetryDelay() * 1000); backoffAndSleep(retryNumber * getClientParams().getRetryDelay());
} }
report.put(REPORT_PREFIX + urlConn.getResponseCode(), requestUrl); report.put(REPORT_PREFIX + urlConn.getResponseCode(), requestUrl);
logRequestTime(start);
urlConn.disconnect(); urlConn.disconnect();
return attemptDownload(requestUrl, retryNumber + 1, report); return attemptDownload(requestUrl, retryNumber + 1, report);
case 422: // UNPROCESSABLE ENTITY
report.put(REPORT_PREFIX + urlConn.getResponseCode(), requestUrl);
log.warn("waiting and repeating request after 10 sec for URL {}", requestUrl);
backoffAndSleep(10000);
urlConn.disconnect();
logRequestTime(start);
try {
return getInputStream(urlConn, start);
} catch (IOException e) {
log
.error(
"server returned 422 and got IOException accessing the response body from URL {}",
requestUrl);
log.error("IOException:", e);
return attemptDownload(requestUrl, retryNumber + 1, report);
}
default: default:
log.error("gor error {} from URL: {}", urlConn.getResponseCode(), urlConn.getURL());
log.error("response message: {}", urlConn.getResponseMessage());
report report
.put( .put(
REPORT_PREFIX + urlConn.getResponseCode(), REPORT_PREFIX + urlConn.getResponseCode(),
String String
.format( .format(
"%s Error: %s", requestUrl, urlConn.getResponseMessage())); "%s Error: %s", requestUrl, urlConn.getResponseMessage()));
logRequestTime(start);
urlConn.disconnect();
throw new CollectorException(urlConn.getResponseCode() + " error " + report); throw new CollectorException(urlConn.getResponseCode() + " error " + report);
} }
} }
@ -199,13 +226,27 @@ public class HttpConnector2 {
} }
} }
private InputStream getInputStream(HttpURLConnection urlConn, long start) throws IOException {
InputStream input = urlConn.getInputStream();
responseType = urlConn.getContentType();
logRequestTime(start);
return input;
}
private static void logRequestTime(long start) {
log
.info(
"request time elapsed: {}sec",
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - start));
}
private void logHeaderFields(final HttpURLConnection urlConn) throws IOException { private void logHeaderFields(final HttpURLConnection urlConn) throws IOException {
log.debug("StatusCode: {}", urlConn.getResponseMessage()); log.info("Response: {} - {}", urlConn.getResponseCode(), urlConn.getResponseMessage());
for (Map.Entry<String, List<String>> e : urlConn.getHeaderFields().entrySet()) { for (Map.Entry<String, List<String>> e : urlConn.getHeaderFields().entrySet()) {
if (e.getKey() != null) { if (e.getKey() != null) {
for (String v : e.getValue()) { for (String v : e.getValue()) {
log.debug(" key: {} - value: {}", e.getKey(), v); log.info(" key: {} - value: {}", e.getKey(), v);
} }
} }
} }
@ -225,7 +266,7 @@ public class HttpConnector2 {
for (String key : headerMap.keySet()) { for (String key : headerMap.keySet()) {
if ((key != null) && key.equalsIgnoreCase(HttpHeaders.RETRY_AFTER) && (!headerMap.get(key).isEmpty()) if ((key != null) && key.equalsIgnoreCase(HttpHeaders.RETRY_AFTER) && (!headerMap.get(key).isEmpty())
&& NumberUtils.isCreatable(headerMap.get(key).get(0))) { && NumberUtils.isCreatable(headerMap.get(key).get(0))) {
return Integer.parseInt(headerMap.get(key).get(0)) + 10; return Integer.parseInt(headerMap.get(key).get(0));
} }
} }
return -1; return -1;

View File

@ -0,0 +1,77 @@
package eu.dnetlib.dhp.oozie;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.commons.text.StringSubstitutor;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.io.Resources;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class RunSQLSparkJob {
private static final Logger log = LoggerFactory.getLogger(RunSQLSparkJob.class);
private final ArgumentApplicationParser parser;
public RunSQLSparkJob(ArgumentApplicationParser parser) {
this.parser = parser;
}
public static void main(String[] args) throws Exception {
Map<String, String> params = new HashMap<>();
for (int i = 0; i < args.length - 1; i++) {
if (args[i].startsWith("--")) {
params.put(args[i].substring(2), args[++i]);
}
}
/*
* String jsonConfiguration = IOUtils .toString( Objects .requireNonNull( RunSQLSparkJob.class
* .getResourceAsStream( "/eu/dnetlib/dhp/oozie/run_sql_parameters.json"))); final ArgumentApplicationParser
* parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args);
*/
Boolean isSparkSessionManaged = Optional
.ofNullable(params.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
URL url = com.google.common.io.Resources.getResource(params.get("sql"));
String raw_sql = Resources.toString(url, StandardCharsets.UTF_8);
String sql = StringSubstitutor.replace(raw_sql, params);
log.info("sql: {}", sql);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", params.get("hiveMetastoreUris"));
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
for (String statement : sql.split(";\\s*/\\*\\s*EOS\\s*\\*/\\s*")) {
log.info("executing: {}", statement);
long startTime = System.currentTimeMillis();
spark.sql(statement).show();
log
.info(
"executed in {}",
DurationFormatUtils.formatDuration(System.currentTimeMillis() - startTime, "HH:mm:ss.S"));
}
});
}
}

View File

@ -0,0 +1,20 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "hmu",
"paramLongName": "hiveMetastoreUris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
},
{
"paramName": "sql",
"paramLongName": "sql",
"paramDescription": "sql script to execute",
"paramRequired": true
}
]

View File

@ -1,6 +1,16 @@
package eu.dnetlib.dhp.oa.dedup; package eu.dnetlib.dhp.oa.dedup;
import java.util.*;
import java.util.stream.Stream;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;
import eu.dnetlib.dhp.oa.dedup.model.Identifier; import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.oa.merge.AuthorMerger; import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
@ -8,180 +18,176 @@ import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Result;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;
import scala.Tuple2; import scala.Tuple2;
import scala.Tuple3; import scala.Tuple3;
import scala.collection.JavaConversions; import scala.collection.JavaConversions;
import java.util.*;
import java.util.stream.Stream;
public class DedupRecordFactory { public class DedupRecordFactory {
public static final class DedupRecordReduceState { public static final class DedupRecordReduceState {
public final String dedupId; public final String dedupId;
public final ArrayList<String> aliases = new ArrayList<>(); public final ArrayList<String> aliases = new ArrayList<>();
public final HashSet<String> acceptanceDate = new HashSet<>(); public final HashSet<String> acceptanceDate = new HashSet<>();
public OafEntity entity; public OafEntity entity;
public DedupRecordReduceState(String dedupId, String id, OafEntity entity) { public DedupRecordReduceState(String dedupId, String id, OafEntity entity) {
this.dedupId = dedupId; this.dedupId = dedupId;
this.entity = entity; this.entity = entity;
if (entity == null) { if (entity == null) {
aliases.add(id); aliases.add(id);
} else { } else {
if (Result.class.isAssignableFrom(entity.getClass())) { if (Result.class.isAssignableFrom(entity.getClass())) {
Result result = (Result) entity; Result result = (Result) entity;
if (result.getDateofacceptance() != null && StringUtils.isNotBlank(result.getDateofacceptance().getValue())) { if (result.getDateofacceptance() != null
acceptanceDate.add(result.getDateofacceptance().getValue()); && StringUtils.isNotBlank(result.getDateofacceptance().getValue())) {
} acceptanceDate.add(result.getDateofacceptance().getValue());
} }
} }
} }
}
public String getDedupId() { public String getDedupId() {
return dedupId; return dedupId;
} }
} }
private static final int MAX_ACCEPTANCE_DATE = 20;
private DedupRecordFactory() { private static final int MAX_ACCEPTANCE_DATE = 20;
}
public static Dataset<OafEntity> createDedupRecord( private DedupRecordFactory() {
final SparkSession spark, }
final DataInfo dataInfo,
final String mergeRelsInputPath,
final String entitiesInputPath,
final Class<OafEntity> clazz) {
final long ts = System.currentTimeMillis(); public static Dataset<OafEntity> createDedupRecord(
final Encoder<OafEntity> beanEncoder = Encoders.bean(clazz); final SparkSession spark,
final Encoder<OafEntity> kryoEncoder = Encoders.kryo(clazz); final DataInfo dataInfo,
final String mergeRelsInputPath,
final String entitiesInputPath,
final Class<OafEntity> clazz) {
// <id, json_entity> final long ts = System.currentTimeMillis();
Dataset<Row> entities = spark final Encoder<OafEntity> beanEncoder = Encoders.bean(clazz);
.read() final Encoder<OafEntity> kryoEncoder = Encoders.kryo(clazz);
.schema(Encoders.bean(clazz).schema())
.json(entitiesInputPath)
.as(beanEncoder)
.map(
(MapFunction<OafEntity, Tuple2<String, OafEntity>>) entity -> {
return new Tuple2<>(entity.getId(), entity);
},
Encoders.tuple(Encoders.STRING(), kryoEncoder))
.selectExpr("_1 AS id", "_2 AS kryoObject");
// <source, target>: source is the dedup_id, target is the id of the mergedIn // <id, json_entity>
Dataset<Row> mergeRels = spark Dataset<Row> entities = spark
.read() .read()
.load(mergeRelsInputPath) .schema(Encoders.bean(clazz).schema())
.where("relClass == 'merges'") .json(entitiesInputPath)
.selectExpr("source as dedupId", "target as id"); .as(beanEncoder)
.map(
(MapFunction<OafEntity, Tuple2<String, OafEntity>>) entity -> {
return new Tuple2<>(entity.getId(), entity);
},
Encoders.tuple(Encoders.STRING(), kryoEncoder))
.selectExpr("_1 AS id", "_2 AS kryoObject");
return mergeRels // <source, target>: source is the dedup_id, target is the id of the mergedIn
.join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left") Dataset<Row> mergeRels = spark
.select("dedupId", "id", "kryoObject") .read()
.as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder)) .load(mergeRelsInputPath)
.map((MapFunction<Tuple3<String, String, OafEntity>, DedupRecordReduceState>) t -> new DedupRecordReduceState(t._1(), t._2(), t._3()), Encoders.kryo(DedupRecordReduceState.class)) .where("relClass == 'merges'")
.groupByKey((MapFunction<DedupRecordReduceState, String>) DedupRecordReduceState::getDedupId, Encoders.STRING()) .selectExpr("source as dedupId", "target as id");
.reduceGroups(
(ReduceFunction<DedupRecordReduceState>) (t1, t2) -> {
if (t1.entity == null) {
t2.aliases.addAll(t1.aliases);
return t2;
}
if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
t1.acceptanceDate.addAll(t2.acceptanceDate);
}
t1.aliases.addAll(t2.aliases);
t1.entity = reduceEntity(t1.entity, t2.entity);
return t1; return mergeRels
} .join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
) .select("dedupId", "id", "kryoObject")
.flatMap .as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder))
((FlatMapFunction<Tuple2<String, DedupRecordReduceState>, OafEntity>) t -> { .map(
String dedupId = t._1(); (MapFunction<Tuple3<String, String, OafEntity>, DedupRecordReduceState>) t -> new DedupRecordReduceState(
DedupRecordReduceState agg = t._2(); t._1(), t._2(), t._3()),
Encoders.kryo(DedupRecordReduceState.class))
.groupByKey(
(MapFunction<DedupRecordReduceState, String>) DedupRecordReduceState::getDedupId, Encoders.STRING())
.reduceGroups(
(ReduceFunction<DedupRecordReduceState>) (t1, t2) -> {
if (t1.entity == null) {
t2.aliases.addAll(t1.aliases);
return t2;
}
if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
t1.acceptanceDate.addAll(t2.acceptanceDate);
}
t1.aliases.addAll(t2.aliases);
t1.entity = reduceEntity(t1.entity, t2.entity);
if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) { return t1;
return Collections.emptyIterator(); })
} .flatMap((FlatMapFunction<Tuple2<String, DedupRecordReduceState>, OafEntity>) t -> {
String dedupId = t._1();
DedupRecordReduceState agg = t._2();
return Stream.concat(Stream.of(agg.getDedupId()), agg.aliases.stream()) if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) {
.map(id -> { return Collections.emptyIterator();
try { }
OafEntity res = (OafEntity) BeanUtils.cloneBean(agg.entity);
res.setId(id);
res.setDataInfo(dataInfo);
res.setLastupdatetimestamp(ts);
return res;
} catch (Exception e) {
throw new RuntimeException(e);
}
}).iterator();
}, beanEncoder);
}
private static OafEntity reduceEntity(OafEntity entity, OafEntity duplicate) { return Stream
.concat(Stream.of(agg.getDedupId()), agg.aliases.stream())
.map(id -> {
try {
OafEntity res = (OafEntity) BeanUtils.cloneBean(agg.entity);
res.setId(id);
res.setDataInfo(dataInfo);
res.setLastupdatetimestamp(ts);
return res;
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.iterator();
}, beanEncoder);
}
private static OafEntity reduceEntity(OafEntity entity, OafEntity duplicate) {
if (duplicate == null) { if (duplicate == null) {
return entity; return entity;
} }
int compare = new IdentifierComparator<>()
.compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate));
int compare = new IdentifierComparator<>() if (compare > 0) {
.compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate));
if (compare > 0) {
OafEntity swap = duplicate; OafEntity swap = duplicate;
duplicate = entity; duplicate = entity;
entity = swap; entity = swap;
} }
entity.mergeFrom(duplicate); entity.mergeFrom(duplicate);
if (ModelSupport.isSubClass(duplicate, Result.class)) { if (ModelSupport.isSubClass(duplicate, Result.class)) {
Result re = (Result) entity; Result re = (Result) entity;
Result rd = (Result) duplicate; Result rd = (Result) duplicate;
List<List<Author>> authors = new ArrayList<>(); List<List<Author>> authors = new ArrayList<>();
if (re.getAuthor() != null) { if (re.getAuthor() != null) {
authors.add(re.getAuthor()); authors.add(re.getAuthor());
} }
if (rd.getAuthor() != null) { if (rd.getAuthor() != null) {
authors.add(rd.getAuthor()); authors.add(rd.getAuthor());
} }
re.setAuthor(AuthorMerger.merge(authors)); re.setAuthor(AuthorMerger.merge(authors));
} }
return entity; return entity;
} }
public static <T extends OafEntity> T entityMerger( public static <T extends OafEntity> T entityMerger(
String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo, Class<T> clazz) { String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo, Class<T> clazz) {
T base = entities.next()._2(); T base = entities.next()._2();
while (entities.hasNext()) { while (entities.hasNext()) {
T duplicate = entities.next()._2(); T duplicate = entities.next()._2();
if (duplicate != null) if (duplicate != null)
base = (T) reduceEntity(base, duplicate); base = (T) reduceEntity(base, duplicate);
} }
base.setId(id); base.setId(id);
base.setDataInfo(dataInfo); base.setDataInfo(dataInfo);
base.setLastupdatetimestamp(ts); base.setLastupdatetimestamp(ts);
return base; return base;
} }
} }

View File

@ -242,13 +242,14 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
// this was a pivot in a previous graph but it has been merged into a new group with different // this was a pivot in a previous graph but it has been merged into a new group with different
// pivot // pivot
if (!r.isNullAt(r.fieldIndex("lastUsage")) && !pivot.equals(id) && !dedupId.equals(pivotDedupId)) { if (!r.isNullAt(r.fieldIndex("lastUsage")) && !pivot.equals(id)
&& !dedupId.equals(pivotDedupId)) {
// materialize the previous dedup record as a merge relation with the new one // materialize the previous dedup record as a merge relation with the new one
res.add(new Tuple3<>(dedupId, pivotDedupId, null)); res.add(new Tuple3<>(dedupId, pivotDedupId, null));
} }
// add merge relations // add merge relations
if (cut <=0 || r.<Integer>getAs("position") <= cut) { if (cut <= 0 || r.<Integer> getAs("position") <= cut) {
res.add(new Tuple3<>(id, pivotDedupId, pivot)); res.add(new Tuple3<>(id, pivotDedupId, pivot));
} }

View File

@ -0,0 +1,26 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>sparkSqlWarehouseDir</name>
<value>/user/hive/warehouse</value>
</property>
</configuration>

View File

@ -0,0 +1,62 @@
CREATE TABLE `${pivot_history_db}`.`dataset_new` STORED AS PARQUET AS
WITH pivots (
SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation`
LEFT SEMI JOIN `${new_graph_db}`.`dataset` ON relation.source = dataset.id
LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot'
UNION
SELECT id, usedIn FROM `${pivot_history_db}`.`dataset` LATERAL VIEW EXPLODE(usages) AS usedIn
)
SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages
FROM pivots
GROUP BY id; /*EOS*/
CREATE TABLE `${pivot_history_db}`.`publication_new` STORED AS PARQUET AS
WITH pivots (
SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation`
LEFT SEMI JOIN `${new_graph_db}`.`publication` ON relation.source = publication.id
LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot'
UNION
SELECT id, usedIn FROM `${pivot_history_db}`.`publication` LATERAL VIEW EXPLODE(usages) AS usedIn
)
SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages
FROM pivots
GROUP BY id; /*EOS*/
CREATE TABLE `${pivot_history_db}`.`software_new` STORED AS PARQUET AS
WITH pivots (
SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation`
LEFT SEMI JOIN `${new_graph_db}`.`software` ON relation.source = software.id
LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot'
UNION
SELECT id, usedIn FROM `${pivot_history_db}`.`software` LATERAL VIEW EXPLODE(usages) AS usedIn
)
SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages
FROM pivots
GROUP BY id; /*EOS*/
CREATE TABLE `${pivot_history_db}`.`otherresearchproduct_new` STORED AS PARQUET AS
WITH pivots (
SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation`
LEFT SEMI JOIN `${new_graph_db}`.`otherresearchproduct` ON relation.source = otherresearchproduct.id
LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot'
UNION
SELECT id, usedIn FROM `${pivot_history_db}`.`otherresearchproduct` LATERAL VIEW EXPLODE(usages) AS usedIn
)
SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages
FROM pivots
GROUP BY id; /*EOS*/
DROP TABLE IF EXISTS `${pivot_history_db}`.`dataset_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`dataset` RENAME TO `${pivot_history_db}`.`dataset_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`dataset_new` RENAME TO `${pivot_history_db}`.`dataset`; /*EOS*/
DROP TABLE IF EXISTS `${pivot_history_db}`.`publication_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`publication` RENAME TO `${pivot_history_db}`.`publication_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`publication_new` RENAME TO `${pivot_history_db}`.`publication`; /*EOS*/
DROP TABLE IF EXISTS `${pivot_history_db}`.`software_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`software` RENAME TO `${pivot_history_db}`.`software_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`software_new` RENAME TO `${pivot_history_db}`.`software`; /*EOS*/
DROP TABLE IF EXISTS `${pivot_history_db}`.`otherresearchproduct_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`otherresearchproduct` RENAME TO `${pivot_history_db}`.`otherresearchproduct_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`otherresearchproduct_new` RENAME TO `${pivot_history_db}`.`otherresearchproduct`; /*EOS*/

View File

@ -0,0 +1,95 @@
<workflow-app name="Update pivot history" xmlns="uri:oozie:workflow:0.5">
<parameters>
<!-- properties used in SQL -->
<property>
<name>pivot_history_db</name>
<!-- <value>openaire_beta_pivots_test</value> -->
<description>Pivot history DB on hive</description>
</property>
<property>
<name>new_graph_db</name>
<!--<value>openaire_beta_20231208</value> -->
<description>New graph DB on hive</description>
</property>
<property>
<name>new_graph_date</name>
<!-- <value>20231208</value> -->
<description>Creation date of new graph db</description>
</property>
<!-- RunSQLSparkJob properties -->
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkSqlWarehouseDir</name>
</property>
<!-- General oozie workflow properties -->
<property>
<name>sparkClusterOpts</name>
<value>--conf spark.network.timeout=600 --conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
<description>spark cluster-wide options</description>
</property>
<property>
<name>sparkResourceOpts</name>
<value>--executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
<description>spark resource options</description>
</property>
<property>
<name>sparkApplicationOpts</name>
<value>--conf spark.sql.shuffle.partitions=3840</value>
<description>spark resource options</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="UpgradePivotHistory"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="UpgradePivotHistory">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Upgrade Pivot History</name>
<class>eu.dnetlib.dhp.oozie.RunSQLSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
${sparkClusterOpts}
${sparkResourceOpts}
${sparkApplicationOpts}
</spark-opts>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--sql</arg><arg>eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/sql.sql</arg>
<arg>--pivot_history_db</arg><arg>${pivot_history_db}</arg>
<arg>--new_graph_db</arg><arg>${new_graph_db}</arg>
<arg>--new_graph_date</arg><arg>${new_graph_date}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -62,8 +62,8 @@ public class XmlConverterJob {
final String outputPath = parser.get("outputPath"); final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath); log.info("outputPath: {}", outputPath);
final String isLookupUrl = parser.get("isLookupUrl"); final String contextApiBaseUrl = parser.get("contextApiBaseUrl");
log.info("isLookupUrl: {}", isLookupUrl); log.info("contextApiBaseUrl: {}", contextApiBaseUrl);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
@ -71,7 +71,7 @@ public class XmlConverterJob {
runWithSparkSession(conf, isSparkSessionManaged, spark -> { runWithSparkSession(conf, isSparkSessionManaged, spark -> {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
convertToXml(spark, inputPath, outputPath, ContextMapper.fromIS(isLookupUrl)); convertToXml(spark, inputPath, outputPath, ContextMapper.fromAPI(contextApiBaseUrl));
}); });
} }

View File

@ -1,18 +1,22 @@
package eu.dnetlib.dhp.oa.provision.utils; package eu.dnetlib.dhp.oa.provision.utils;
import java.io.Serializable; import java.io.*;
import java.io.StringReader; import java.net.HttpURLConnection;
import java.net.URL;
import java.util.HashMap; import java.util.HashMap;
import org.dom4j.Document; import org.dom4j.Document;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
import org.dom4j.Node; import org.dom4j.Node;
import org.dom4j.io.SAXReader; import org.dom4j.io.SAXReader;
import org.jetbrains.annotations.NotNull;
import org.xml.sax.SAXException; import org.xml.sax.SAXException;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import eu.dnetlib.dhp.common.api.context.*;
import eu.dnetlib.dhp.common.rest.DNetRestClient;
import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -23,6 +27,45 @@ public class ContextMapper extends HashMap<String, ContextDef> implements Serial
private static final String XQUERY = "for $x in //RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='ContextDSResourceType']//*[name()='context' or name()='category' or name()='concept'] return <entry id=\"{$x/@id}\" label=\"{$x/@label|$x/@name}\" name=\"{$x/name()}\" type=\"{$x/@type}\"/>"; private static final String XQUERY = "for $x in //RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='ContextDSResourceType']//*[name()='context' or name()='category' or name()='concept'] return <entry id=\"{$x/@id}\" label=\"{$x/@label|$x/@name}\" name=\"{$x/name()}\" type=\"{$x/@type}\"/>";
public static ContextMapper fromAPI(final String baseURL) throws Exception {
final ContextMapper contextMapper = new ContextMapper();
for (ContextSummary ctx : DNetRestClient
.doGET(String.format("%s/contexts", baseURL), ContextSummaryList.class)) {
contextMapper.put(ctx.getId(), new ContextDef(ctx.getId(), ctx.getLabel(), "context", ctx.getType()));
for (CategorySummary cat : DNetRestClient
.doGET(String.format("%s/context/%s?all=true", baseURL, ctx.getId()), CategorySummaryList.class)) {
contextMapper.put(cat.getId(), new ContextDef(cat.getId(), cat.getLabel(), "category", ""));
if (cat.isHasConcept()) {
for (ConceptSummary c : DNetRestClient
.doGET(
String.format("%s/context/category/%s?all=true", baseURL, cat.getId()),
ConceptSummaryList.class)) {
contextMapper.put(c.getId(), new ContextDef(c.getId(), c.getLabel(), "concept", ""));
if (c.isHasSubConcept()) {
for (ConceptSummary cs : c.getConcepts()) {
contextMapper.put(cs.getId(), new ContextDef(cs.getId(), cs.getLabel(), "concept", ""));
if (cs.isHasSubConcept()) {
for (ConceptSummary css : cs.getConcepts()) {
contextMapper
.put(
css.getId(),
new ContextDef(css.getId(), css.getLabel(), "concept", ""));
}
}
}
}
}
}
}
}
return contextMapper;
}
@Deprecated
public static ContextMapper fromIS(final String isLookupUrl) public static ContextMapper fromIS(final String isLookupUrl)
throws DocumentException, ISLookUpException, SAXException { throws DocumentException, ISLookUpException, SAXException {
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl); ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
@ -32,6 +75,7 @@ public class ContextMapper extends HashMap<String, ContextDef> implements Serial
return fromXml(sb.toString()); return fromXml(sb.toString());
} }
@Deprecated
public static ContextMapper fromXml(final String xml) throws DocumentException, SAXException { public static ContextMapper fromXml(final String xml) throws DocumentException, SAXException {
final ContextMapper contextMapper = new ContextMapper(); final ContextMapper contextMapper = new ContextMapper();

View File

@ -12,9 +12,9 @@
"paramRequired": true "paramRequired": true
}, },
{ {
"paramName": "ilu", "paramName": "cau",
"paramLongName": "isLookupUrl", "paramLongName": "contextApiBaseUrl",
"paramDescription": "URL of the isLookUp Service", "paramDescription": "URL of the context API",
"paramRequired": true "paramRequired": true
} }
] ]

View File

@ -9,6 +9,10 @@
<name>isLookupUrl</name> <name>isLookupUrl</name>
<description>URL for the isLookup service</description> <description>URL for the isLookup service</description>
</property> </property>
<property>
<name>contextApiBaseUrl</name>
<description>context API URL</description>
</property>
<property> <property>
<name>relPartitions</name> <name>relPartitions</name>
<description>number or partitions for the relations Dataset</description> <description>number or partitions for the relations Dataset</description>
@ -589,7 +593,7 @@
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/join_entities</arg> <arg>--inputPath</arg><arg>${workingDir}/join_entities</arg>
<arg>--outputPath</arg><arg>${workingDir}/xml</arg> <arg>--outputPath</arg><arg>${workingDir}/xml</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--contextApiBaseUrl</arg><arg>${contextApiBaseUrl}</arg>
</spark> </spark>
<ok to="should_index"/> <ok to="should_index"/>
<error to="Kill"/> <error to="Kill"/>

View File

@ -78,6 +78,10 @@
<name>hive.txn.timeout</name> <name>hive.txn.timeout</name>
<value>${hive_timeout}</value> <value>${hive_timeout}</value>
</property> </property>
<property>
<name>hive.mapjoin.followby.gby.localtask.max.memory.usage</name>
<value>0.80</value>
</property>
<property> <property>
<name>mapred.job.queue.name</name> <name>mapred.job.queue.name</name>
<value>analytics</value> <value>analytics</value>