forked from D-Net/dnet-hadoop
mergin with branch beta
This commit is contained in:
commit
6e58d79623
|
@ -0,0 +1,39 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.common.api.context;
|
||||||
|
|
||||||
|
public class CategorySummary {
|
||||||
|
|
||||||
|
private String id;
|
||||||
|
|
||||||
|
private String label;
|
||||||
|
|
||||||
|
private boolean hasConcept;
|
||||||
|
|
||||||
|
public String getId() {
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getLabel() {
|
||||||
|
return label;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isHasConcept() {
|
||||||
|
return hasConcept;
|
||||||
|
}
|
||||||
|
|
||||||
|
public CategorySummary setId(final String id) {
|
||||||
|
this.id = id;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public CategorySummary setLabel(final String label) {
|
||||||
|
this.label = label;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public CategorySummary setHasConcept(final boolean hasConcept) {
|
||||||
|
this.hasConcept = hasConcept;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,7 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.common.api.context;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
public class CategorySummaryList extends ArrayList<CategorySummary> {
|
||||||
|
}
|
|
@ -0,0 +1,52 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.common.api.context;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class ConceptSummary {
|
||||||
|
|
||||||
|
private String id;
|
||||||
|
|
||||||
|
private String label;
|
||||||
|
|
||||||
|
public boolean hasSubConcept;
|
||||||
|
|
||||||
|
private List<ConceptSummary> concepts;
|
||||||
|
|
||||||
|
public String getId() {
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getLabel() {
|
||||||
|
return label;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<ConceptSummary> getConcepts() {
|
||||||
|
return concepts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ConceptSummary setId(final String id) {
|
||||||
|
this.id = id;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ConceptSummary setLabel(final String label) {
|
||||||
|
this.label = label;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isHasSubConcept() {
|
||||||
|
return hasSubConcept;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ConceptSummary setHasSubConcept(final boolean hasSubConcept) {
|
||||||
|
this.hasSubConcept = hasSubConcept;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ConceptSummary setConcept(final List<ConceptSummary> concepts) {
|
||||||
|
this.concepts = concepts;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,7 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.common.api.context;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
public class ConceptSummaryList extends ArrayList<ConceptSummary> {
|
||||||
|
}
|
|
@ -0,0 +1,50 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.common.api.context;
|
||||||
|
|
||||||
|
public class ContextSummary {
|
||||||
|
|
||||||
|
private String id;
|
||||||
|
|
||||||
|
private String label;
|
||||||
|
|
||||||
|
private String type;
|
||||||
|
|
||||||
|
private String status;
|
||||||
|
|
||||||
|
public String getId() {
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getLabel() {
|
||||||
|
return label;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getType() {
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getStatus() {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ContextSummary setId(final String id) {
|
||||||
|
this.id = id;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ContextSummary setLabel(final String label) {
|
||||||
|
this.label = label;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ContextSummary setType(final String type) {
|
||||||
|
this.type = type;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ContextSummary setStatus(final String status) {
|
||||||
|
this.status = status;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,7 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.common.api.context;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
public class ContextSummaryList extends ArrayList<ContextSummary> {
|
||||||
|
}
|
|
@ -0,0 +1,77 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.oozie;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||||
|
|
||||||
|
import java.net.URL;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.time.DurationFormatUtils;
|
||||||
|
import org.apache.commons.text.StringSubstitutor;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.google.common.io.Resources;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
|
||||||
|
public class RunSQLSparkJob {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(RunSQLSparkJob.class);
|
||||||
|
|
||||||
|
private final ArgumentApplicationParser parser;
|
||||||
|
|
||||||
|
public RunSQLSparkJob(ArgumentApplicationParser parser) {
|
||||||
|
this.parser = parser;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
|
Map<String, String> params = new HashMap<>();
|
||||||
|
for (int i = 0; i < args.length - 1; i++) {
|
||||||
|
if (args[i].startsWith("--")) {
|
||||||
|
params.put(args[i].substring(2), args[++i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* String jsonConfiguration = IOUtils .toString( Objects .requireNonNull( RunSQLSparkJob.class
|
||||||
|
* .getResourceAsStream( "/eu/dnetlib/dhp/oozie/run_sql_parameters.json"))); final ArgumentApplicationParser
|
||||||
|
* parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args);
|
||||||
|
*/
|
||||||
|
|
||||||
|
Boolean isSparkSessionManaged = Optional
|
||||||
|
.ofNullable(params.get("isSparkSessionManaged"))
|
||||||
|
.map(Boolean::valueOf)
|
||||||
|
.orElse(Boolean.TRUE);
|
||||||
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
|
URL url = com.google.common.io.Resources.getResource(params.get("sql"));
|
||||||
|
String raw_sql = Resources.toString(url, StandardCharsets.UTF_8);
|
||||||
|
|
||||||
|
String sql = StringSubstitutor.replace(raw_sql, params);
|
||||||
|
log.info("sql: {}", sql);
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
conf.set("hive.metastore.uris", params.get("hiveMetastoreUris"));
|
||||||
|
|
||||||
|
runWithSparkHiveSession(
|
||||||
|
conf,
|
||||||
|
isSparkSessionManaged,
|
||||||
|
spark -> {
|
||||||
|
for (String statement : sql.split(";\\s*/\\*\\s*EOS\\s*\\*/\\s*")) {
|
||||||
|
log.info("executing: {}", statement);
|
||||||
|
long startTime = System.currentTimeMillis();
|
||||||
|
spark.sql(statement).show();
|
||||||
|
log
|
||||||
|
.info(
|
||||||
|
"executed in {}",
|
||||||
|
DurationFormatUtils.formatDuration(System.currentTimeMillis() - startTime, "HH:mm:ss.S"));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,20 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"paramName": "issm",
|
||||||
|
"paramLongName": "isSparkSessionManaged",
|
||||||
|
"paramDescription": "when true will stop SparkSession after job execution",
|
||||||
|
"paramRequired": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "hmu",
|
||||||
|
"paramLongName": "hiveMetastoreUris",
|
||||||
|
"paramDescription": "the hive metastore uris",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "sql",
|
||||||
|
"paramLongName": "sql",
|
||||||
|
"paramDescription": "sql script to execute",
|
||||||
|
"paramRequired": true
|
||||||
|
}
|
||||||
|
]
|
|
@ -1,6 +1,16 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.dedup;
|
package eu.dnetlib.dhp.oa.dedup;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import org.apache.commons.beanutils.BeanUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.api.java.function.ReduceFunction;
|
||||||
|
import org.apache.spark.sql.*;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
|
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
|
||||||
import eu.dnetlib.dhp.oa.merge.AuthorMerger;
|
import eu.dnetlib.dhp.oa.merge.AuthorMerger;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
|
@ -8,19 +18,10 @@ import eu.dnetlib.dhp.schema.oaf.Author;
|
||||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
import org.apache.commons.beanutils.BeanUtils;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
|
||||||
import org.apache.spark.api.java.function.ReduceFunction;
|
|
||||||
import org.apache.spark.sql.*;
|
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
import scala.Tuple3;
|
import scala.Tuple3;
|
||||||
import scala.collection.JavaConversions;
|
import scala.collection.JavaConversions;
|
||||||
|
|
||||||
import java.util.*;
|
|
||||||
import java.util.stream.Stream;
|
|
||||||
|
|
||||||
public class DedupRecordFactory {
|
public class DedupRecordFactory {
|
||||||
public static final class DedupRecordReduceState {
|
public static final class DedupRecordReduceState {
|
||||||
public final String dedupId;
|
public final String dedupId;
|
||||||
|
@ -39,7 +40,8 @@ public class DedupRecordFactory {
|
||||||
} else {
|
} else {
|
||||||
if (Result.class.isAssignableFrom(entity.getClass())) {
|
if (Result.class.isAssignableFrom(entity.getClass())) {
|
||||||
Result result = (Result) entity;
|
Result result = (Result) entity;
|
||||||
if (result.getDateofacceptance() != null && StringUtils.isNotBlank(result.getDateofacceptance().getValue())) {
|
if (result.getDateofacceptance() != null
|
||||||
|
&& StringUtils.isNotBlank(result.getDateofacceptance().getValue())) {
|
||||||
acceptanceDate.add(result.getDateofacceptance().getValue());
|
acceptanceDate.add(result.getDateofacceptance().getValue());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -50,6 +52,7 @@ public class DedupRecordFactory {
|
||||||
return dedupId;
|
return dedupId;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final int MAX_ACCEPTANCE_DATE = 20;
|
private static final int MAX_ACCEPTANCE_DATE = 20;
|
||||||
|
|
||||||
private DedupRecordFactory() {
|
private DedupRecordFactory() {
|
||||||
|
@ -90,8 +93,12 @@ public class DedupRecordFactory {
|
||||||
.join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
|
.join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
|
||||||
.select("dedupId", "id", "kryoObject")
|
.select("dedupId", "id", "kryoObject")
|
||||||
.as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder))
|
.as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder))
|
||||||
.map((MapFunction<Tuple3<String, String, OafEntity>, DedupRecordReduceState>) t -> new DedupRecordReduceState(t._1(), t._2(), t._3()), Encoders.kryo(DedupRecordReduceState.class))
|
.map(
|
||||||
.groupByKey((MapFunction<DedupRecordReduceState, String>) DedupRecordReduceState::getDedupId, Encoders.STRING())
|
(MapFunction<Tuple3<String, String, OafEntity>, DedupRecordReduceState>) t -> new DedupRecordReduceState(
|
||||||
|
t._1(), t._2(), t._3()),
|
||||||
|
Encoders.kryo(DedupRecordReduceState.class))
|
||||||
|
.groupByKey(
|
||||||
|
(MapFunction<DedupRecordReduceState, String>) DedupRecordReduceState::getDedupId, Encoders.STRING())
|
||||||
.reduceGroups(
|
.reduceGroups(
|
||||||
(ReduceFunction<DedupRecordReduceState>) (t1, t2) -> {
|
(ReduceFunction<DedupRecordReduceState>) (t1, t2) -> {
|
||||||
if (t1.entity == null) {
|
if (t1.entity == null) {
|
||||||
|
@ -105,10 +112,8 @@ public class DedupRecordFactory {
|
||||||
t1.entity = reduceEntity(t1.entity, t2.entity);
|
t1.entity = reduceEntity(t1.entity, t2.entity);
|
||||||
|
|
||||||
return t1;
|
return t1;
|
||||||
}
|
})
|
||||||
)
|
.flatMap((FlatMapFunction<Tuple2<String, DedupRecordReduceState>, OafEntity>) t -> {
|
||||||
.flatMap
|
|
||||||
((FlatMapFunction<Tuple2<String, DedupRecordReduceState>, OafEntity>) t -> {
|
|
||||||
String dedupId = t._1();
|
String dedupId = t._1();
|
||||||
DedupRecordReduceState agg = t._2();
|
DedupRecordReduceState agg = t._2();
|
||||||
|
|
||||||
|
@ -116,7 +121,8 @@ public class DedupRecordFactory {
|
||||||
return Collections.emptyIterator();
|
return Collections.emptyIterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
return Stream.concat(Stream.of(agg.getDedupId()), agg.aliases.stream())
|
return Stream
|
||||||
|
.concat(Stream.of(agg.getDedupId()), agg.aliases.stream())
|
||||||
.map(id -> {
|
.map(id -> {
|
||||||
try {
|
try {
|
||||||
OafEntity res = (OafEntity) BeanUtils.cloneBean(agg.entity);
|
OafEntity res = (OafEntity) BeanUtils.cloneBean(agg.entity);
|
||||||
|
@ -127,7 +133,8 @@ public class DedupRecordFactory {
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
}).iterator();
|
})
|
||||||
|
.iterator();
|
||||||
}, beanEncoder);
|
}, beanEncoder);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -137,7 +144,6 @@ public class DedupRecordFactory {
|
||||||
return entity;
|
return entity;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int compare = new IdentifierComparator<>()
|
int compare = new IdentifierComparator<>()
|
||||||
.compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate));
|
.compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate));
|
||||||
|
|
||||||
|
|
|
@ -242,13 +242,14 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
|
||||||
|
|
||||||
// this was a pivot in a previous graph but it has been merged into a new group with different
|
// this was a pivot in a previous graph but it has been merged into a new group with different
|
||||||
// pivot
|
// pivot
|
||||||
if (!r.isNullAt(r.fieldIndex("lastUsage")) && !pivot.equals(id) && !dedupId.equals(pivotDedupId)) {
|
if (!r.isNullAt(r.fieldIndex("lastUsage")) && !pivot.equals(id)
|
||||||
|
&& !dedupId.equals(pivotDedupId)) {
|
||||||
// materialize the previous dedup record as a merge relation with the new one
|
// materialize the previous dedup record as a merge relation with the new one
|
||||||
res.add(new Tuple3<>(dedupId, pivotDedupId, null));
|
res.add(new Tuple3<>(dedupId, pivotDedupId, null));
|
||||||
}
|
}
|
||||||
|
|
||||||
// add merge relations
|
// add merge relations
|
||||||
if (cut <=0 || r.<Integer>getAs("position") <= cut) {
|
if (cut <= 0 || r.<Integer> getAs("position") <= cut) {
|
||||||
res.add(new Tuple3<>(id, pivotDedupId, pivot));
|
res.add(new Tuple3<>(id, pivotDedupId, pivot));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,26 @@
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>jobTracker</name>
|
||||||
|
<value>yarnRM</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>nameNode</name>
|
||||||
|
<value>hdfs://nameservice1</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.use.system.libpath</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.action.sharelib.for.spark</name>
|
||||||
|
<value>spark2</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hiveMetastoreUris</name>
|
||||||
|
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkSqlWarehouseDir</name>
|
||||||
|
<value>/user/hive/warehouse</value>
|
||||||
|
</property>
|
||||||
|
</configuration>
|
|
@ -0,0 +1,62 @@
|
||||||
|
|
||||||
|
CREATE TABLE `${pivot_history_db}`.`dataset_new` STORED AS PARQUET AS
|
||||||
|
WITH pivots (
|
||||||
|
SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation`
|
||||||
|
LEFT SEMI JOIN `${new_graph_db}`.`dataset` ON relation.source = dataset.id
|
||||||
|
LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot'
|
||||||
|
UNION
|
||||||
|
SELECT id, usedIn FROM `${pivot_history_db}`.`dataset` LATERAL VIEW EXPLODE(usages) AS usedIn
|
||||||
|
)
|
||||||
|
SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages
|
||||||
|
FROM pivots
|
||||||
|
GROUP BY id; /*EOS*/
|
||||||
|
CREATE TABLE `${pivot_history_db}`.`publication_new` STORED AS PARQUET AS
|
||||||
|
WITH pivots (
|
||||||
|
SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation`
|
||||||
|
LEFT SEMI JOIN `${new_graph_db}`.`publication` ON relation.source = publication.id
|
||||||
|
LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot'
|
||||||
|
UNION
|
||||||
|
SELECT id, usedIn FROM `${pivot_history_db}`.`publication` LATERAL VIEW EXPLODE(usages) AS usedIn
|
||||||
|
)
|
||||||
|
SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages
|
||||||
|
FROM pivots
|
||||||
|
GROUP BY id; /*EOS*/
|
||||||
|
CREATE TABLE `${pivot_history_db}`.`software_new` STORED AS PARQUET AS
|
||||||
|
WITH pivots (
|
||||||
|
SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation`
|
||||||
|
LEFT SEMI JOIN `${new_graph_db}`.`software` ON relation.source = software.id
|
||||||
|
LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot'
|
||||||
|
UNION
|
||||||
|
SELECT id, usedIn FROM `${pivot_history_db}`.`software` LATERAL VIEW EXPLODE(usages) AS usedIn
|
||||||
|
)
|
||||||
|
SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages
|
||||||
|
FROM pivots
|
||||||
|
GROUP BY id; /*EOS*/
|
||||||
|
CREATE TABLE `${pivot_history_db}`.`otherresearchproduct_new` STORED AS PARQUET AS
|
||||||
|
WITH pivots (
|
||||||
|
SELECT property.value AS id, '${new_graph_date}' AS usedIn FROM `${new_graph_db}`.`relation`
|
||||||
|
LEFT SEMI JOIN `${new_graph_db}`.`otherresearchproduct` ON relation.source = otherresearchproduct.id
|
||||||
|
LATERAL VIEW EXPLODE(properties) AS property WHERE relClass = 'isMergedIn' AND property.key = 'pivot'
|
||||||
|
UNION
|
||||||
|
SELECT id, usedIn FROM `${pivot_history_db}`.`otherresearchproduct` LATERAL VIEW EXPLODE(usages) AS usedIn
|
||||||
|
)
|
||||||
|
SELECT id, min(usedIn) as firstUsage, max(usedIn) as lastUsage, collect_set(usedIn) as usages
|
||||||
|
FROM pivots
|
||||||
|
GROUP BY id; /*EOS*/
|
||||||
|
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS `${pivot_history_db}`.`dataset_old`; /*EOS*/
|
||||||
|
ALTER TABLE `${pivot_history_db}`.`dataset` RENAME TO `${pivot_history_db}`.`dataset_old`; /*EOS*/
|
||||||
|
ALTER TABLE `${pivot_history_db}`.`dataset_new` RENAME TO `${pivot_history_db}`.`dataset`; /*EOS*/
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS `${pivot_history_db}`.`publication_old`; /*EOS*/
|
||||||
|
ALTER TABLE `${pivot_history_db}`.`publication` RENAME TO `${pivot_history_db}`.`publication_old`; /*EOS*/
|
||||||
|
ALTER TABLE `${pivot_history_db}`.`publication_new` RENAME TO `${pivot_history_db}`.`publication`; /*EOS*/
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS `${pivot_history_db}`.`software_old`; /*EOS*/
|
||||||
|
ALTER TABLE `${pivot_history_db}`.`software` RENAME TO `${pivot_history_db}`.`software_old`; /*EOS*/
|
||||||
|
ALTER TABLE `${pivot_history_db}`.`software_new` RENAME TO `${pivot_history_db}`.`software`; /*EOS*/
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS `${pivot_history_db}`.`otherresearchproduct_old`; /*EOS*/
|
||||||
|
ALTER TABLE `${pivot_history_db}`.`otherresearchproduct` RENAME TO `${pivot_history_db}`.`otherresearchproduct_old`; /*EOS*/
|
||||||
|
ALTER TABLE `${pivot_history_db}`.`otherresearchproduct_new` RENAME TO `${pivot_history_db}`.`otherresearchproduct`; /*EOS*/
|
|
@ -0,0 +1,95 @@
|
||||||
|
<workflow-app name="Update pivot history" xmlns="uri:oozie:workflow:0.5">
|
||||||
|
<parameters>
|
||||||
|
<!-- properties used in SQL -->
|
||||||
|
<property>
|
||||||
|
<name>pivot_history_db</name>
|
||||||
|
<!-- <value>openaire_beta_pivots_test</value> -->
|
||||||
|
<description>Pivot history DB on hive</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>new_graph_db</name>
|
||||||
|
<!--<value>openaire_beta_20231208</value> -->
|
||||||
|
<description>New graph DB on hive</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>new_graph_date</name>
|
||||||
|
<!-- <value>20231208</value> -->
|
||||||
|
<description>Creation date of new graph db</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<!-- RunSQLSparkJob properties -->
|
||||||
|
<property>
|
||||||
|
<name>hiveMetastoreUris</name>
|
||||||
|
<description>hive server metastore URIs</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkSqlWarehouseDir</name>
|
||||||
|
</property>
|
||||||
|
<!-- General oozie workflow properties -->
|
||||||
|
<property>
|
||||||
|
<name>sparkClusterOpts</name>
|
||||||
|
<value>--conf spark.network.timeout=600 --conf spark.extraListeners= --conf spark.sql.queryExecutionListeners= --conf spark.yarn.historyServer.address=http://iis-cdh5-test-m3.ocean.icm.edu.pl:18088 --conf spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory</value>
|
||||||
|
<description>spark cluster-wide options</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkResourceOpts</name>
|
||||||
|
<value>--executor-memory=3G --conf spark.executor.memoryOverhead=3G --executor-cores=6 --driver-memory=8G --driver-cores=4</value>
|
||||||
|
<description>spark resource options</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkApplicationOpts</name>
|
||||||
|
<value>--conf spark.sql.shuffle.partitions=3840</value>
|
||||||
|
<description>spark resource options</description>
|
||||||
|
</property>
|
||||||
|
</parameters>
|
||||||
|
|
||||||
|
<global>
|
||||||
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
<name-node>${nameNode}</name-node>
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>mapreduce.job.queuename</name>
|
||||||
|
<value>${queueName}</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.launcher.mapred.job.queue.name</name>
|
||||||
|
<value>${oozieLauncherQueueName}</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.action.sharelib.for.spark</name>
|
||||||
|
<value>${oozieActionShareLibForSpark2}</value>
|
||||||
|
</property>
|
||||||
|
</configuration>
|
||||||
|
</global>
|
||||||
|
|
||||||
|
<start to="UpgradePivotHistory"/>
|
||||||
|
|
||||||
|
<kill name="Kill">
|
||||||
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
|
</kill>
|
||||||
|
|
||||||
|
<action name="UpgradePivotHistory">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Upgrade Pivot History</name>
|
||||||
|
<class>eu.dnetlib.dhp.oozie.RunSQLSparkJob</class>
|
||||||
|
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||||
|
${sparkClusterOpts}
|
||||||
|
${sparkResourceOpts}
|
||||||
|
${sparkApplicationOpts}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||||
|
<arg>--sql</arg><arg>eu/dnetlib/dhp/oa/dedup/pivothistory/oozie_app/sql.sql</arg>
|
||||||
|
<arg>--pivot_history_db</arg><arg>${pivot_history_db}</arg>
|
||||||
|
<arg>--new_graph_db</arg><arg>${new_graph_db}</arg>
|
||||||
|
<arg>--new_graph_date</arg><arg>${new_graph_date}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="End"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<end name="End"/>
|
||||||
|
</workflow-app>
|
|
@ -43,6 +43,17 @@
|
||||||
<arg>--graphPath</arg><arg>${graphPath}</arg>
|
<arg>--graphPath</arg><arg>${graphPath}</arg>
|
||||||
<arg>--master</arg><arg>yarn</arg>
|
<arg>--master</arg><arg>yarn</arg>
|
||||||
</spark>
|
</spark>
|
||||||
|
<ok to="reset_outputpath"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="reset_outputpath">
|
||||||
|
<fs>
|
||||||
|
<delete path="${graphPath}/datasource"/>
|
||||||
|
<delete path="${graphPath}/organization"/>
|
||||||
|
<delete path="${graphPath}/project"/>
|
||||||
|
<delete path="${graphPath}/relation"/>
|
||||||
|
</fs>
|
||||||
<ok to="copy_datasource"/>
|
<ok to="copy_datasource"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
|
@ -62,8 +62,8 @@ public class XmlConverterJob {
|
||||||
final String outputPath = parser.get("outputPath");
|
final String outputPath = parser.get("outputPath");
|
||||||
log.info("outputPath: {}", outputPath);
|
log.info("outputPath: {}", outputPath);
|
||||||
|
|
||||||
final String isLookupUrl = parser.get("isLookupUrl");
|
final String contextApiBaseUrl = parser.get("contextApiBaseUrl");
|
||||||
log.info("isLookupUrl: {}", isLookupUrl);
|
log.info("contextApiBaseUrl: {}", contextApiBaseUrl);
|
||||||
|
|
||||||
final SparkConf conf = new SparkConf();
|
final SparkConf conf = new SparkConf();
|
||||||
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||||
|
@ -71,7 +71,7 @@ public class XmlConverterJob {
|
||||||
|
|
||||||
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||||
removeOutputDir(spark, outputPath);
|
removeOutputDir(spark, outputPath);
|
||||||
convertToXml(spark, inputPath, outputPath, ContextMapper.fromIS(isLookupUrl));
|
convertToXml(spark, inputPath, outputPath, ContextMapper.fromAPI(contextApiBaseUrl));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,18 +1,22 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.provision.utils;
|
package eu.dnetlib.dhp.oa.provision.utils;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.*;
|
||||||
import java.io.StringReader;
|
import java.net.HttpURLConnection;
|
||||||
|
import java.net.URL;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
|
||||||
import org.dom4j.Document;
|
import org.dom4j.Document;
|
||||||
import org.dom4j.DocumentException;
|
import org.dom4j.DocumentException;
|
||||||
import org.dom4j.Node;
|
import org.dom4j.Node;
|
||||||
import org.dom4j.io.SAXReader;
|
import org.dom4j.io.SAXReader;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
import org.xml.sax.SAXException;
|
import org.xml.sax.SAXException;
|
||||||
|
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.common.api.context.*;
|
||||||
|
import eu.dnetlib.dhp.common.rest.DNetRestClient;
|
||||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
|
@ -23,6 +27,42 @@ public class ContextMapper extends HashMap<String, ContextDef> implements Serial
|
||||||
|
|
||||||
private static final String XQUERY = "for $x in //RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='ContextDSResourceType']//*[name()='context' or name()='category' or name()='concept'] return <entry id=\"{$x/@id}\" label=\"{$x/@label|$x/@name}\" name=\"{$x/name()}\" type=\"{$x/@type}\"/>";
|
private static final String XQUERY = "for $x in //RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='ContextDSResourceType']//*[name()='context' or name()='category' or name()='concept'] return <entry id=\"{$x/@id}\" label=\"{$x/@label|$x/@name}\" name=\"{$x/name()}\" type=\"{$x/@type}\"/>";
|
||||||
|
|
||||||
|
public static ContextMapper fromAPI(final String baseURL) throws Exception {
|
||||||
|
|
||||||
|
final ContextMapper contextMapper = new ContextMapper();
|
||||||
|
|
||||||
|
for (ContextSummary ctx : DNetRestClient.doGET(baseURL + "/contexts", ContextSummaryList.class)) {
|
||||||
|
|
||||||
|
contextMapper.put(ctx.getId(), new ContextDef(ctx.getId(), ctx.getLabel(), "context", ctx.getType()));
|
||||||
|
|
||||||
|
for (CategorySummary cat : DNetRestClient
|
||||||
|
.doGET(baseURL + "/context/" + ctx.getId(), CategorySummaryList.class)) {
|
||||||
|
contextMapper.put(cat.getId(), new ContextDef(cat.getId(), cat.getLabel(), "category", ""));
|
||||||
|
if (cat.isHasConcept()) {
|
||||||
|
for (ConceptSummary c : DNetRestClient
|
||||||
|
.doGET(baseURL + "/context/category/" + cat.getId(), ConceptSummaryList.class)) {
|
||||||
|
contextMapper.put(c.getId(), new ContextDef(c.getId(), c.getLabel(), "concept", ""));
|
||||||
|
if (c.isHasSubConcept()) {
|
||||||
|
for (ConceptSummary cs : c.getConcepts()) {
|
||||||
|
contextMapper.put(cs.getId(), new ContextDef(cs.getId(), cs.getLabel(), "concept", ""));
|
||||||
|
if (cs.isHasSubConcept()) {
|
||||||
|
for (ConceptSummary css : cs.getConcepts()) {
|
||||||
|
contextMapper
|
||||||
|
.put(
|
||||||
|
css.getId(),
|
||||||
|
new ContextDef(css.getId(), css.getLabel(), "concept", ""));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return contextMapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
public static ContextMapper fromIS(final String isLookupUrl)
|
public static ContextMapper fromIS(final String isLookupUrl)
|
||||||
throws DocumentException, ISLookUpException, SAXException {
|
throws DocumentException, ISLookUpException, SAXException {
|
||||||
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||||
|
@ -32,6 +72,7 @@ public class ContextMapper extends HashMap<String, ContextDef> implements Serial
|
||||||
return fromXml(sb.toString());
|
return fromXml(sb.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
public static ContextMapper fromXml(final String xml) throws DocumentException, SAXException {
|
public static ContextMapper fromXml(final String xml) throws DocumentException, SAXException {
|
||||||
final ContextMapper contextMapper = new ContextMapper();
|
final ContextMapper contextMapper = new ContextMapper();
|
||||||
|
|
||||||
|
|
|
@ -12,9 +12,9 @@
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"paramName": "ilu",
|
"paramName": "cau",
|
||||||
"paramLongName": "isLookupUrl",
|
"paramLongName": "contextApiBaseUrl",
|
||||||
"paramDescription": "URL of the isLookUp Service",
|
"paramDescription": "URL of the context API",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|
|
@ -9,6 +9,10 @@
|
||||||
<name>isLookupUrl</name>
|
<name>isLookupUrl</name>
|
||||||
<description>URL for the isLookup service</description>
|
<description>URL for the isLookup service</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>contextApiBaseUrl</name>
|
||||||
|
<description>context API URL</description>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>relPartitions</name>
|
<name>relPartitions</name>
|
||||||
<description>number or partitions for the relations Dataset</description>
|
<description>number or partitions for the relations Dataset</description>
|
||||||
|
@ -589,7 +593,7 @@
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--inputPath</arg><arg>${workingDir}/join_entities</arg>
|
<arg>--inputPath</arg><arg>${workingDir}/join_entities</arg>
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/xml</arg>
|
<arg>--outputPath</arg><arg>${workingDir}/xml</arg>
|
||||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
<arg>--contextApiBaseUrl</arg><arg>${contextApiBaseUrl}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="should_index"/>
|
<ok to="should_index"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
|
Loading…
Reference in New Issue