1
0
Fork 0

Compare commits

...

34 Commits

Author SHA1 Message Date
Lampros Smyrnaios b5f4d37827 Merge branch 'beta' of https://code-repo.d4science.org/lsmyrnaios/dnet-hadoop into continuous_validation2 2024-01-31 13:01:42 +02:00
Alessia Bardi f2a08d8cc2 test for Italian records from IRS repositories 2024-01-30 19:20:14 +01:00
Miriam Baglioni a5995ab557 [orcid-enrichment] change the value of parameters. 2024-01-29 18:19:48 +01:00
Claudio Atzori f804c58bc7 Merge pull request 'Use SparkSQL in place of Hive for executing step16-createIndicatorsTables.sql of stats update wf' (#386) from stats_with_spark_sql into beta
Reviewed-on: D-Net/dnet-hadoop#386
2024-01-29 09:11:59 +01:00
Claudio Atzori 926903b06b Merge branch 'beta' into stats_with_spark_sql 2024-01-29 09:11:45 +01:00
Giambattista Bloisi 078df0b4d1 Use SparkSQL in place of Hive for executing step16-createIndicatorsTables.sql of stats update wf 2024-01-26 21:56:55 +01:00
Claudio Atzori bf99c424fa Merge pull request 'Fixed problem on missing author in crossref Mapping' (#383) from crossref_missing_author_fix into beta
Reviewed-on: D-Net/dnet-hadoop#383
2024-01-26 15:57:23 +01:00
Claudio Atzori ce3200263e Merge branch 'beta' into crossref_missing_author_fix 2024-01-26 15:57:04 +01:00
Sandro La Bruzzo e889808daa Fixed problem on missing author in crossref Mapping 2024-01-26 12:19:04 +01:00
Claudio Atzori 9e8fc6aa88 [collection] increased logging from the oai-pmh metadata collection process 2024-01-26 09:17:20 +01:00
Antonis Lempesis a7115cfa9e max mem of joins (hive.mapjoin.followby.gby.localtask.max.memory.usage) now 80%, up from 55%. 2024-01-25 15:13:16 +01:00
Claudio Atzori 2838a9b630 Update 'CONTRIBUTING.md' 2024-01-24 16:07:05 +01:00
Claudio Atzori da944a5c55 Merge pull request 'code of conduct and contributing' (#382) from contributing into beta
Reviewed-on: D-Net/dnet-hadoop#382
2024-01-24 15:40:26 +01:00
Claudio Atzori 0c97a3a81a minor 2024-01-24 10:56:33 +01:00
Claudio Atzori 2c1e6849f0 added code of conduct and contributing files 2024-01-24 10:36:41 +01:00
Claudio Atzori 9b13c22e5d [graph provision] retrieve all the context information by adding all=true to the requests issued to thr API 2024-01-23 15:36:08 +01:00
Claudio Atzori 3e96777cc4 [collection] increased logging from the oai-pmh metadata collection process 2024-01-23 15:21:03 +01:00
Claudio Atzori 9812406589 Merge pull request '[graph provision] updated param specification for the XML converter job' (#380) from provision_community_api into beta
Reviewed-on: D-Net/dnet-hadoop#380
2024-01-23 08:55:59 +01:00
Claudio Atzori f87f3a6483 [graph provision] updated param specification for the XML converter job 2024-01-23 08:54:37 +01:00
Claudio Atzori 6fd25cf549 code formatting 2024-01-23 08:47:12 +01:00
Claudio Atzori bd187ec6e7 Merge pull request 'Implements pivots table update oozie workflow' (#376) from update_pivots_table into beta
Reviewed-on: D-Net/dnet-hadoop#376
2024-01-22 16:37:30 +01:00
Claudio Atzori f76852f385 Merge branch 'beta' into update_pivots_table 2024-01-22 16:37:22 +01:00
Claudio Atzori b9fcc5ad5e Merge pull request 'Context API update' (#379) from provision_community_api into beta
Reviewed-on: D-Net/dnet-hadoop#379
2024-01-22 15:55:33 +01:00
Claudio Atzori 1c6db320f4 [graph provision] obtain context info from the context API instead from the ISLookUp service 2024-01-22 15:53:17 +01:00
Claudio Atzori 2655eea5bc [orcid enrichment] drop paths before copying the non-modifyed contents 2024-01-19 16:28:05 +01:00
Claudio Atzori c6b3401596 increased shuffle partitions for publications in the country propagation workflow 2024-01-19 10:15:39 +01:00
Miriam Baglioni bcc0a13981 [enrichment single step] adding <end> element in wf definition 2024-01-18 17:39:14 +01:00
Miriam Baglioni 6af536541d [enrichment single step] moving parameter file in correct location 2024-01-18 15:35:40 +01:00
Miriam Baglioni a12a3eb143 - 2024-01-18 15:18:10 +01:00
Claudio Atzori 628fdfb5eb Merge pull request '[enrichment single step]' (#378) from enrichmentSingleStepFixed into beta
Reviewed-on: D-Net/dnet-hadoop#378
2024-01-18 09:41:09 +01:00
Miriam Baglioni 82e9e262ee [enrichment single step] remove parameter from execution 2024-01-17 17:38:03 +01:00
Miriam Baglioni 67ce2d54be [enrichment single step] refactoring to fix issues in disappeared result type 2024-01-17 16:50:00 +01:00
Miriam Baglioni 59eaccbd87 [enrichment single step] refactoring to fix issue in disappeared result type 2024-01-15 17:49:54 +01:00
Giambattista Bloisi 21a14fcd80 Reusable RunSQLSparkJob for executing SQL in Spark through Oozie Spark Actions
Implements pivots table update oozie workflow
2024-01-15 10:18:14 +01:00
52 changed files with 1941 additions and 719 deletions

43
CODE_OF_CONDUCT.md Normal file
View File

@ -0,0 +1,43 @@
# Contributor Code of Conduct
Openness, transparency and our community-driven participatory approach guide us in our day-to-day interactions and decision-making. Our open source projects are no exception. Trust, respect, collaboration and transparency are core values we believe should live and breathe within our projects. Our community welcomes participants from around the world with different experiences, unique perspectives, and great ideas to share.
## Our Pledge
In the interest of fostering an open and welcoming environment, we as contributors and maintainers pledge to making participation in our project and our community a harassment-free experience for everyone, regardless of age, body size, disability, ethnicity, sex characteristics, gender identity and expression, level of experience, education, socio-economic status, nationality, personal appearance, race, religion, or sexual identity and orientation.
## Our Standards
Examples of behavior that contributes to creating a positive environment include:
- Using welcoming and inclusive language
- Being respectful of differing viewpoints and experiences
- Gracefully accepting constructive criticism
- Attempting collaboration before conflict
- Focusing on what is best for the community
- Showing empathy towards other community members
Examples of unacceptable behavior by participants include:
- Violence, threats of violence, or inciting others to commit self-harm
- The use of sexualized language or imagery and unwelcome sexual attention or advances
- Trolling, intentionally spreading misinformation, insulting/derogatory comments, and personal or political attacks
- Public or private harassment
- Publishing others' private information, such as a physical or electronic address, without explicit permission
- Abuse of the reporting process to intentionally harass or exclude others
- Advocating for, or encouraging, any of the above behavior
- Other conduct which could reasonably be considered inappropriate in a professional setting
## Our Responsibilities
Project maintainers are responsible for clarifying the standards of acceptable behavior and are expected to take appropriate and fair corrective action in response to any instances of unacceptable behavior.
Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct, or to ban temporarily or permanently any contributor for other behaviors that they deem inappropriate, threatening, offensive, or harmful.
## Scope
This Code of Conduct applies both within project spaces and in public spaces when an individual is representing the project or its community. Examples of representing a project or community include using an official project e-mail address, posting via an official social media account, or acting as an appointed representative at an online or offline event. Representation of a project may be further defined and clarified by project maintainers.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant](https://www.contributor-covenant.org/), [version 1.4](https://www.contributor-covenant.org/version/1/4/code-of-conduct.html).

10
CONTRIBUTING.md Normal file
View File

@ -0,0 +1,10 @@
# Contributing to D-Net Hadoop
:+1::tada: First off, thanks for taking the time to contribute! :tada::+1:
This project and everyone participating in it is governed by our [Code of Conduct](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to [dnet-team@isti.cnr.it](mailto:dnet-team@isti.cnr.it).
The following is a set of guidelines for contributing to this project and its packages. These are mostly guidelines, not rules, which applies to this project as a while, including all its sub-modules.
Use your best judgment, and feel free to propose changes to this document in a pull request.
All contributions are welcome, all contributions will be considered to be contributed under the [project license](LICENSE.md).

View File

View File

@ -2,6 +2,11 @@
Dnet-hadoop is the project that defined all the [OOZIE workflows](https://oozie.apache.org/) for the OpenAIRE Graph construction, processing, provisioning.
This project adheres to the Contributor Covenant [code of conduct](CODE_OF_CONDUCT.md).
By participating, you are expected to uphold this code. Please report unacceptable behavior to [dnet-team@isti.cnr.it](mailto:dnet-team@isti.cnr.it).
This project is licensed under the [AGPL v3 or later version](#LICENSE.md).
How to build, package and run oozie workflows
====================

View File

@ -0,0 +1,39 @@
package eu.dnetlib.dhp.common.api.context;
public class CategorySummary {
private String id;
private String label;
private boolean hasConcept;
public String getId() {
return id;
}
public String getLabel() {
return label;
}
public boolean isHasConcept() {
return hasConcept;
}
public CategorySummary setId(final String id) {
this.id = id;
return this;
}
public CategorySummary setLabel(final String label) {
this.label = label;
return this;
}
public CategorySummary setHasConcept(final boolean hasConcept) {
this.hasConcept = hasConcept;
return this;
}
}

View File

@ -0,0 +1,7 @@
package eu.dnetlib.dhp.common.api.context;
import java.util.ArrayList;
public class CategorySummaryList extends ArrayList<CategorySummary> {
}

View File

@ -0,0 +1,52 @@
package eu.dnetlib.dhp.common.api.context;
import java.util.List;
public class ConceptSummary {
private String id;
private String label;
public boolean hasSubConcept;
private List<ConceptSummary> concepts;
public String getId() {
return id;
}
public String getLabel() {
return label;
}
public List<ConceptSummary> getConcepts() {
return concepts;
}
public ConceptSummary setId(final String id) {
this.id = id;
return this;
}
public ConceptSummary setLabel(final String label) {
this.label = label;
return this;
}
public boolean isHasSubConcept() {
return hasSubConcept;
}
public ConceptSummary setHasSubConcept(final boolean hasSubConcept) {
this.hasSubConcept = hasSubConcept;
return this;
}
public ConceptSummary setConcept(final List<ConceptSummary> concepts) {
this.concepts = concepts;
return this;
}
}

View File

@ -0,0 +1,7 @@
package eu.dnetlib.dhp.common.api.context;
import java.util.ArrayList;
public class ConceptSummaryList extends ArrayList<ConceptSummary> {
}

View File

@ -0,0 +1,50 @@
package eu.dnetlib.dhp.common.api.context;
public class ContextSummary {
private String id;
private String label;
private String type;
private String status;
public String getId() {
return id;
}
public String getLabel() {
return label;
}
public String getType() {
return type;
}
public String getStatus() {
return status;
}
public ContextSummary setId(final String id) {
this.id = id;
return this;
}
public ContextSummary setLabel(final String label) {
this.label = label;
return this;
}
public ContextSummary setType(final String type) {
this.type = type;
return this;
}
public ContextSummary setStatus(final String status) {
this.status = status;
return this;
}
}

View File

@ -0,0 +1,7 @@
package eu.dnetlib.dhp.common.api.context;
import java.util.ArrayList;
public class ContextSummaryList extends ArrayList<ContextSummary> {
}

View File

@ -8,10 +8,13 @@ import java.io.InputStream;
import java.net.*;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.http.HttpHeaders;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -94,14 +97,16 @@ public class HttpConnector2 {
throw new CollectorException(msg);
}
log.info("Request attempt {} [{}]", retryNumber, requestUrl);
InputStream input = null;
long start = System.currentTimeMillis();
try {
if (getClientParams().getRequestDelay() > 0) {
backoffAndSleep(getClientParams().getRequestDelay());
}
log.info("Request attempt {} [{}]", retryNumber, requestUrl);
final HttpURLConnection urlConn = (HttpURLConnection) new URL(requestUrl).openConnection();
urlConn.setInstanceFollowRedirects(false);
urlConn.setReadTimeout(getClientParams().getReadTimeOut() * 1000);
@ -115,9 +120,8 @@ public class HttpConnector2 {
urlConn.addRequestProperty(headerEntry.getKey(), headerEntry.getValue());
}
}
if (log.isDebugEnabled()) {
logHeaderFields(urlConn);
}
logHeaderFields(urlConn);
int retryAfter = obtainRetryAfter(urlConn.getHeaderFields());
String rateLimit = urlConn.getHeaderField(Constants.HTTPHEADER_IETF_DRAFT_RATELIMIT_LIMIT);
@ -132,9 +136,7 @@ public class HttpConnector2 {
}
if (is2xx(urlConn.getResponseCode())) {
input = urlConn.getInputStream();
responseType = urlConn.getContentType();
return input;
return getInputStream(urlConn, start);
}
if (is3xx(urlConn.getResponseCode())) {
// REDIRECTS
@ -144,6 +146,7 @@ public class HttpConnector2 {
.put(
REPORT_PREFIX + urlConn.getResponseCode(),
String.format("Moved to: %s", newUrl));
logRequestTime(start);
urlConn.disconnect();
if (retryAfter > 0) {
backoffAndSleep(retryAfter);
@ -159,26 +162,50 @@ public class HttpConnector2 {
if (retryAfter > 0) {
log
.warn(
"{} - waiting and repeating request after suggested retry-after {} sec.",
requestUrl, retryAfter);
"waiting and repeating request after suggested retry-after {} sec for URL {}",
retryAfter, requestUrl);
backoffAndSleep(retryAfter * 1000);
} else {
log
.warn(
"{} - waiting and repeating request after default delay of {} sec.",
requestUrl, getClientParams().getRetryDelay());
backoffAndSleep(retryNumber * getClientParams().getRetryDelay() * 1000);
"waiting and repeating request after default delay of {} sec for URL {}",
getClientParams().getRetryDelay(), requestUrl);
backoffAndSleep(retryNumber * getClientParams().getRetryDelay());
}
report.put(REPORT_PREFIX + urlConn.getResponseCode(), requestUrl);
logRequestTime(start);
urlConn.disconnect();
return attemptDownload(requestUrl, retryNumber + 1, report);
case 422: // UNPROCESSABLE ENTITY
report.put(REPORT_PREFIX + urlConn.getResponseCode(), requestUrl);
log.warn("waiting and repeating request after 10 sec for URL {}", requestUrl);
backoffAndSleep(10000);
urlConn.disconnect();
logRequestTime(start);
try {
return getInputStream(urlConn, start);
} catch (IOException e) {
log
.error(
"server returned 422 and got IOException accessing the response body from URL {}",
requestUrl);
log.error("IOException:", e);
return attemptDownload(requestUrl, retryNumber + 1, report);
}
default:
log.error("gor error {} from URL: {}", urlConn.getResponseCode(), urlConn.getURL());
log.error("response message: {}", urlConn.getResponseMessage());
report
.put(
REPORT_PREFIX + urlConn.getResponseCode(),
String
.format(
"%s Error: %s", requestUrl, urlConn.getResponseMessage()));
logRequestTime(start);
urlConn.disconnect();
throw new CollectorException(urlConn.getResponseCode() + " error " + report);
}
}
@ -199,13 +226,27 @@ public class HttpConnector2 {
}
}
private InputStream getInputStream(HttpURLConnection urlConn, long start) throws IOException {
InputStream input = urlConn.getInputStream();
responseType = urlConn.getContentType();
logRequestTime(start);
return input;
}
private static void logRequestTime(long start) {
log
.info(
"request time elapsed: {}sec",
TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - start));
}
private void logHeaderFields(final HttpURLConnection urlConn) throws IOException {
log.debug("StatusCode: {}", urlConn.getResponseMessage());
log.info("Response: {} - {}", urlConn.getResponseCode(), urlConn.getResponseMessage());
for (Map.Entry<String, List<String>> e : urlConn.getHeaderFields().entrySet()) {
if (e.getKey() != null) {
for (String v : e.getValue()) {
log.debug(" key: {} - value: {}", e.getKey(), v);
log.info(" key: {} - value: {}", e.getKey(), v);
}
}
}
@ -225,7 +266,7 @@ public class HttpConnector2 {
for (String key : headerMap.keySet()) {
if ((key != null) && key.equalsIgnoreCase(HttpHeaders.RETRY_AFTER) && (!headerMap.get(key).isEmpty())
&& NumberUtils.isCreatable(headerMap.get(key).get(0))) {
return Integer.parseInt(headerMap.get(key).get(0)) + 10;
return Integer.parseInt(headerMap.get(key).get(0));
}
}
return -1;

View File

@ -0,0 +1,77 @@
package eu.dnetlib.dhp.oozie;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.commons.text.StringSubstitutor;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.io.Resources;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class RunSQLSparkJob {
private static final Logger log = LoggerFactory.getLogger(RunSQLSparkJob.class);
private final ArgumentApplicationParser parser;
public RunSQLSparkJob(ArgumentApplicationParser parser) {
this.parser = parser;
}
public static void main(String[] args) throws Exception {
Map<String, String> params = new HashMap<>();
for (int i = 0; i < args.length - 1; i++) {
if (args[i].startsWith("--")) {
params.put(args[i].substring(2), args[++i]);
}
}
/*
* String jsonConfiguration = IOUtils .toString( Objects .requireNonNull( RunSQLSparkJob.class
* .getResourceAsStream( "/eu/dnetlib/dhp/oozie/run_sql_parameters.json"))); final ArgumentApplicationParser
* parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args);
*/
Boolean isSparkSessionManaged = Optional
.ofNullable(params.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
URL url = com.google.common.io.Resources.getResource(params.get("sql"));
String raw_sql = Resources.toString(url, StandardCharsets.UTF_8);
String sql = StringSubstitutor.replace(raw_sql, params);
log.info("sql: {}", sql);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", params.get("hiveMetastoreUris"));
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
for (String statement : sql.split(";\\s*/\\*\\s*EOS\\s*\\*/\\s*")) {
log.info("executing: {}", statement);
long startTime = System.currentTimeMillis();
spark.sql(statement).show();
log
.info(
"executed in {}",
DurationFormatUtils.formatDuration(System.currentTimeMillis() - startTime, "HH:mm:ss.S"));
}
});
}
}

View File

@ -0,0 +1,20 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "hmu",
"paramLongName": "hiveMetastoreUris",
"paramDescription": "the hive metastore uris",
"paramRequired": true
},
{
"paramName": "sql",
"paramLongName": "sql",
"paramDescription": "sql script to execute",
"paramRequired": true
}
]

View File

@ -1,6 +1,16 @@
package eu.dnetlib.dhp.oa.dedup;
import java.util.*;
import java.util.stream.Stream;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import eu.dnetlib.dhp.schema.common.ModelSupport;
@ -8,180 +18,176 @@ import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Result;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.JavaConversions;
import java.util.*;
import java.util.stream.Stream;
public class DedupRecordFactory {
public static final class DedupRecordReduceState {
public final String dedupId;
public static final class DedupRecordReduceState {
public final String dedupId;
public final ArrayList<String> aliases = new ArrayList<>();
public final ArrayList<String> aliases = new ArrayList<>();
public final HashSet<String> acceptanceDate = new HashSet<>();
public final HashSet<String> acceptanceDate = new HashSet<>();
public OafEntity entity;
public OafEntity entity;
public DedupRecordReduceState(String dedupId, String id, OafEntity entity) {
this.dedupId = dedupId;
this.entity = entity;
if (entity == null) {
aliases.add(id);
} else {
if (Result.class.isAssignableFrom(entity.getClass())) {
Result result = (Result) entity;
if (result.getDateofacceptance() != null && StringUtils.isNotBlank(result.getDateofacceptance().getValue())) {
acceptanceDate.add(result.getDateofacceptance().getValue());
}
}
}
}
public DedupRecordReduceState(String dedupId, String id, OafEntity entity) {
this.dedupId = dedupId;
this.entity = entity;
if (entity == null) {
aliases.add(id);
} else {
if (Result.class.isAssignableFrom(entity.getClass())) {
Result result = (Result) entity;
if (result.getDateofacceptance() != null
&& StringUtils.isNotBlank(result.getDateofacceptance().getValue())) {
acceptanceDate.add(result.getDateofacceptance().getValue());
}
}
}
}
public String getDedupId() {
return dedupId;
}
}
private static final int MAX_ACCEPTANCE_DATE = 20;
public String getDedupId() {
return dedupId;
}
}
private DedupRecordFactory() {
}
private static final int MAX_ACCEPTANCE_DATE = 20;
public static Dataset<OafEntity> createDedupRecord(
final SparkSession spark,
final DataInfo dataInfo,
final String mergeRelsInputPath,
final String entitiesInputPath,
final Class<OafEntity> clazz) {
private DedupRecordFactory() {
}
final long ts = System.currentTimeMillis();
final Encoder<OafEntity> beanEncoder = Encoders.bean(clazz);
final Encoder<OafEntity> kryoEncoder = Encoders.kryo(clazz);
public static Dataset<OafEntity> createDedupRecord(
final SparkSession spark,
final DataInfo dataInfo,
final String mergeRelsInputPath,
final String entitiesInputPath,
final Class<OafEntity> clazz) {
// <id, json_entity>
Dataset<Row> entities = spark
.read()
.schema(Encoders.bean(clazz).schema())
.json(entitiesInputPath)
.as(beanEncoder)
.map(
(MapFunction<OafEntity, Tuple2<String, OafEntity>>) entity -> {
return new Tuple2<>(entity.getId(), entity);
},
Encoders.tuple(Encoders.STRING(), kryoEncoder))
.selectExpr("_1 AS id", "_2 AS kryoObject");
final long ts = System.currentTimeMillis();
final Encoder<OafEntity> beanEncoder = Encoders.bean(clazz);
final Encoder<OafEntity> kryoEncoder = Encoders.kryo(clazz);
// <source, target>: source is the dedup_id, target is the id of the mergedIn
Dataset<Row> mergeRels = spark
.read()
.load(mergeRelsInputPath)
.where("relClass == 'merges'")
.selectExpr("source as dedupId", "target as id");
// <id, json_entity>
Dataset<Row> entities = spark
.read()
.schema(Encoders.bean(clazz).schema())
.json(entitiesInputPath)
.as(beanEncoder)
.map(
(MapFunction<OafEntity, Tuple2<String, OafEntity>>) entity -> {
return new Tuple2<>(entity.getId(), entity);
},
Encoders.tuple(Encoders.STRING(), kryoEncoder))
.selectExpr("_1 AS id", "_2 AS kryoObject");
return mergeRels
.join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
.select("dedupId", "id", "kryoObject")
.as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder))
.map((MapFunction<Tuple3<String, String, OafEntity>, DedupRecordReduceState>) t -> new DedupRecordReduceState(t._1(), t._2(), t._3()), Encoders.kryo(DedupRecordReduceState.class))
.groupByKey((MapFunction<DedupRecordReduceState, String>) DedupRecordReduceState::getDedupId, Encoders.STRING())
.reduceGroups(
(ReduceFunction<DedupRecordReduceState>) (t1, t2) -> {
if (t1.entity == null) {
t2.aliases.addAll(t1.aliases);
return t2;
}
if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
t1.acceptanceDate.addAll(t2.acceptanceDate);
}
t1.aliases.addAll(t2.aliases);
t1.entity = reduceEntity(t1.entity, t2.entity);
// <source, target>: source is the dedup_id, target is the id of the mergedIn
Dataset<Row> mergeRels = spark
.read()
.load(mergeRelsInputPath)
.where("relClass == 'merges'")
.selectExpr("source as dedupId", "target as id");
return t1;
}
)
.flatMap
((FlatMapFunction<Tuple2<String, DedupRecordReduceState>, OafEntity>) t -> {
String dedupId = t._1();
DedupRecordReduceState agg = t._2();
return mergeRels
.join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
.select("dedupId", "id", "kryoObject")
.as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder))
.map(
(MapFunction<Tuple3<String, String, OafEntity>, DedupRecordReduceState>) t -> new DedupRecordReduceState(
t._1(), t._2(), t._3()),
Encoders.kryo(DedupRecordReduceState.class))
.groupByKey(
(MapFunction<DedupRecordReduceState, String>) DedupRecordReduceState::getDedupId, Encoders.STRING())
.reduceGroups(
(ReduceFunction<DedupRecordReduceState>) (t1, t2) -> {
if (t1.entity == null) {
t2.aliases.addAll(t1.aliases);
return t2;
}
if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
t1.acceptanceDate.addAll(t2.acceptanceDate);
}
t1.aliases.addAll(t2.aliases);
t1.entity = reduceEntity(t1.entity, t2.entity);
if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) {
return Collections.emptyIterator();
}
return t1;
})
.flatMap((FlatMapFunction<Tuple2<String, DedupRecordReduceState>, OafEntity>) t -> {
String dedupId = t._1();
DedupRecordReduceState agg = t._2();
return Stream.concat(Stream.of(agg.getDedupId()), agg.aliases.stream())
.map(id -> {
try {
OafEntity res = (OafEntity) BeanUtils.cloneBean(agg.entity);
res.setId(id);
res.setDataInfo(dataInfo);
res.setLastupdatetimestamp(ts);
return res;
} catch (Exception e) {
throw new RuntimeException(e);
}
}).iterator();
}, beanEncoder);
}
if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) {
return Collections.emptyIterator();
}
private static OafEntity reduceEntity(OafEntity entity, OafEntity duplicate) {
return Stream
.concat(Stream.of(agg.getDedupId()), agg.aliases.stream())
.map(id -> {
try {
OafEntity res = (OafEntity) BeanUtils.cloneBean(agg.entity);
res.setId(id);
res.setDataInfo(dataInfo);
res.setLastupdatetimestamp(ts);
return res;
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.iterator();
}, beanEncoder);
}
private static OafEntity reduceEntity(OafEntity entity, OafEntity duplicate) {
if (duplicate == null) {
return entity;
}
int compare = new IdentifierComparator<>()
.compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate));
int compare = new IdentifierComparator<>()
.compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate));
if (compare > 0) {
if (compare > 0) {
OafEntity swap = duplicate;
duplicate = entity;
entity = swap;
}
duplicate = entity;
entity = swap;
}
entity.mergeFrom(duplicate);
entity.mergeFrom(duplicate);
if (ModelSupport.isSubClass(duplicate, Result.class)) {
Result re = (Result) entity;
Result rd = (Result) duplicate;
if (ModelSupport.isSubClass(duplicate, Result.class)) {
Result re = (Result) entity;
Result rd = (Result) duplicate;
List<List<Author>> authors = new ArrayList<>();
if (re.getAuthor() != null) {
authors.add(re.getAuthor());
}
if (rd.getAuthor() != null) {
authors.add(rd.getAuthor());
}
List<List<Author>> authors = new ArrayList<>();
if (re.getAuthor() != null) {
authors.add(re.getAuthor());
}
if (rd.getAuthor() != null) {
authors.add(rd.getAuthor());
}
re.setAuthor(AuthorMerger.merge(authors));
}
re.setAuthor(AuthorMerger.merge(authors));
}
return entity;
}
return entity;
}
public static <T extends OafEntity> T entityMerger(
String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo, Class<T> clazz) {
T base = entities.next()._2();
public static <T extends OafEntity> T entityMerger(
String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo, Class<T> clazz) {
T base = entities.next()._2();
while (entities.hasNext()) {
T duplicate = entities.next()._2();
if (duplicate != null)
base = (T) reduceEntity(base, duplicate);
}
while (entities.hasNext()) {
T duplicate = entities.next()._2();
if (duplicate != null)
base = (T) reduceEntity(base, duplicate);
}
base.setId(id);
base.setDataInfo(dataInfo);
base.setLastupdatetimestamp(ts);
base.setId(id);
base.setDataInfo(dataInfo);
base.setLastupdatetimestamp(ts);
return base;
}
return base;
}
}

View File

@ -242,13 +242,14 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
// this was a pivot in a previous graph but it has been merged into a new group with different
// pivot
if (!r.isNullAt(r.fieldIndex("lastUsage")) && !pivot.equals(id) && !dedupId.equals(pivotDedupId)) {
if (!r.isNullAt(r.fieldIndex("lastUsage")) && !pivot.equals(id)
&& !dedupId.equals(pivotDedupId)) {
// materialize the previous dedup record as a merge relation with the new one
res.add(new Tuple3<>(dedupId, pivotDedupId, null));
}
// add merge relations
if (cut <=0 || r.<Integer>getAs("position") <= cut) {
if (cut <= 0 || r.<Integer> getAs("position") <= cut) {
res.add(new Tuple3<>(id, pivotDedupId, pivot));
}

View File

@ -0,0 +1,26 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>sparkSqlWarehouseDir</name>
<value>/user/hive/warehouse</value>
</property>
</configuration>

View File

@ -0,0 +1,62 @@
CREATE TABLE `${pivot_history_db}`.`dataset_new` STORED AS PARQUET AS
WITH pivots (
SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation`
LEFT SEMI JOIN `${new_graph_db}`.`dataset` ON relation.source = dataset.id
LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot'
UNION
SELECT id, usedIn FROM `${pivot_history_db}`.`dataset` LATERAL VIEW EXPLODE(usages) AS usedIn
)
SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages
FROM pivots
GROUP BY id; /*EOS*/
CREATE TABLE `${pivot_history_db}`.`publication_new` STORED AS PARQUET AS
WITH pivots (
SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation`
LEFT SEMI JOIN `${new_graph_db}`.`publication` ON relation.source = publication.id
LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot'
UNION
SELECT id, usedIn FROM `${pivot_history_db}`.`publication` LATERAL VIEW EXPLODE(usages) AS usedIn
)
SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages
FROM pivots
GROUP BY id; /*EOS*/
CREATE TABLE `${pivot_history_db}`.`software_new` STORED AS PARQUET AS
WITH pivots (
SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation`
LEFT SEMI JOIN `${new_graph_db}`.`software` ON relation.source = software.id
LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot'
UNION
SELECT id, usedIn FROM `${pivot_history_db}`.`software` LATERAL VIEW EXPLODE(usages) AS usedIn
)
SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages
FROM pivots
GROUP BY id; /*EOS*/
CREATE TABLE `${pivot_history_db}`.`otherresearchproduct_new` STORED AS PARQUET AS
WITH pivots (
SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation`
LEFT SEMI JOIN `${new_graph_db}`.`otherresearchproduct` ON relation.source = otherresearchproduct.id
LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot'
UNION
SELECT id, usedIn FROM `${pivot_history_db}`.`otherresearchproduct` LATERAL VIEW EXPLODE(usages) AS usedIn
)
SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages
FROM pivots
GROUP BY id; /*EOS*/
DROP TABLE IF EXISTS `${pivot_history_db}`.`dataset_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`dataset` RENAME TO `${pivot_history_db}`.`dataset_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`dataset_new` RENAME TO `${pivot_history_db}`.`dataset`; /*EOS*/
DROP TABLE IF EXISTS `${pivot_history_db}`.`publication_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`publication` RENAME TO `${pivot_history_db}`.`publication_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`publication_new` RENAME TO `${pivot_history_db}`.`publication`; /*EOS*/
DROP TABLE IF EXISTS `${pivot_history_db}`.`software_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`software` RENAME TO `${pivot_history_db}`.`software_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`software_new` RENAME TO `${pivot_history_db}`.`software`; /*EOS*/
DROP TABLE IF EXISTS `${pivot_history_db}`.`otherresearchproduct_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`otherresearchproduct` RENAME TO `${pivot_history_db}`.`otherresearchproduct_old`; /*EOS*/
ALTER TABLE `${pivot_history_db}`.`otherresearchproduct_new` RENAME TO `${pivot_history_db}`.`otherresearchproduct`; /*EOS*/

View File

@ -0,0 +1,95 @@
<workflow-app name="Update pivot history" xmlns="uri:oozie:workflow:0.5">
<parameters>
<!-- properties used in SQL -->
<property>
<name>pivot_history_db</name>
<!-- <value>openaire_beta_pivots_test</value> -->
<description>Pivot history DB on hive</description>
</property>
<property>
<name>new_graph_db</name>
<!--<value>openaire_beta_20231208</value> -->
<description>New graph DB on hive</description>
</property>
<property>
<name>new_graph_date</name>
<!-- <value>20231208</value> -->
<description>Creation date of new graph db</description>
</property>
<!-- RunSQLSparkJob properties -->
<property>
<name>hiveMetastoreUris</name>
<description>hive server metastore URIs</description>
</property>
<property>
<name>sparkSqlWarehouseDir</name>
</property>
<!-- General oozie workflow properties -->
<property>
<name>sparkClusterOpts</name>
<value>--conf spark.network.timeout=600 --conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
<description>spark cluster-wide options</description>
</property>
<property>
<name>sparkResourceOpts</name>
<value>--executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
<description>spark resource options</description>
</property>
<property>
<name>sparkApplicationOpts</name>
<value>--conf spark.sql.shuffle.partitions=3840</value>
<description>spark resource options</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="UpgradePivotHistory"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="UpgradePivotHistory">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Upgrade Pivot History</name>
<class>eu.dnetlib.dhp.oozie.RunSQLSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
${sparkClusterOpts}
${sparkResourceOpts}
${sparkApplicationOpts}
</spark-opts>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--sql</arg><arg>eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/sql.sql</arg>
<arg>--pivot_history_db</arg><arg>${pivot_history_db}</arg>
<arg>--new_graph_db</arg><arg>${new_graph_db}</arg>
<arg>--new_graph_date</arg><arg>${new_graph_date}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -25,7 +25,7 @@ case class mappingAffiliation(name: String) {}
case class mappingAuthor(
given: Option[String],
family: String,
family: Option[String],
sequence: Option[String],
ORCID: Option[String],
affiliation: Option[mappingAffiliation]
@ -226,14 +226,14 @@ case object Crossref2Oaf {
//Mapping Author
val authorList: List[mappingAuthor] =
(json \ "author").extractOrElse[List[mappingAuthor]](List())
(json \ "author").extract[List[mappingAuthor]].filter(a => a.family.isDefined)
val sorted_list = authorList.sortWith((a: mappingAuthor, b: mappingAuthor) =>
a.sequence.isDefined && a.sequence.get.equalsIgnoreCase("first")
)
result.setAuthor(sorted_list.zipWithIndex.map { case (a, index) =>
generateAuhtor(a.given.orNull, a.family, a.ORCID.orNull, index)
generateAuhtor(a.given.orNull, a.family.get, a.ORCID.orNull, index)
}.asJava)
// Mapping instance

File diff suppressed because one or more lines are too long

View File

@ -22,6 +22,13 @@ class CrossrefMappingTest {
val logger: Logger = LoggerFactory.getLogger(Crossref2Oaf.getClass)
val mapper = new ObjectMapper()
@Test
def testMissingAuthorParser():Unit = {
val json: String = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/s41567-022-01757-y.json")).mkString
val result = Crossref2Oaf.convert(json)
result.filter(o => o.isInstanceOf[Publication]).map(p=> p.asInstanceOf[Publication]).foreach(p =>assertTrue(p.getAuthor.size()>0))
}
@Test
def testFunderRelationshipsMapping(): Unit = {
val template = Source

View File

@ -0,0 +1,84 @@
package eu.dnetlib.dhp;
import static eu.dnetlib.dhp.PropagationConstant.isSparkSessionManaged;
import static eu.dnetlib.dhp.PropagationConstant.readPath;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Result;
/**
* @author miriam.baglioni
* @Date 15/01/24
*/
public class MoveResult implements Serializable {
private static final Logger log = LoggerFactory.getLogger(MoveResult.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
MoveResult.class
.getResourceAsStream(
"/eu/dnetlib/dhp/wf/subworkflows/input_moveresult_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
moveResults(spark, inputPath, outputPath);
});
}
public static <R extends Result> void moveResults(SparkSession spark, String inputPath, String outputPath) {
ModelSupport.entityTypes
.keySet()
.parallelStream()
.filter(e -> ModelSupport.isResult(e))
// .parallelStream()
.forEach(e -> {
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
Dataset<R> resultDataset = readPath(spark, inputPath + e.name(), resultClazz);
if (resultDataset.count() > 0) {
resultDataset
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + e.name());
}
});
}
}

View File

@ -97,12 +97,6 @@ public class SparkCountryPropagationJob {
.mode(SaveMode.Overwrite)
.json(outputPath);
readPath(spark, outputPath, resultClazz)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(sourcePath);
}
private static <R extends Result> MapFunction<Tuple2<R, ResultCountrySet>, R> getCountryMergeFn() {

View File

@ -64,7 +64,7 @@ public class SparkResultToProjectThroughSemRelJob {
removeOutputDir(spark, outputPath);
}
execPropagation(
spark, outputPath, alreadyLinkedPath, potentialUpdatePath, saveGraph);
spark, outputPath, alreadyLinkedPath, potentialUpdatePath);
});
}
@ -72,24 +72,23 @@ public class SparkResultToProjectThroughSemRelJob {
SparkSession spark,
String outputPath,
String alreadyLinkedPath,
String potentialUpdatePath,
Boolean saveGraph) {
String potentialUpdatePath) {
Dataset<ResultProjectSet> toaddrelations = readPath(spark, potentialUpdatePath, ResultProjectSet.class);
Dataset<ResultProjectSet> alreadyLinked = readPath(spark, alreadyLinkedPath, ResultProjectSet.class);
if (saveGraph) {
toaddrelations
.joinWith(
alreadyLinked,
toaddrelations.col("resultId").equalTo(alreadyLinked.col("resultId")),
"left_outer")
.flatMap(mapRelationRn(), Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Append)
.option("compression", "gzip")
.json(outputPath);
}
// if (saveGraph) {
toaddrelations
.joinWith(
alreadyLinked,
toaddrelations.col("resultId").equalTo(alreadyLinked.col("resultId")),
"left_outer")
.flatMap(mapRelationRn(), Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Append)
.option("compression", "gzip")
.json(outputPath);
// }
}
private static FlatMapFunction<Tuple2<ResultProjectSet, ResultProjectSet>, Relation> mapRelationRn() {

View File

@ -76,29 +76,41 @@ public class SparkResultToCommunityFromOrganizationJob {
ModelSupport.entityTypes
.keySet()
.parallelStream()
.filter(e -> ModelSupport.isResult(e))
// .parallelStream()
.forEach(e -> {
if (ModelSupport.isResult(e)) {
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
removeOutputDir(spark, outputPath + e.name());
Dataset<R> result = readPath(spark, inputPath + e.name(), resultClazz);
// if () {
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
removeOutputDir(spark, outputPath + e.name());
Dataset<R> result = readPath(spark, inputPath + e.name(), resultClazz);
result
.joinWith(
possibleUpdates,
result.col("id").equalTo(possibleUpdates.col("resultId")),
"left_outer")
.map(resultCommunityFn(), Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + e.name());
log.info("executing left join");
result
.joinWith(
possibleUpdates,
result.col("id").equalTo(possibleUpdates.col("resultId")),
"left_outer")
.map(resultCommunityFn(), Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + e.name());
readPath(spark, outputPath + e.name(), resultClazz)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(inputPath + e.name());
}
// log
// .info(
// "reading results from " + outputPath + e.name() + " and copying them to " + inputPath
// + e.name());
// Dataset<R> tmp = readPath(spark, outputPath + e.name(), resultClazz);
// if (tmp.count() > 0){
//
// tmp
// .write()
// .mode(SaveMode.Overwrite)
// .option("compression", "gzip")
// .json(inputPath + e.name());
// }
// }
});
}
@ -115,11 +127,11 @@ public class SparkResultToCommunityFromOrganizationJob {
.map(Context::getId)
.collect(Collectors.toList());
@SuppressWarnings("unchecked")
R res = (R) ret.getClass().newInstance();
// @SuppressWarnings("unchecked")
// R res = (R) ret.getClass().newInstance();
res.setId(ret.getId());
List<Context> propagatedContexts = new ArrayList<>();
// res.setId(ret.getId());
// List<Context> propagatedContexts = new ArrayList<>();
for (String cId : communitySet) {
if (!contextList.contains(cId)) {
Context newContext = new Context();
@ -133,11 +145,11 @@ public class SparkResultToCommunityFromOrganizationJob {
PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID,
PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS)));
propagatedContexts.add(newContext);
ret.getContext().add(newContext);
}
}
res.setContext(propagatedContexts);
ret.mergeFrom(res);
// res.setContext(propagatedContexts);
// ret.mergeFrom(res);
}
return ret;
};

View File

@ -86,29 +86,30 @@ public class SparkResultToCommunityFromProject implements Serializable {
ModelSupport.entityTypes
.keySet()
.parallelStream()
.filter(e -> ModelSupport.isResult(e))
.forEach(e -> {
if (ModelSupport.isResult(e)) {
removeOutputDir(spark, outputPath + e.name());
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
Dataset<R> result = readPath(spark, inputPath + e.name(), resultClazz);
// if () {
removeOutputDir(spark, outputPath + e.name());
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
Dataset<R> result = readPath(spark, inputPath + e.name(), resultClazz);
result
.joinWith(
possibleUpdates,
result.col("id").equalTo(possibleUpdates.col("resultId")),
"left_outer")
.map(resultCommunityFn(), Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + e.name());
result
.joinWith(
possibleUpdates,
result.col("id").equalTo(possibleUpdates.col("resultId")),
"left_outer")
.map(resultCommunityFn(), Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + e.name());
readPath(spark, outputPath + e.name(), resultClazz)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(inputPath + e.name());
}
// readPath(spark, outputPath + e.name(), resultClazz)
// .write()
// .mode(SaveMode.Overwrite)
// .option("compression", "gzip")
// .json(inputPath + e.name());
// }
});
}

View File

@ -101,11 +101,6 @@ public class SparkResultToCommunityThroughSemRelJob {
.option("compression", "gzip")
.json(outputPath);
readPath(spark, outputPath, resultClazz)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(inputPath);
}
private static <R extends Result> MapFunction<Tuple2<R, ResultCommunityList>, R> contextUpdaterFn() {
@ -115,11 +110,11 @@ public class SparkResultToCommunityThroughSemRelJob {
if (rcl.isPresent()) {
Set<String> contexts = new HashSet<>();
ret.getContext().forEach(c -> contexts.add(c.getId()));
List<Context> contextList = rcl
rcl
.get()
.getCommunityList()
.stream()
.map(
.forEach(
c -> {
if (!contexts.contains(c)) {
Context newContext = new Context();
@ -133,19 +128,11 @@ public class SparkResultToCommunityThroughSemRelJob {
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID,
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS)));
return newContext;
ret.getContext().add(newContext);
}
return null;
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
@SuppressWarnings("unchecked")
R r = (R) ret.getClass().newInstance();
});
r.setId(ret.getId());
r.setContext(contextList);
ret.mergeFrom(r);
}
return ret;

View File

@ -1,12 +1,12 @@
sourcePath=/tmp/beta_provision/graph/09_graph_dedup_enriched
resumeFrom=CountryPropagation
sourcePath=/tmp/beta_provision/graph/10_graph_orcid_enriched
resumeFrom=ResultProject
allowedsemrelsorcidprop=isSupplementedBy;isSupplementTo
allowedsemrelsresultproject=isSupplementedBy;isSupplementTo
allowedsemrelscommunitysemrel=isSupplementedBy;isSupplementTo
datasourceWhitelistForCountryPropagation=10|opendoar____::16e6a3326dd7d868cbc926602a61e4d0;10|openaire____::fdb035c8b3e0540a8d9a561a6c44f4de;10|eurocrisdris::fe4903425d9040f680d8610d9079ea14;10|openaire____::5b76240cc27a58c6f7ceef7d8c36660e;10|openaire____::172bbccecf8fca44ab6a6653e84cb92a;10|openaire____::149c6590f8a06b46314eed77bfca693f;10|eurocrisdris::a6026877c1a174d60f81fd71f62df1c1;10|openaire____::4692342f0992d91f9e705c26959f09e0;10|openaire____::8d529dbb05ec0284662b391789e8ae2a;10|openaire____::345c9d171ef3c5d706d08041d506428c;10|opendoar____::1c1d4df596d01da60385f0bb17a4a9e0;10|opendoar____::7a614fd06c325499f1680b9896beedeb;10|opendoar____::1ee3dfcd8a0645a25a35977997223d22;10|opendoar____::d296c101daa88a51f6ca8cfc1ac79b50;10|opendoar____::798ed7d4ee7138d49b8828958048130a;10|openaire____::c9d2209ecc4d45ba7b4ca7597acb88a2;10|eurocrisdris::c49e0fe4b9ba7b7fab717d1f0f0a674d;10|eurocrisdris::9ae43d14471c4b33661fedda6f06b539;10|eurocrisdris::432ca599953ff50cd4eeffe22faf3e48
#allowedtypes=pubsrepository::institutional
allowedtypes=Institutional
outputPath=/tmp/miriam/enrichment_one_step
outputPath=/tmp/miriam/graph/11_graph_orcid
pathMap ={"author":"$['author'][*]['fullname']", \
"title":"$['title'][*]['value']",\
"orcid":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']" ,\

View File

@ -45,10 +45,18 @@
</property>
<property>
<name>sparkExecutorMemory</name>
<value>6G</value>
<value>5G</value>
</property>
<property>
<name>sparkExecutorCores</name>
<value>1</value>
<value>4</value>
</property>
<property>
<name>memoryOverhead</name>
<value>3G</value>
</property>
<property>
<name>partitions</name>
<value>3284</value>
</property>
</configuration>

View File

@ -12,6 +12,10 @@
<name>baseURL</name>
<description>The URL to access the community APIs</description>
</property>
<property>
<name>startFrom></name>
<value>undelete</value>
</property>
</parameters>
@ -26,12 +30,20 @@
</configuration>
</global>
<start to="reset_outputpath"/>
<start to="startFrom"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<decision name="startFrom">
<switch>
<case to="exec_bulktag">${wf:conf('startFrom') eq 'undelete'}</case>
<default to="reset_outputpath"/>
</switch>
</decision>
<action name="reset_outputpath">
<fs>
<delete path="${workingDir}"/>
@ -45,7 +57,7 @@
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>bulkTagging-publication</name>
<name>bulkTagging</name>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
@ -53,6 +65,8 @@
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${memoryOverhead}
--conf spark.sql.shuffle.partitions=${partitions}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}

View File

@ -45,11 +45,11 @@
</property>
<property>
<name>sparkExecutorMemory</name>
<value>6G</value>
<value>5G</value>
</property>
<property>
<name>sparkExecutorCores</name>
<value>1</value>
<value>4</value>
</property>
<property>
<name>spark2MaxExecutors</name>

View File

@ -12,6 +12,10 @@
<name>allowedtypes</name>
<description>the allowed types</description>
</property>
<property>
<name>startFrom</name>
<value>undelete</value>
</property>
</parameters>
<global>
@ -25,7 +29,15 @@
</configuration>
</global>
<start to="reset_outputpath"/>
<start to="resumeFrom"/>
<decision name="resumeFrom">
<switch>
<case to="prepare_datasource_country_association">${wf:conf('startFrom') eq 'undelete'}</case>
<default to="reset_outputpath"/>
</switch>
</decision>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -61,7 +73,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--whitelist</arg><arg>${whitelist}</arg>
<arg>--allowedtypes</arg><arg>${allowedtypes}</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/preparedInfo</arg>
</spark>
<ok to="fork_prepare_result_country"/>
<error to="Kill"/>
@ -95,10 +107,10 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/publication</arg>
<arg>--workingPath</arg><arg>${workingDir}/workingP</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/publication</arg>
<arg>--workingPath</arg><arg>${workingDir}/country/workingP</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/country/preparedInfo</arg>
</spark>
<ok to="wait_prepare"/>
<error to="Kill"/>
@ -125,10 +137,10 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/dataset</arg>
<arg>--workingPath</arg><arg>${workingDir}/workingD</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/dataset</arg>
<arg>--workingPath</arg><arg>${workingDir}/country/workingD</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/country/preparedInfo</arg>
</spark>
<ok to="wait_prepare"/>
<error to="Kill"/>
@ -155,10 +167,10 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--workingPath</arg><arg>${workingDir}/workingO</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/otherresearchproduct</arg>
<arg>--workingPath</arg><arg>${workingDir}/country/workingO</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/country/preparedInfo</arg>
</spark>
<ok to="wait_prepare"/>
<error to="Kill"/>
@ -185,10 +197,10 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--outputPath</arg><arg>${workingDir}/software</arg>
<arg>--workingPath</arg><arg>${workingDir}/workingS</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/software</arg>
<arg>--workingPath</arg><arg>${workingDir}/country/workingS</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/country/preparedInfo</arg>
</spark>
<ok to="wait_prepare"/>
<error to="Kill"/>
@ -221,12 +233,12 @@
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=3840
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/publication</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/country/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/country/publication</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
@ -253,9 +265,9 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/dataset</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/country/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/country/dataset</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
@ -282,9 +294,9 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/country/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/country/otherresearchproduct</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
@ -311,15 +323,49 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/software</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/country/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/software</arg>
<arg>--outputPath</arg><arg>${workingDir}/country/country/software</arg>
</spark>
<ok to="wait"/>
<error to="Kill"/>
</action>
<join name="wait" to="reset_workingDir"/>
<join name="wait" to="move-results"/>
<action name="move-results">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>community2resultfromorganization - move results</name>
<class>eu.dnetlib.dhp.MoveResult</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=6
--executor-memory=5G
--conf spark.executor.memoryOverhead=3g
--conf spark.sql.shuffle.partitions=3284
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/country/country/</arg>
<arg>--outputPath</arg><arg>${sourcePath}/</arg>
<!-- <arg>&#45;&#45;outputPath</arg><arg>/tmp/miriam/rescomm/</arg>-->
</spark>
<ok to="deleteWD"/>
<error to="Kill"/>
</action>
<decision name="deleteWD">
<switch>
<case to="End">${wf:conf('startFrom') eq 'undelete'}</case>
<default to="reset_workingDir"/>
</switch>
</decision>
<action name="reset_workingDir">
<fs>
<delete path="${workingDir}"/>

View File

@ -4,7 +4,10 @@
<name>sourcePath</name>
<description>the source path</description>
</property>
<property>
<name>startFrom</name>
<value>undelete</value>
</property>
</parameters>
<global>
@ -18,7 +21,15 @@
</configuration>
</global>
<start to="reset_outputpath"/>
<start to="startFrom"/>
<decision name="startFrom">
<switch>
<case to="prepare_info">${wf:conf('startFrom') eq 'undelete'}</case>
<default to="reset_outputpath"/>
</switch>
</decision>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>

View File

@ -0,0 +1,22 @@
[
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
}
]

View File

@ -114,7 +114,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo/targetOrcidAssoc</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
</spark>
<ok to="wait"/>
@ -142,7 +142,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo/targetOrcidAssoc</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
</spark>
<ok to="wait"/>
@ -170,7 +170,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo/targetOrcidAssoc</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
</spark>
<ok to="wait"/>
@ -198,7 +198,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo/targetOrcidAssoc</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
<arg>--allowedsemrels</arg><arg>${allowedsemrels}</arg>
</spark>
<ok to="wait"/>
@ -225,8 +225,8 @@
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/orcidprop</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcidprop/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${workingDir}/orcid/targetOrcidAssoc</arg>
<arg>--outputPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
</spark>
<ok to="fork-join-exec-propagation"/>
<error to="Kill"/>
@ -247,9 +247,10 @@
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--executor-cores=4
--executor-memory=4G
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=5G
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@ -259,9 +260,9 @@
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=3840
--conf spark.sql.shuffle.partitions=15000
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcidprop/mergedOrcidAssoc</arg>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
@ -291,7 +292,7 @@
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcidprop/mergedOrcidAssoc</arg>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
@ -321,7 +322,7 @@
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcidprop/mergedOrcidAssoc</arg>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
@ -351,7 +352,7 @@
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
</spark-opts>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcidprop/mergedOrcidAssoc</arg>
<arg>--possibleUpdatesPath</arg><arg>${workingDir}/orcid/mergedOrcidAssoc</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${outputPath}/software</arg>

View File

@ -8,7 +8,10 @@
<name>allowedsemrels</name>
<description>the allowed semantics </description>
</property>
<property>
<name>startFrom</name>
<value>undelete</value>
</property>
</parameters>
<global>
@ -22,7 +25,14 @@
</configuration>
</global>
<start to="reset_outputpath"/>
<start to="startFrom"/>
<decision name="startFrom">
<switch>
<case to="prepare_project_results_association">${wf:conf('startFrom') eq 'undelete'}</case>
<default to="reset_outputpath"/>
</switch>
</decision>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -86,17 +96,9 @@
<arg>--potentialUpdatePath</arg><arg>${workingDir}/resultproject/preparedInfo/potentialUpdates</arg>
<arg>--alreadyLinkedPath</arg><arg>${workingDir}/resultproject/preparedInfo/alreadyLinked</arg>
</spark>
<ok to="reset_workingDir"/>
<error to="Kill"/>
</action>
<action name="reset_workingDir">
<fs>
<delete path="${workingDir}"/>
<mkdir path="${workingDir}"/>
</fs>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
<end name="End"/>
</workflow-app>

View File

@ -8,6 +8,10 @@
<name>baseURL</name>
<description>the baseURL from where to reach the community APIs</description>
</property>
<property>
<name>startFrom</name>
<value>undelete</value>
</property>
</parameters>
<global>
@ -21,7 +25,15 @@
</configuration>
</global>
<start to="reset_outputpath"/>
<start to="startFrom"/>
<decision name="startFrom">
<switch>
<case to="prepare_result_communitylist">${wf:conf('startFrom') eq 'undelete'}</case>
<default to="reset_outputpath"/>
</switch>
</decision>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -69,7 +81,7 @@
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>community2resultfromorganization-Publication</name>
<name>community2resultfromorganization</name>
<class>eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
@ -88,6 +100,33 @@
<arg>--sourcePath</arg><arg>${sourcePath}/</arg>
<arg>--outputPath</arg><arg>${workingDir}/communityorganization/resulttocommunityfromorganization/</arg>
</spark>
<ok to="move-results"/>
<error to="Kill"/>
</action>
<action name="move-results">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>community2resultfromorganization - move results</name>
<class>eu.dnetlib.dhp.MoveResult</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=6
--executor-memory=5G
--conf spark.executor.memoryOverhead=3g
--conf spark.sql.shuffle.partitions=3284
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/communityorganization/resulttocommunityfromorganization/</arg>
<arg>--outputPath</arg><arg>${sourcePath}/</arg>
<!-- <arg>&#45;&#45;outputPath</arg><arg>/tmp/miriam/rescomm/</arg>-->
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>

View File

@ -8,6 +8,10 @@
<name>baseURL</name>
<description>the base URL to use to select the right community APIs</description>
</property>
<property>
<name>startFrom</name>
<value>undelete</value>
</property>
</parameters>
<global>
@ -21,7 +25,15 @@
</configuration>
</global>
<start to="reset_outputpath"/>
<start to="startFrom"/>
<decision name="startFrom">
<switch>
<case to="prepare_result_communitylist">${wf:conf('startFrom') eq 'undelete'}</case>
<default to="reset_outputpath"/>
</switch>
</decision>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -86,12 +98,37 @@
<arg>--sourcePath</arg><arg>${sourcePath}/</arg>
<arg>--outputPath</arg><arg>${workingDir}/communitythroughproject/</arg>
</spark>
<ok to="move-results"/>
<error to="Kill"/>
</action>
<action name="move-results">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>move results</name>
<class>eu.dnetlib.dhp.MoveResult</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=6
--executor-memory=5G
--conf spark.executor.memoryOverhead=3g
--conf spark.sql.shuffle.partitions=3284
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/communitythroughproject/</arg>
<arg>--outputPath</arg><arg>${sourcePath}/</arg>
<!-- <arg>outputPath</arg><arg>/tmp/miriam/rescomm/</arg>-->
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -16,9 +16,21 @@
<name>outputPath</name>
<description>the output path</description>
</property>
<property>
<name>startFrom</name>
<value>undelete</value>
</property>
</parameters>
<start to="reset_outputpath"/>
<start to="startFrom"/>
<decision name="startFrom">
<switch>
<case to="fork_prepare_assoc_step1">${wf:conf('startFrom') eq 'undelete'}</case>
<default to="reset_outputpath"/>
</switch>
</decision>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -209,9 +221,9 @@
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=6
--executor-memory=5G
--conf spark.executor.memoryOverhead=3g
--conf spark.sql.shuffle.partitions=3284
--executor-memory=4G
--conf spark.executor.memoryOverhead=5G
--conf spark.sql.shuffle.partitions=15000
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
@ -324,7 +336,34 @@
<error to="Kill"/>
</action>
<join name="wait2" to="End"/>
<join name="wait2" to="move-results"/>
<action name="move-results">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>move results</name>
<class>eu.dnetlib.dhp.MoveResult</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=6
--executor-memory=5G
--conf spark.executor.memoryOverhead=3g
--conf spark.sql.shuffle.partitions=3284
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/communitysemrel/</arg>
<arg>--outputPath</arg><arg>${sourcePath}/</arg>
<!-- <arg>outputPath</arg><arg>/tmp/miriam/rescomm/</arg>-->
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>

View File

@ -8,6 +8,10 @@
<name>blacklist</name>
<description>The list of institutional repositories that should not be used for the propagation</description>
</property>
<property>
<name>startFrom</name>
<value>undelete</value>
</property>
</parameters>
<global>
@ -21,7 +25,15 @@
</configuration>
</global>
<start to="reset_outputpath"/>
<start to="startFrom"/>
<decision name="startFrom">
<switch>
<case to="prepare_result_organization_association">${wf:conf('startFrom') eq 'undelete'}</case>
<default to="reset_outputpath"/>
</switch>
</decision>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>

View File

@ -43,6 +43,17 @@
<arg>--graphPath</arg><arg>${graphPath}</arg>
<arg>--master</arg><arg>yarn</arg>
</spark>
<ok to="reset_outputpath"/>
<error to="Kill"/>
</action>
<action name="reset_outputpath">
<fs>
<delete path="${graphPath}/datasource"/>
<delete path="${graphPath}/organization"/>
<delete path="${graphPath}/project"/>
<delete path="${graphPath}/relation"/>
</fs>
<ok to="copy_datasource"/>
<error to="Kill"/>
</action>

View File

@ -1250,6 +1250,21 @@ class MappersTest {
System.out.println("***************");
}
@Test
void testIRISPub() throws IOException, DocumentException {
final String xml = IOUtils.toString(Objects.requireNonNull(getClass().getResourceAsStream("iris-odf.xml")));
final List<Oaf> list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml);
System.out.println("***************");
System.out.println(new ObjectMapper().writeValueAsString(list));
System.out.println("***************");
final Publication p = (Publication) list.get(0);
assertNotNull(p.getInstance().get(0).getUrl().get(0));
assertValidId(p.getId());
System.out.println(p.getInstance().get(0).getUrl());
p.getPid().forEach(x -> System.out.println(x.getValue()));
p.getInstance().get(0).getAlternateIdentifier().forEach(x -> System.out.println(x.getValue()));
}
@Test
void testNotWellFormed() throws IOException {
final String xml = IOUtils

View File

@ -0,0 +1,215 @@
<?xml version="1.0" encoding="UTF-8"?>
<record xmlns:datacite="http://datacite.org/schema/kernel-4"
xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:dr="http://www.driver-repository.eu/namespace/dr"
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
xmlns:oaf="http://namespace.openaire.eu/oaf"
xmlns:oai="http://www.openarchives.org/OAI/2.0/"
xmlns:oaire="http://namespace.openaire.eu/schema/oaire/"
xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<oai:header xmlns="http://namespace.openaire.eu/" xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance">
<identifier>oai:air.unimi.it:2434/907506</identifier>
<datestamp>2024-01-04T12:42:51Z</datestamp>
<setSpec>com_2434_73555</setSpec>
<setSpec>col_2434_73557</setSpec>
<setSpec>openaire</setSpec>
<dr:dateOfTransformation>2024-01-29T16:56:50.632Z</dr:dateOfTransformation>
<dri:objIdentifier>od______1261::ff2d9e058e7bea90a27f41c31078e601</dri:objIdentifier>
<dri:recordIdentifier>oai:air.unimi.it:2434/907506</dri:recordIdentifier>
<dri:dateOfCollection/>
<dri:mdFormat/>
<dri:mdFormatInterpretation/>
<dri:repositoryId/>
<oaf:datasourceprefix> od______1261</oaf:datasourceprefix>
</oai:header>
<metadata>
<oaire:resource xmlns:oaire="http://namespace.openaire.eu/schema/oaire/"
xmlns:exslt="http://exslt.org/common"
xmlns:xs="http://www.w3.org/2001/XMLSchema"
xmlns:rdf="http://www.w3.org/TR/rdf-concepts/"
xmlns:doc="http://www.lyncode.com/xoai"
xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:datacite="http://datacite.org/schema/kernel-4"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:vc="http://www.w3.org/2007/XMLSchema-versioning"
xmlns="http://www.openarchives.org/OAI/2.0/"
xsi:schemaLocation="http://namespace.openaire.eu/schema/oaire/ https://www.openaire.eu/schema/repo-lit/4.0/openaire.xsd">
<datacite:titles>
<datacite:title xml:lang="en">Ensuring tests of conservation interventions build on existing literature</datacite:title>
</datacite:titles>
<datacite:creator>
<datacite:creator>
<datacite:creatorName>W.J. Sutherland</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>S.T. Alvarez-Castaneda</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>T. Amano</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>R. Ambrosini</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>P. Atkinson</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>J.M. Baxter</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>A.L. Bond</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>P.J. Boon</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>K.L. Buchanan</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>J. Barlow</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>G. Bogliani</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>O.M. Bragg</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>M. Burgman</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>M.W. Cadotte</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>M. Calver</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>S.J. Cooke</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>R.T. Corlett</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>V. Devictor</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>J.G. Ewen</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>M. Fisher</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>G. Freeman</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>E. Game</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>B.J. Godley</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>C. Gortazar</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>I.R. Hartley</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>D.L. Hawksworth</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>K.A. Hobson</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>M.-. Lu</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>B. Martin-Lopez</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>K. Ma</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>A. Machado</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>D. Mae</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>M. Mangiacotti</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>D.J. Mccafferty</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>V. Melfi</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>S. Molur</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>A.J. Moore</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>S.D. Murphy</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>D. Norri</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>A.P.E. van Oudenhoven</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>J. Power</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>E.C. Ree</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>M.W. Schwartz</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>I. Storch</datacite:creatorName>
</datacite:creator>
<datacite:creator>
<datacite:creatorName>C. Wordley</datacite:creatorName>
</datacite:creator>
</datacite:creator>
<datacite:relatedIdentifiers>
</datacite:relatedIdentifiers>
<datacite:dates>
<datacite:date dateType="Accepted">2020</datacite:date>
<datacite:date dateType="Issued">2020</datacite:date>
<datacite:date dateType="Available">2022-06-20</datacite:date>
</datacite:dates>
<dc:language>eng</dc:language>
<dc:publisher>Wiley Blackwell Publishing</dc:publisher>
<oaire:resourceType resourceTypeGeneral="literature"
uri="http://purl.org/coar/resource_type/c_6501">journal article</oaire:resourceType>
<dc:format>application/pdf</dc:format>
<datacite:identifier xmlns:datacite="http://datacite.org/schema/kernel-3"
identifierType="Handle">2434/907506</datacite:identifier>
<datacite:rights rightsURI="http://purl.org/coar/access_right/c_abf2">open access</datacite:rights>
<datacite:subjects>
<datacite:subject>Conservation of Natural Resources</datacite:subject>
</datacite:subjects>
<datacite:sizes/>
<datacite:sizes/>
<datacite:sizes>
<datacite:size>191802 bytes</datacite:size>
</datacite:sizes>
<oaire:file accessRightsURI="" mimeType="application/pdf" objectType="fulltext">https://air.unimi.it/bitstream/2434/907506/4/Full%20manuscript%20resubmitted.pdf</oaire:file>
</oaire:resource>
<oaf:identifier identifierType="DOI">10.1111/cobi.13555</oaf:identifier>
<oaf:identifier identifierType="PMID">32779884</oaf:identifier>
<oaf:fulltext>https://air.unimi.it/bitstream/2434/907506/4/Full%20manuscript%20resubmitted.pdf</oaf:fulltext>
<dr:CobjCategory type="publication">0001</dr:CobjCategory>
<oaf:dateAccepted>2020-01-01</oaf:dateAccepted>
<oaf:accessrights>OPEN</oaf:accessrights>
<oaf:language>eng</oaf:language>
<oaf:hostedBy name="Archivio Istituzionale della Ricerca dell'Università degli Studi di Milano"
id="opendoar____::1261"/>
<oaf:collectedFrom name="Archivio Istituzionale della Ricerca dell'Università degli Studi di Milano"
id="opendoar____::1261"/>
</metadata>
</record>

View File

@ -62,8 +62,8 @@ public class XmlConverterJob {
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
final String contextApiBaseUrl = parser.get("contextApiBaseUrl");
log.info("contextApiBaseUrl: {}", contextApiBaseUrl);
final SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
@ -71,7 +71,7 @@ public class XmlConverterJob {
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
removeOutputDir(spark, outputPath);
convertToXml(spark, inputPath, outputPath, ContextMapper.fromIS(isLookupUrl));
convertToXml(spark, inputPath, outputPath, ContextMapper.fromAPI(contextApiBaseUrl));
});
}

View File

@ -1,18 +1,22 @@
package eu.dnetlib.dhp.oa.provision.utils;
import java.io.Serializable;
import java.io.StringReader;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.HashMap;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import org.jetbrains.annotations.NotNull;
import org.xml.sax.SAXException;
import com.google.common.base.Joiner;
import eu.dnetlib.dhp.common.api.context.*;
import eu.dnetlib.dhp.common.rest.DNetRestClient;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@ -23,6 +27,45 @@ public class ContextMapper extends HashMap<String, ContextDef> implements Serial
private static final String XQUERY = "for $x in //RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='ContextDSResourceType']//*[name()='context' or name()='category' or name()='concept'] return <entry id=\"{$x/@id}\" label=\"{$x/@label|$x/@name}\" name=\"{$x/name()}\" type=\"{$x/@type}\"/>";
public static ContextMapper fromAPI(final String baseURL) throws Exception {
final ContextMapper contextMapper = new ContextMapper();
for (ContextSummary ctx : DNetRestClient
.doGET(String.format("%s/contexts", baseURL), ContextSummaryList.class)) {
contextMapper.put(ctx.getId(), new ContextDef(ctx.getId(), ctx.getLabel(), "context", ctx.getType()));
for (CategorySummary cat : DNetRestClient
.doGET(String.format("%s/context/%s?all=true", baseURL, ctx.getId()), CategorySummaryList.class)) {
contextMapper.put(cat.getId(), new ContextDef(cat.getId(), cat.getLabel(), "category", ""));
if (cat.isHasConcept()) {
for (ConceptSummary c : DNetRestClient
.doGET(
String.format("%s/context/category/%s?all=true", baseURL, cat.getId()),
ConceptSummaryList.class)) {
contextMapper.put(c.getId(), new ContextDef(c.getId(), c.getLabel(), "concept", ""));
if (c.isHasSubConcept()) {
for (ConceptSummary cs : c.getConcepts()) {
contextMapper.put(cs.getId(), new ContextDef(cs.getId(), cs.getLabel(), "concept", ""));
if (cs.isHasSubConcept()) {
for (ConceptSummary css : cs.getConcepts()) {
contextMapper
.put(
css.getId(),
new ContextDef(css.getId(), css.getLabel(), "concept", ""));
}
}
}
}
}
}
}
}
return contextMapper;
}
@Deprecated
public static ContextMapper fromIS(final String isLookupUrl)
throws DocumentException, ISLookUpException, SAXException {
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
@ -32,6 +75,7 @@ public class ContextMapper extends HashMap<String, ContextDef> implements Serial
return fromXml(sb.toString());
}
@Deprecated
public static ContextMapper fromXml(final String xml) throws DocumentException, SAXException {
final ContextMapper contextMapper = new ContextMapper();

View File

@ -12,9 +12,9 @@
"paramRequired": true
},
{
"paramName": "ilu",
"paramLongName": "isLookupUrl",
"paramDescription": "URL of the isLookUp Service",
"paramName": "cau",
"paramLongName": "contextApiBaseUrl",
"paramDescription": "URL of the context API",
"paramRequired": true
}
]

View File

@ -9,6 +9,10 @@
<name>isLookupUrl</name>
<description>URL for the isLookup service</description>
</property>
<property>
<name>contextApiBaseUrl</name>
<description>context API URL</description>
</property>
<property>
<name>relPartitions</name>
<description>number or partitions for the relations Dataset</description>
@ -589,7 +593,7 @@
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/join_entities</arg>
<arg>--outputPath</arg><arg>${workingDir}/xml</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--contextApiBaseUrl</arg><arg>${contextApiBaseUrl}</arg>
</spark>
<ok to="should_index"/>
<error to="Kill"/>

View File

@ -244,4 +244,27 @@ public class XmlRecordFactoryTest {
}
@Test
public void testIrisGuidelines4() throws DocumentException, IOException {
final ContextMapper contextMapper = new ContextMapper();
final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false,
XmlConverterJob.schemaLocation);
final Publication p = OBJECT_MAPPER
.readValue(
IOUtils.toString(getClass().getResourceAsStream("iris-odf-4.json")),
Publication.class);
final String xml = xmlRecordFactory.build(new JoinedEntity<>(p));
assertNotNull(xml);
final Document doc = new SAXReader().read(new StringReader(xml));
assertNotNull(doc);
System.out.println(doc.asXML());
}
}

File diff suppressed because one or more lines are too long

View File

@ -8,6 +8,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-stats-update</artifactId>
<dependencies>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>

View File

@ -64,6 +64,26 @@
<name>hadoop_user_name</name>
<description>user name of the wf owner</description>
</property>
<property>
<name>sparkSqlWarehouseDir</name>
</property>
<!-- General oozie workflow properties -->
<property>
<name>sparkClusterOpts</name>
<value>--conf spark.network.timeout=600 --conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
<description>spark cluster-wide options</description>
</property>
<property>
<name>sparkResourceOpts</name>
<value>--executor-memory=6G --conf spark.executor.memoryOverhead=4G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
<description>spark resource options</description>
</property>
<property>
<name>sparkApplicationOpts</name>
<value>--conf spark.sql.shuffle.partitions=3840</value>
<description>spark resource options</description>
</property>
</parameters>
<global>
@ -75,13 +95,21 @@
<value>${hive_metastore_uris}</value>
</property>
<property>
<name>hive.txn.timeout</name>
<value>${hive_timeout}</value>
<name>hive.txn.timeout</name>
<value>${hive_timeout}</value>
</property>
<property>
<name>hive.mapjoin.followby.gby.localtask.max.memory.usage</name>
<value>0.80</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
<property>
<name>mapred.job.queue.name</name>
<value>analytics</value>
</property>
<property>
<name>mapred.job.queue.name</name>
<value>analytics</value>
</property>
</configuration>
</global>
@ -129,164 +157,164 @@
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<jdbc-url>${hive_jdbc_url}</jdbc-url>
<script>scripts/step1.sql</script>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
</hive2>
<ok to="Step2"/>
<error to="Kill"/>
</action>
<action name="Step2">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<jdbc-url>${hive_jdbc_url}</jdbc-url>
<script>scripts/step2.sql</script>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
</hive2>
<ok to="Step3"/>
<error to="Kill"/>
</action>
<action name="Step3">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<jdbc-url>${hive_jdbc_url}</jdbc-url>
<script>scripts/step3.sql</script>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
</hive2>
<ok to="Step4"/>
<error to="Kill"/>
</action>
<action name="Step4">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<jdbc-url>${hive_jdbc_url}</jdbc-url>
<script>scripts/step4.sql</script>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
</hive2>
<ok to="Step5"/>
<error to="Kill"/>
</action>
<action name="Step5">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<jdbc-url>${hive_jdbc_url}</jdbc-url>
<script>scripts/step5.sql</script>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
</hive2>
<ok to="Step6"/>
<error to="Kill"/>
</action>
<action name="Step6">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<jdbc-url>${hive_jdbc_url}</jdbc-url>
<script>scripts/step6.sql</script>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
</hive2>
<ok to="Step7"/>
<error to="Kill"/>
</action>
<action name="Step7">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<jdbc-url>${hive_jdbc_url}</jdbc-url>
<script>scripts/step7.sql</script>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
</hive2>
<ok to="Step8"/>
<error to="Kill"/>
</action>
<action name="Step8">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<jdbc-url>${hive_jdbc_url}</jdbc-url>
<script>scripts/step8.sql</script>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
</hive2>
<ok to="Step9"/>
<error to="Kill"/>
</action>
<action name="Step9">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<jdbc-url>${hive_jdbc_url}</jdbc-url>
<script>scripts/step9.sql</script>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
</hive2>
<ok to="Step10"/>
<error to="Kill"/>
</action>
<action name="Step10">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<jdbc-url>${hive_jdbc_url}</jdbc-url>
<script>scripts/step10.sql</script>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
<param>external_stats_db_name=${external_stats_db_name}</param>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
<param>external_stats_db_name=${external_stats_db_name}</param>
</hive2>
<ok to="Step11"/>
<error to="Kill"/>
</action>
</action>
<action name="Step11">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<jdbc-url>${hive_jdbc_url}</jdbc-url>
<script>scripts/step11.sql</script>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
<param>external_stats_db_name=${external_stats_db_name}</param>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
<param>external_stats_db_name=${external_stats_db_name}</param>
</hive2>
<ok to="Step12"/>
<error to="Kill"/>
</action>
</action>
<action name="Step12">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<jdbc-url>${hive_jdbc_url}</jdbc-url>
<script>scripts/step12.sql</script>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
</hive2>
<ok to="Step13"/>
<error to="Kill"/>
</action>
<action name="Step13">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<jdbc-url>${hive_jdbc_url}</jdbc-url>
<script>scripts/step13.sql</script>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
</hive2>
<ok to="Step14"/>
<error to="Kill"/>
</action>
<action name="Step14">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<jdbc-url>${hive_jdbc_url}</jdbc-url>
<script>scripts/step14.sql</script>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
</hive2>
<ok to="Step15"/>
<error to="Kill"/>
</action>
<action name="Step15">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<jdbc-url>${hive_jdbc_url}</jdbc-url>
<script>scripts/step15.sql</script>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
<param>stats_db_name=${stats_db_name}</param>
<param>openaire_db_name=${openaire_db_name}</param>
</hive2>
<ok to="Step15_5"/>
<error to="Kill"/>
@ -318,12 +346,23 @@
</action>
<action name="Step16-createIndicatorsTables">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<jdbc-url>${hive_jdbc_url}</jdbc-url>
<script>scripts/step16-createIndicatorsTables.sql</script>
<param>stats_db_name=${stats_db_name}</param>
<param>external_stats_db_name=${external_stats_db_name}</param>
</hive2>
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Step16-createIndicatorsTables</name>
<class>eu.dnetlib.dhp.oozie.RunSQLSparkJob</class>
<jar>dhp-stats-update-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
${sparkClusterOpts}
${sparkResourceOpts}
${sparkApplicationOpts}
</spark-opts>
<arg>--hiveMetastoreUris</arg><arg>${hive_metastore_uris}</arg>
<arg>--sql</arg><arg>eu/dnetlib/dhp/oa/graph/stats/oozie_app/scripts/step16-createIndicatorsTables.sql</arg>
<arg>--stats_db_name</arg><arg>${stats_db_name}</arg>
<arg>--external_stats_db_name</arg><arg>${external_stats_db_name}</arg>
</spark>
<ok to="Step16_1-definitions"/>
<error to="Kill"/>
</action>
@ -383,18 +422,18 @@
<error to="Kill"/>
</action>
<!-- <action name="step20-createMonitorDB-post">-->
<!-- <shell xmlns="uri:oozie:shell-action:0.1">-->
<!-- <job-tracker>${jobTracker}</job-tracker>-->
<!-- <name-node>${nameNode}</name-node>-->
<!-- <exec>monitor-post.sh</exec>-->
<!-- <argument>${monitor_db_name}</argument>-->
<!-- <argument>${monitor_db_shadow_name}</argument>-->
<!-- <file>monitor-post.sh</file>-->
<!-- </shell>-->
<!-- <ok to="step21-createObservatoryDB-pre"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<!-- <action name="step20-createMonitorDB-post">-->
<!-- <shell xmlns="uri:oozie:shell-action:0.1">-->
<!-- <job-tracker>${jobTracker}</job-tracker>-->
<!-- <name-node>${nameNode}</name-node>-->
<!-- <exec>monitor-post.sh</exec>-->
<!-- <argument>${monitor_db_name}</argument>-->
<!-- <argument>${monitor_db_shadow_name}</argument>-->
<!-- <file>monitor-post.sh</file>-->
<!-- </shell>-->
<!-- <ok to="step21-createObservatoryDB-pre"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<action name="step21-createObservatoryDB-pre">
<shell xmlns="uri:oozie:shell-action:0.1">
@ -439,8 +478,8 @@
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<exec>copyDataToImpalaCluster.sh</exec>
<!-- <env-var>HADOOP_USER_NAME=${wf:user()}</env-var>-->
<!-- <argument>${external_stats_db_name}</argument>-->
<!-- <env-var>HADOOP_USER_NAME=${wf:user()}</env-var>-->
<!-- <argument>${external_stats_db_name}</argument>-->
<argument>${stats_db_name}</argument>
<argument>${monitor_db_name}</argument>
<argument>${observatory_db_name}</argument>
@ -501,4 +540,4 @@
</action>
<end name="End"/>
</workflow-app>
</workflow-app>