[SKG-IF denormalized] refactoring

Miriam Baglioni 2024-03-26 11:45:59 +01:00
parent 9f9ade077b
commit 8fe452b61d
14 changed files with 251 additions and 78 deletions

View File

@@ -80,12 +80,18 @@ public class FilterEntities implements Serializable {
 				Dataset<Row> filterIds = spark.read().parquet(filterPath + e.name() + "_ids");
 				result
-					.joinWith(filterIds, result.col("id").equalTo(filterIds.col("id")))
-					.map((MapFunction<Tuple2<R, Row>, R>) t2 -> t2._1(), Encoders.bean(resultClazz))
+					.join(filterIds, result.col("id").equalTo(filterIds.col("id")), "leftsemi")
 					.write()
 					.mode(SaveMode.Overwrite)
 					.option("compression", "gzip")
 					.json(workingDir + e.name());
+//				result
+//					.joinWith(filterIds, result.col("id").equalTo(filterIds.col("id")))
+//					.map((MapFunction<Tuple2<R, Row>, R>) t2 -> t2._1(), Encoders.bean(resultClazz))
+//					.write()
+//					.mode(SaveMode.Overwrite)
+//					.option("compression", "gzip")
+//					.json(workingDir + e.name());
 			}
 		});
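
For context, a minimal local sketch of the left-semi filtering adopted here (class name, /tmp paths and the local master are placeholders, not part of the commit): a "leftsemi" join keeps only the left-hand columns and only the rows whose id has a match in the filter set, so the former joinWith(...) plus map back to the bean is no longer needed.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

/** Minimal sketch of id-based filtering with a left-semi join (paths are placeholders). */
public class LeftSemiFilterSketch {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().appName("leftsemi-sketch").master("local[*]").getOrCreate();

		Dataset<Row> results = spark.read().json("/tmp/results");          // assumption: JSON with an "id" column
		Dataset<Row> filterIds = spark.read().parquet("/tmp/filter_ids");  // assumption: parquet with an "id" column

		// "leftsemi" keeps only the left-hand columns and only the rows whose id matches,
		// so no deserialization of the joined pair is needed afterwards.
		results
			.join(filterIds, results.col("id").equalTo(filterIds.col("id")), "leftsemi")
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json("/tmp/filtered");

		spark.stop();
	}
}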

View File

@@ -109,9 +109,8 @@ public class SelectConnectedEntities implements Serializable {
 					.col("source")
 					.equalTo(resultIds.col("value")),
 				"leftsemi")
 			.select("target")
-			.distinct()
-			;
+			.distinct();
 		Dataset<Row> organization = spark
 			.read()
@@ -129,7 +128,6 @@ public class SelectConnectedEntities implements Serializable {
 						.getFunderName(p.getFundingtree().get(0).getValue())
 						.equalsIgnoreCase("European Commission"));
-
 		organization
 			.join(matchingRels, organization.col("id").equalTo(matchingRels.col("target")), "leftsemi")
 			.write()
@@ -137,7 +135,6 @@ public class SelectConnectedEntities implements Serializable {
 			.option("compression", "gzip")
 			.json(workingDir + "organization");
-
 		projects
 			.join(matchingRels, projects.col("id").equalTo(matchingRels.col("target")), "leftsemi")
 			.write()

View File

@@ -0,0 +1,94 @@
package eu.dnetlib.dhp.oa.graph.dump.filterentities;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.functions.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.skgif.EmitFromEntities;
import eu.dnetlib.dhp.oa.graph.dump.skgif.Utils;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Result;

import scala.Tuple2;

/**
 * @author miriam.baglioni
 * @Date 20/03/24
 */
public class SelectEOSCEntities implements Serializable {
	private static final Logger log = LoggerFactory.getLogger(SelectEOSCEntities.class);

	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				FilterEntities.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/oa/graph/dump/skgif/eosc_entities_parameters.json"));

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		final String inputPath = parser.get("sourcePath");
		log.info("inputPath: {}", inputPath);

		final String filterPath = parser.get("filterPath");
		log.info("filterPath: {}", filterPath);

		SparkConf conf = new SparkConf();
		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> {
				selectEntities(spark, inputPath, filterPath);
			});
	}

	private static <R extends Result> void selectEntities(SparkSession spark, String inputPath, String filterPath) {
		ModelSupport.entityTypes.keySet().forEach(e -> {
			if (ModelSupport.isResult(e)) {
				spark
					.read()
					.schema(Encoders.bean(Result.class).schema())
					.json(inputPath + e.name())
					.where("datainfo.deletedbyinference != true and datainfo.invisible != true")
					.select("id", "context")
					.withColumn(
						"contexts",
						org.apache.spark.sql.functions
							.explode(
								org.apache.spark.sql.functions.col("context")))
					.selectExpr("id", "contexts.id as context")
					.where("context == 'eosc'")
					.drop("context")
					.write()
					.mode(SaveMode.Overwrite)
					.option("compression", "gzip")
					.parquet(filterPath + e.name() + "_ids");
			}
		});
	}
}
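
As an aside, a toy illustration of the explode-on-context selection performed above (hypothetical in-memory data and a local master, not part of the commit): each result carries a list of context entries, and only the ids of results tagged with the 'eosc' context survive.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/** Toy sketch of selecting ids of results whose context list contains 'eosc'. */
public class EoscContextSketch {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().appName("eosc-context-sketch").master("local[*]").getOrCreate();

		// Two fake results: r1 is tagged 'eosc', r2 is not.
		Dataset<Row> results = spark
			.read()
			.json(spark
				.createDataset(
					Arrays.asList(
						"{\"id\":\"r1\",\"context\":[{\"id\":\"eosc\"},{\"id\":\"dh-ch\"}]}",
						"{\"id\":\"r2\",\"context\":[{\"id\":\"dh-ch\"}]}"),
					Encoders.STRING()));

		results
			.select(col("id"), explode(col("context")).as("contexts"))
			.selectExpr("id", "contexts.id as context")
			.where("context == 'eosc'")
			.drop("context")
			.show(); // expected: only r1

		spark.stop();
	}
}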

View File

@@ -70,7 +70,7 @@ public class DumpDatasource implements Serializable {
 			conf,
 			isSparkSessionManaged,
 			spark -> {
-				Utils.removeOutputDir(spark, outputPath + "Datasources");
+				Utils.removeOutputDir(spark, outputPath + "datasources");
 				mapDatasource(spark, inputPath, outputPath, workingDir);
 			});
@@ -99,7 +99,7 @@ public class DumpDatasource implements Serializable {
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(outputPath + "Datasource");
+			.json(outputPath + "datasource");
 	}

 	private static eu.dnetlib.dhp.skgif.model.Datasource dumpDatasource(Datasource d) {

View File

@@ -71,7 +71,7 @@ public class DumpGrant implements Serializable {
 			conf,
 			isSparkSessionManaged,
 			spark -> {
-				Utils.removeOutputDir(spark, outputPath + "Grant");
+				Utils.removeOutputDir(spark, outputPath + "grants");
 				mapGrants(spark, inputPath, outputPath, workingDir);
 			});
@@ -100,7 +100,7 @@ public class DumpGrant implements Serializable {
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(outputPath + "Grant");
+			.json(outputPath + "grants");
 	}

 	private static Grant dumpGrant(Project project) throws DocumentException {

View File

@@ -62,7 +62,7 @@ public class DumpOrganization implements Serializable {
 			conf,
 			isSparkSessionManaged,
 			spark -> {
-				Utils.removeOutputDir(spark, outputPath + "Organization");
+				Utils.removeOutputDir(spark, outputPath + "organizations");
 				mapOrganization(spark, inputPath, outputPath);
 			});
@@ -122,7 +122,7 @@ public class DumpOrganization implements Serializable {
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(outputPath + "Organization");
+			.json(outputPath + "organizations");
 	}

 	private static String getOrganizationType(Organization o) {

View File

@@ -68,7 +68,7 @@ public class DumpResearchProduct implements Serializable {
 			conf,
 			isSparkSessionManaged,
 			spark -> {
-				Utils.removeOutputDir(spark, outputPath + "ResearchProduct");
+				Utils.removeOutputDir(spark, outputPath + "products");
 				emitFromResult(spark, inputPath, outputPath, workingDir);
 			});
@@ -87,13 +87,16 @@ public class DumpResearchProduct implements Serializable {
 		for (EntityType e : ModelSupport.entityTypes.keySet()) {
 			if (ModelSupport.isResult(e))
 				researchProducts = researchProducts
-					.union(Utils.readPath(spark, workingDir + e.name() + "/researchproduct", ResearchProduct.class));
+					.union(
+						Utils
+							.readPath(
+								spark, workingDir + "products" + e.name() + "/researchproduct", ResearchProduct.class));
 		}
 		researchProducts
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(outputPath + "ResearchProduct");
+			.json(outputPath + "products");
 	}

 	private static <R extends Result> void dumpResearchProduct(SparkSession spark, String inputPath, String workingDir,
@@ -120,7 +123,7 @@ public class DumpResearchProduct implements Serializable {
 		Dataset<ResearchProduct> pprWitGrants = spark
 			.read()
 			.schema(Encoders.bean(ResearchProduct.class).schema())
-			.json(workingDir + e.name() + "/temp_researchproductgrant")
+			.json(workingDir + "products" + e.name() + "/temp_researchproductgrant")
 			.as(Encoders.bean(ResearchProduct.class));
 		Dataset<ProductsRelation> relatedResults = Utils
 			.readPath(spark, workingDir + "/relations/related_products", ProductsRelation.class);
@@ -139,15 +142,15 @@ public class DumpResearchProduct implements Serializable {
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(workingDir + e.name() + "/researchproduct");
-		Utils.removeOutputDir(spark, workingDir + e.name() + "/temp_researchproductgrant");
+			.json(workingDir + "products" + e.name() + "/researchproduct");
+		Utils.removeOutputDir(spark, workingDir + "products" + e.name() + "/temp_researchproductgrant");
 	}

 	private static void includeFunding(SparkSession spark, String workingDir, EntityType e) {
 		Dataset<ResearchProduct> prrWithAffiliation = spark
 			.read()
 			.schema(Encoders.bean(ResearchProduct.class).schema())
-			.json(workingDir + e.name() + "/temp_researchproductaff")
+			.json(workingDir + "products" + e.name() + "/temp_researchproductaff")
 			.as(Encoders.bean(ResearchProduct.class));
 		Dataset<GrantRelation> grants = Utils
@@ -166,19 +169,19 @@ public class DumpResearchProduct implements Serializable {
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(workingDir + e.name() + "/temp_researchproductgrant");
-		Utils.removeOutputDir(spark, workingDir + e.name() + "/temp_researchproductaff");
+			.json(workingDir + "products" + e.name() + "/temp_researchproductgrant");
+		Utils.removeOutputDir(spark, workingDir + "products" + e.name() + "/temp_researchproductaff");
 	}

 	private static void includeRelevantOrganization(SparkSession spark, String workingDir, EntityType e) {
 		Dataset<ExtendingOrganization> affiliations = Utils
 			.readPath(
 				spark, workingDir + "relations/result_relevant_organizations", ExtendingOrganization.class);
 		Dataset<ResearchProduct> partialResearchProduct = spark
 			.read()
 			.schema(Encoders.bean(ResearchProduct.class).schema())
-			.json(workingDir + e.name() + "/temp_researchProduct")
+			.json(workingDir + "products" + e.name() + "/temp_researchProduct")
 			.as(Encoders.bean(ResearchProduct.class));
 		// Dataset<PartialResearchProduct> prrWithAffiliation =
 		partialResearchProduct
@@ -196,11 +199,12 @@ public class DumpResearchProduct implements Serializable {
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(workingDir + e.name() + "/temp_researchproductaff");
-		Utils.removeOutputDir(spark, workingDir + e.name() + "/temp_researchProduct");
+			.json(workingDir + "products" + e.name() + "/temp_researchproductaff");
+		Utils.removeOutputDir(spark, workingDir + "products" + e.name() + "/temp_researchProduct");
 	}

-	private static <R extends Result> void dumpOtherResults(SparkSession spark, String inputPath, String workingDir, EntityType e, Class<R> resultClazz) {
+	private static <R extends Result> void dumpOtherResults(SparkSession spark, String inputPath, String workingDir,
+		EntityType e, Class<R> resultClazz) {
 		Dataset<R> results = Utils.readPath(spark, inputPath + e.name(), resultClazz);
 		results.map((MapFunction<R, ResearchProduct>) r -> {
 			ArrayList<String> journalHbIds = new ArrayList<>();
@@ -219,10 +223,11 @@ public class DumpResearchProduct implements Serializable {
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(workingDir + e.name() + "/temp_researchProduct");
+			.json(workingDir + "products" + e.name() + "/temp_researchProduct");
 	}

-	private static <R extends Result> void dumpPublication(SparkSession spark, String inputPath, String workingDir, EntityType e, Class<R> resultClazz) {
+	private static <R extends Result> void dumpPublication(SparkSession spark, String inputPath, String workingDir,
+		EntityType e, Class<R> resultClazz) {
 		Dataset<Tuple2<String, String>> resultHostedBy = Utils
 			.readPath(spark, inputPath + e.name(), resultClazz)
 			.flatMap(
@@ -278,7 +283,7 @@ public class DumpResearchProduct implements Serializable {
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(workingDir + e.name() + "/temp_researchProduct");
 	}

 	@NotNull
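
As a side note, a hedged sketch of the per-result-type union now written under the single "products" output (placeholder paths, plain Row reads instead of the commit's typed ResearchProduct datasets; the result-type names are the usual OpenAIRE ones and are an assumption here):

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

/** Sketch of unioning the per-type researchproduct folders into one "products" dump. */
public class UnionProductsSketch {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().appName("union-products").master("local[*]").getOrCreate();

		// One researchproduct folder per result type under the working dir; in the real
		// pipeline every part is written from the same ResearchProduct bean, so schemas align.
		List<String> resultTypes = Arrays.asList("publication", "dataset", "otherresearchproduct", "software");
		Dataset<Row> products = null;
		for (String type : resultTypes) {
			Dataset<Row> part = spark.read().json("/tmp/working/products" + type + "/researchproduct");
			products = products == null ? part : products.unionByName(part);
		}

		products
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json("/tmp/out/products");

		spark.stop();
	}
}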

View File

@@ -12,10 +12,8 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapGroupsFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,26 +63,29 @@ public class DumpVenue implements Serializable {
 			conf,
 			isSparkSessionManaged,
 			spark -> {
-				Utils.removeOutputDir(spark, outputPath + "Venues");
+				Utils.removeOutputDir(spark, outputPath + "venues");
 				mapVenue(spark, inputPath, outputPath, workingDir);
 			});
 	}

 	private static void mapVenue(SparkSession spark, String inputPath, String outputPath, String workingDir) {
-		Dataset<EmitPerManifestation> manifestationDataset = Utils
-			.readPath(spark, workingDir + "datasourcePublisher", EmitPerManifestation.class);
-		Dataset<Datasource> datasourceDataset = Utils
-			.readPath(spark, inputPath + "datasource", Datasource.class)
-			.filter(
-				(FilterFunction<Datasource>) d -> !d.getDataInfo().getInvisible()
-					&& !d.getDataInfo().getDeletedbyinference()
-					&& d.getEoscdatasourcetype().getClassid().equalsIgnoreCase("Journal archive"));
+		StructType tp = StructType.fromDDL("`hostedby` STRING, `publisher` STRING");
+		Dataset<Row> journalIdsDataset = spark.read().schema(tp).json(workingDir + "datasourcePublisher");
+		Dataset<Datasource> datasourceDataset;
+		datasourceDataset = spark
+			.read()
+			.schema(Encoders.bean(Datasource.class).schema())
+			.json(inputPath + "datasource")
+			.filter("datainfo.deletedbyinference != true and eoscdatasourcetype.classid == 'Journal archive' ")
+			.as(Encoders.bean(Datasource.class));
 		datasourceDataset
 			.joinWith(
-				manifestationDataset, datasourceDataset.col("id").equalTo(manifestationDataset.col("hostedby")),
+				journalIdsDataset, datasourceDataset.col("id").equalTo(journalIdsDataset.col("hostedby")),
 				"left")
-			.map((MapFunction<Tuple2<Datasource, EmitPerManifestation>, Venue>) t2 -> {
+			.map((MapFunction<Tuple2<Datasource, Row>, Venue>) t2 -> {
 				if (!Optional.ofNullable(t2._1().getJournal()).isPresent())
 					return null;
 				Venue venue = new Venue();
@@ -99,7 +100,7 @@ public class DumpVenue implements Serializable {
 				venue.setName(d.getOfficialname().getValue());
 				venue.setType(VenueType.JOURNAL.label);
 				if (Optional.ofNullable(t2._2()).isPresent())
-					venue.setPublisher(t2._2().getPublisher());
+					venue.setPublisher(t2._2().getAs("publisher"));
 				venue.setAcronym(null);
 				venue.setSeries(null);
 				venue.setIs_currently_full_oa(null);
@@ -111,16 +112,16 @@ public class DumpVenue implements Serializable {
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(workingDir + "Venues");
+			.json(workingDir + "venues");
 		Utils
-			.readPath(spark, workingDir + "Venues", Venue.class)
+			.readPath(spark, workingDir + "venues", Venue.class)
 			.groupByKey((MapFunction<Venue, String>) v -> v.getLocal_identifier(), Encoders.STRING())
 			.mapGroups((MapGroupsFunction<String, Venue, Venue>) (k, v) -> v.next(), Encoders.bean(Venue.class))
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(outputPath + "Venues");
+			.json(outputPath + "venues");
 	}

 	private static List<Identifier> getVenueIdentifier(Journal journal) {
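
For reference, a small sketch (placeholder path, not part of the commit) of the projection pattern introduced above: an explicit DDL schema materializes only hostedby and publisher from the JSON, and the values are read back from plain Rows via getAs instead of deserializing a full bean.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

/** Sketch of reading only the columns needed for the venue/publisher lookup. */
public class ProjectedReadSketch {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().appName("projected-read").master("local[*]").getOrCreate();

		// Only hostedby and publisher are parsed; everything else in the JSON is skipped.
		StructType tp = StructType.fromDDL("`hostedby` STRING, `publisher` STRING");
		Dataset<Row> journalIds = spark.read().schema(tp).json("/tmp/datasourcePublisher");

		for (Row r : journalIds.collectAsList()) {
			String publisher = r.getAs("publisher"); // same accessor the venue dump uses
			System.out.println(r.<String> getAs("hostedby") + " -> " + publisher);
		}

		spark.stop();
	}
}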

View File

@@ -210,7 +210,7 @@ public class EmitFromEntities implements Serializable {
 			.schema(Encoders.bean(Datasource.class).schema())
 			.json((inputPath + "datasource"))
 			.filter(
-				"datainfo.deletedbyinference !=true false and " +
+				"datainfo.deletedbyinference !=true and " +
 					"eoscdatasourcetype.classid == 'Journal archive' ")
 			.select("id");
@@ -314,7 +314,7 @@ public class EmitFromEntities implements Serializable {
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(outputPath + "/Topic");
+			.json(outputPath + "/topics");
 	}
@@ -389,7 +389,7 @@ public class EmitFromEntities implements Serializable {
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(outputPath + "/Persons");
+			.json(outputPath + "/persons");
 	}

View File

@@ -424,7 +424,7 @@ public class SelectRelation implements Serializable {
 			"from org " +
 			"lateral view explode (pid) p as pide ";
-		Dataset<MinOrganization> minOrganizations = spark
+		return spark
 			.sql(query)
 			.groupByKey((MapFunction<Row, String>) r -> r.getAs("id"), Encoders.STRING())
 			.mapGroups((MapGroupsFunction<String, Row, MinOrganization>) (k, v) -> {
@@ -436,7 +436,7 @@ public class SelectRelation implements Serializable {
 				v.forEachRemaining(row -> addOrganizationPid(mn, row));
 				return mn;
 			}, Encoders.bean(MinOrganization.class));
-		return minOrganizations;
 	}

 	private static void addOrganizationPid(MinOrganization mo, Row next) {

View File

@ -0,0 +1,20 @@
[
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "fp",
"paramLongName": "filterPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
}
]
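
For illustration, a hedged local driver matching the parameter names declared above (paths are placeholders; a Spark master has to be supplied, e.g. via spark-submit, since the parameter file carries no Spark settings and the Oozie action below provides the real values):

import eu.dnetlib.dhp.oa.graph.dump.filterentities.SelectEOSCEntities;

/** Usage sketch: drive the selection step with the parameters defined above. */
public class SelectEOSCEntitiesLocalRun {
	public static void main(String[] args) throws Exception {
		SelectEOSCEntities
			.main(
				new String[] {
					"-isSparkSessionManaged", Boolean.FALSE.toString(), // let the step create its own session
					"-sourcePath", "/tmp/graph/", // placeholder: graph dump with one folder per entity type
					"-filterPath", "/tmp/eoscIds/" // placeholder: where the <type>_ids parquet files land
				});
	}
}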

View File

@@ -62,12 +62,39 @@
 		</property>
 	</configuration>
 </global>
-<start to="dump_organization"/>
+<start to="dump_venue"/>
 <kill name="Kill">
 	<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
 </kill>
+<action name="select_eosc_results">
+	<spark xmlns="uri:oozie:spark-action:0.2">
+		<master>yarn</master>
+		<mode>cluster</mode>
+		<name>Selecting subset of results</name>
+		<class>eu.dnetlib.dhp.oa.graph.dump.filterentities.SelectEOSCEntities</class>
+		<jar>dump-${projectVersion}.jar</jar>
+		<spark-opts>
+			--executor-cores=4
+			--executor-memory=4G
+			--driver-memory=${sparkDriverMemory}
+			--conf spark.executor.memoryOverhead=5G
+			--conf spark.extraListeners=${spark2ExtraListeners}
+			--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+			--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+			--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+			--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+			--conf spark.sql.shuffle.partitions=15000
+		</spark-opts>
+		<arg>--sourcePath</arg><arg>${sourcePath}</arg>
+		<arg>--filterPath</arg><arg>${filterPath}/eoscIds/</arg>
+	</spark>
+	<ok to="filter"/>
+	<error to="Kill"/>
+</action>
+
 <action name="filter">
 	<spark xmlns="uri:oozie:spark-action:0.2">
 		<master>yarn</master>
@@ -89,8 +116,8 @@
 			--conf spark.sql.shuffle.partitions=15000
 		</spark-opts>
 		<arg>--sourcePath</arg><arg>${sourcePath}</arg>
-		<arg>--workingDir</arg><arg>${workingDir}/graph/</arg>
-		<arg>--filterPath</arg><arg>${filterPath}</arg>
+		<arg>--workingDir</arg><arg>${filterPath}/graph/</arg>
+		<arg>--filterPath</arg><arg>${filterPath}/eoscIds/</arg>
 	</spark>
 	<ok to="select_relevant_graph_subset"/>
 	<error to="Kill"/>
@@ -115,9 +142,9 @@
 			--conf spark.sql.shuffle.partitions=15000
 		</spark-opts>
 		<arg>--sourcePath</arg><arg>${sourcePath}</arg>
-		<arg>--workingDir</arg><arg>${workingDir}/graph/</arg>
+		<arg>--workingDir</arg><arg>${filterPath}/graph/</arg>
 		<!-- <arg>&#45;&#45;workingDir</arg><arg>/user/miriam.baglioni/oa/graph/dump/temp/graph/</arg>-->
-		<arg>--filterPath</arg><arg>${filterPath}</arg>
+		<arg>--filterPath</arg><arg>${filterPath}/eoscIds/</arg>
 	</spark>
 	<ok to="select_relation"/>
@@ -147,7 +174,7 @@
 		<arg>--sourcePath</arg><arg>${sourcePath}</arg>
 		<arg>--relationPath</arg><arg>${sourcePath}/relation</arg>
 		<!-- <arg>&#45;&#45;workingDir</arg><arg>${workingDir}/</arg>-->
-		<arg>--workingDir</arg><arg>${workingDir}/</arg>
+		<arg>--workingDir</arg><arg>${filterPath}/</arg>
 		<!-- <arg>&#45;&#45;workingDir</arg><arg>/user/miriam.baglioni/oa/graph/dump/temp/working_dir/</arg>-->
 		<!-- <arg>&#45;&#45;sourcePath</arg><arg>/user/miriam.baglioni/oa/graph/dump/temp/working_dir/graph/</arg>-->
 	</spark>
@@ -172,8 +199,8 @@
 			--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 			--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
 		</spark-opts>
-		<arg>--sourcePath</arg><arg>${workingDir}/graph/</arg>
-		<arg>--workingDir</arg><arg>${workingDir}/</arg>
+		<arg>--sourcePath</arg><arg>${filterPath}/graph/</arg>
+		<arg>--workingDir</arg><arg>${filterPath}/</arg>
 		<arg>--outputPath</arg><arg>${outputPath}</arg>
 		<!-- <arg>&#45;&#45;workingDir</arg><arg>/user/miriam.baglioni/oa/graph/dump/temp/working_dir/</arg>-->
 		<!-- <arg>&#45;&#45;sourcePath</arg><arg>/user/miriam.baglioni/oa/graph/dump/temp/working_dir/graph/</arg>-->
@@ -227,10 +254,10 @@
 			--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 			--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
 		</spark-opts>
-		<arg>--sourcePath</arg><arg>${workingDir}/graph/</arg>
+		<arg>--sourcePath</arg><arg>${filterPath}/graph/</arg>
 		<!-- <arg>&#45;&#45;sourcePath</arg><arg>/user/miriam.baglioni/oa/graph/dump/temp/working_dir/graph/</arg>-->
 		<arg>--outputPath</arg><arg>${outputPath}</arg>
-		<arg>--workingDir</arg><arg>${workingDir}/</arg>
+		<arg>--workingDir</arg><arg>${filterPath}/</arg>
 		<!-- <arg>&#45;&#45;workingDir</arg><arg>/user/miriam.baglioni/oa/graph/dump/temp/working_dir/</arg>-->
 	</spark>
 	<ok to="dump_venue"/>
@@ -253,10 +280,10 @@
 			--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 			--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
 		</spark-opts>
-		<arg>--sourcePath</arg><arg>${workingDir}/graph/</arg>
+		<arg>--sourcePath</arg><arg>${filterPath}/graph/</arg>
 		<!-- <arg>&#45;&#45;sourcePath</arg><arg>/user/miriam.baglioni/oa/graph/dump/temp/working_dir/graph/</arg>-->
 		<arg>--outputPath</arg><arg>${outputPath}</arg>
-		<arg>--workingDir</arg><arg>${workingDir}/</arg>
+		<arg>--workingDir</arg><arg>${filterPath}/</arg>
 		<!-- <arg>&#45;&#45;workingDir</arg><arg>/user/miriam.baglioni/oa/graph/dump/temp/working_dir/</arg>-->
 	</spark>
 	<ok to="dump_organization"/>
@@ -280,11 +307,11 @@
 			--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
 			--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
 		</spark-opts>
-		<!-- <arg>&#45;&#45;sourcePath</arg><arg>${workingDir}/graph/</arg>-->
-		<arg>--sourcePath</arg><arg>/user/miriam.baglioni/oa/graph/dump/temp/working_dir/graph/</arg>
+		<arg>--sourcePath</arg><arg>${filterPath}/graph/</arg>
+		<!-- <arg>&#45;&#45;sourcePath</arg><arg>/user/miriam.baglioni/oa/graph/dump/temp/working_dir/graph/</arg>-->
 		<arg>--outputPath</arg><arg>${outputPath}</arg>
-		<!-- <arg>&#45;&#45;workingDir</arg><arg>${workingDir}/</arg>-->
-		<arg>--workingDir</arg><arg>/user/miriam.baglioni/oa/graph/dump/temp/working_dir/</arg>
+		<arg>--workingDir</arg><arg>${filterPath}/</arg>
+		<!-- <arg>&#45;&#45;workingDir</arg><arg>/user/miriam.baglioni/oa/graph/dump/temp/working_dir/</arg>-->
 	</spark>
 	<ok to="dump_grant"/>
 	<error to="Kill"/>
@@ -307,11 +334,11 @@
 			--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
 		</spark-opts>
-		<!-- <arg>&#45;&#45;sourcePath</arg><arg>${workingDir}/graph/</arg>-->
-		<arg>--sourcePath</arg><arg>/user/miriam.baglioni/oa/graph/dump/temp/working_dir/graph/</arg>
+		<arg>--sourcePath</arg><arg>${filterPath}/graph/</arg>
+		<!-- <arg>&#45;&#45;sourcePath</arg><arg>/user/miriam.baglioni/oa/graph/dump/temp/working_dir/graph/</arg>-->
 		<arg>--outputPath</arg><arg>${outputPath}</arg>
-		<!-- <arg>&#45;&#45;workingDir</arg><arg>${workingDir}/</arg>-->
-		<arg>--workingDir</arg><arg>/user/miriam.baglioni/oa/graph/dump/temp/working_dir/</arg>
+		<arg>--workingDir</arg><arg>${filterPath}/</arg>
+		<!-- <arg>&#45;&#45;workingDir</arg><arg>/user/miriam.baglioni/oa/graph/dump/temp/working_dir/</arg>-->
 	</spark>
 	<ok to="End"/>
 	<error to="Kill"/>
@@ -336,10 +363,10 @@
 			--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
 			--conf spark.sql.shuffle.partitions=15000
 		</spark-opts>
-		<arg>--sourcePath</arg><arg>${workingDir}/graph/</arg>
+		<arg>--sourcePath</arg><arg>${filterPath}/graph/</arg>
 		<!-- <arg>&#45;&#45;sourcePath</arg><arg>/user/miriam.baglioni/oa/graph/dump/temp/working_dir/graph/</arg>-->
 		<arg>--outputPath</arg><arg>${outputPath}</arg>
-		<arg>--workingDir</arg><arg>${workingDir}/</arg>
+		<arg>--workingDir</arg><arg>${filterPath}/</arg>
 		<!-- <arg>&#45;&#45;workingDir</arg><arg>/user/miriam.baglioni/oa/graph/dump/temp/working_dir/</arg>-->
 	</spark>
 	<ok to="dump_datasource"/>

View File

@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.ObjectMapper;

+import eu.dnetlib.dhp.oa.graph.dump.filterentities.SelectEOSCEntities;
 import eu.dnetlib.dhp.skgif.model.Datasource;
 import eu.dnetlib.dhp.skgif.model.Venue;
@@ -99,4 +100,26 @@ public class DumpVenueTest implements Serializable {
 			.foreach((ForeachFunction<Venue>) d -> System.out.println(OBJECT_MAPPER.writeValueAsString(d)));
 	}

+	@Test
+	public void testSelectEoscResults() throws Exception {
+		final String sourcePath = getClass()
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/skgif/graphForAPIExample/")
+			.getPath();
+		final String workingDir = getClass()
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/skgif/workingDirApiExample/")
+			.getPath();
+
+		SelectEOSCEntities
+			.main(
+				new String[] {
+					"-isSparkSessionManaged", Boolean.FALSE.toString(),
+					"-sourcePath", sourcePath,
+					"-filterPath", "/tmp/",
+					"-workingDir", workingDir
+				});
+	}
 }