some wf fixing

This commit is contained in:
Michele Artini 2020-06-18 13:14:10 +02:00
parent 9e2c23e391
commit 9a847b4557
4 changed files with 53 additions and 54 deletions

View File

@ -24,11 +24,7 @@
<groupId>org.apache.spark</groupId> <groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId> <artifactId>spark-sql_2.11</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
@ -45,10 +41,6 @@
<artifactId>dnet-pace-core</artifactId> <artifactId>dnet-pace-core</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
</dependency>
<dependency> <dependency>
<groupId>dom4j</groupId> <groupId>dom4j</groupId>
<artifactId>dom4j</artifactId> <artifactId>dom4j</artifactId>

View File

@ -31,7 +31,6 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAg
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
@ -51,9 +50,8 @@ public class GenerateEventsApplication {
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(GenerateEventsApplication.class
GenerateEventsApplication.class .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json")));
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json")));
parser.parseArgument(args); parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional final Boolean isSparkSessionManaged = Optional
@ -78,18 +76,21 @@ public class GenerateEventsApplication {
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(BrokerConstants.getModelClasses()); conf.registerKryoClasses(BrokerConstants.getModelClasses());
final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId); // TODO UNCOMMENT
// final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
final DedupConfig dedupConfig = null;
runWithSparkSession(conf, isSparkSessionManaged, spark -> { runWithSparkSession(conf, isSparkSessionManaged, spark -> {
removeOutputDir(spark, eventsPath); removeOutputDir(spark, eventsPath);
// TODO UNCOMMENT
spark spark
.emptyDataset(Encoders.kryo(Event.class)) .emptyDataset(Encoders.kryo(Event.class))
.union(generateEvents(spark, graphPath, Publication.class, dedupConfig)) .union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
.union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig)) // .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
.union(generateEvents(spark, graphPath, Software.class, dedupConfig)) // .union(generateEvents(spark, graphPath, Software.class, dedupConfig))
.union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig)) // .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
@ -117,15 +118,12 @@ public class GenerateEventsApplication {
.toColumn(); .toColumn();
return results return results
.joinWith(mergedRels, results.col("result.id").equalTo(mergedRels.col("source")), "inner") .joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner")
.groupByKey( .groupByKey((MapFunction<Tuple2<OpenaireBrokerResult, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
(MapFunction<Tuple2<OpenaireBrokerResult, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
.agg(aggr) .agg(aggr)
.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class)) .map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.kryo(ResultGroup.class))
.filter(ResultGroup::isValid) .filter(ResultGroup::isValid)
.map( .map((MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig), Encoders.kryo(EventGroup.class))
(MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
Encoders.kryo(EventGroup.class))
.flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class)); .flatMap(group -> group.getData().iterator(), Encoders.kryo(Event.class));
} }
@ -133,9 +131,9 @@ public class GenerateEventsApplication {
final SparkSession spark, final SparkSession spark,
final String graphPath, final String graphPath,
final Class<SRC> sourceClass) { final Class<SRC> sourceClass) {
final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class); final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath( final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class); final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class); final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class);
@ -143,17 +141,14 @@ public class GenerateEventsApplication {
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS)) .filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
.cache(); .cache();
final Dataset<OpenaireBrokerResult> r0 = readPath( final Dataset<OpenaireBrokerResult> r0 = readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class)
spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), Result.class) .filter(r -> r.getDataInfo().getDeletedbyinference())
.filter(r -> r.getDataInfo().getDeletedbyinference()) .map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class));
.map(ConversionUtils::oafResultToBrokerResult, Encoders.kryo(OpenaireBrokerResult.class));
final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class)); final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class));
final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class)); final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedEntities(softwares, rels, RelatedProject.class));
final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class)); final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels, RelatedProject.class));
final Dataset<OpenaireBrokerResult> r4 = join( final Dataset<OpenaireBrokerResult> r4 = join(r3, rels, relatedEntities(publications, rels, RelatedProject.class));;
r3, rels, relatedEntities(publications, rels, RelatedProject.class));
;
return r4; return r4;
} }
@ -163,9 +158,7 @@ public class GenerateEventsApplication {
final Class<RT> clazz) { final Class<RT> clazz) {
return rels return rels
.joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner") .joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner")
.map( .map(t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz), Encoders.kryo(clazz));
t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz),
Encoders.kryo(clazz));
} }
private static <T> Dataset<OpenaireBrokerResult> join(final Dataset<OpenaireBrokerResult> sources, private static <T> Dataset<OpenaireBrokerResult> join(final Dataset<OpenaireBrokerResult> sources,
@ -173,15 +166,13 @@ public class GenerateEventsApplication {
final Dataset<T> typedRels) { final Dataset<T> typedRels) {
final TypedColumn<Tuple2<OpenaireBrokerResult, T>, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator<T>() final TypedColumn<Tuple2<OpenaireBrokerResult, T>, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator<T>()
.toColumn(); .toColumn();;
;
return sources return sources.joinWith(typedRels, sources.col("openaireId").equalTo(rels.col("source")), "left_outer")
.joinWith(typedRels, sources.col("result.id").equalTo(rels.col("source")), "left_outer") .groupByKey((MapFunction<Tuple2<OpenaireBrokerResult, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
.groupByKey(
(MapFunction<Tuple2<OpenaireBrokerResult, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
.agg(aggr) .agg(aggr)
.map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class)); .map(t -> t._2, Encoders.kryo(OpenaireBrokerResult.class));
} }
public static <R> Dataset<R> readPath( public static <R> Dataset<R> readPath(
@ -195,14 +186,12 @@ public class GenerateEventsApplication {
} }
private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception { private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl); final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
final String conf = isLookUpService final String conf = isLookUpService
.getResourceProfileByQuery( .getResourceProfileByQuery(String
String .format("for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", profId));
.format(
"for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
profId));
final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class); final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class);
dedupConfig.getPace().initModel(); dedupConfig.getPace().initModel();

View File

@ -63,8 +63,14 @@ public final class UpdateInfo<T> {
return target; return target;
} }
private float calculateTrust(final DedupConfig dedupConfig, final OpenaireBrokerResult r1, private float calculateTrust(final DedupConfig dedupConfig,
final OpenaireBrokerResult r1,
final OpenaireBrokerResult r2) { final OpenaireBrokerResult r2) {
if (dedupConfig == null) {
return BrokerConstants.MIN_TRUST;
}
try { try {
final ObjectMapper objectMapper = new ObjectMapper(); final ObjectMapper objectMapper = new ObjectMapper();
final MapDocument doc1 = MapDocumentUtil final MapDocument doc1 = MapDocumentUtil

View File

@ -80,20 +80,32 @@
</kill> </kill>
<action name="generate_events"> <action name="generate_events">
<java> <spark xmlns="uri:oozie:spark-action:0.2">
<prepare> <master>yarn</master>
<delete path="${eventsOutputPath}"/> <mode>cluster</mode>
</prepare> <name>GenerateEvents</name>
<main-class>eu.dnetlib.dhp.broker.oa.GenerateEventsApplication</main-class> <class>eu.dnetlib.dhp.broker.oa.GenerateEventsApplication</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg> <arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--eventsPath</arg><arg>${eventsOutputPath}</arg> <arg>--eventsPath</arg><arg>${eventsOutputPath}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg> <arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
</java> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<end name="End"/> <end name="End"/>
</workflow-app> </workflow-app>