This commit is contained in:
Miriam Baglioni 2020-05-07 18:22:59 +02:00
commit d6b9de9f46
37 changed files with 846 additions and 1507 deletions

View File

@ -22,21 +22,18 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.community.*;
import eu.dnetlib.dhp.schema.oaf.*;
public class SparkBulkTagJob2 {
public class SparkBulkTagJob {
private static final Logger log = LoggerFactory.getLogger(SparkBulkTagJob2.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Logger log = LoggerFactory.getLogger(SparkBulkTagJob.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkBulkTagJob2.class
SparkBulkTagJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/bulktag/input_bulkTag_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
@ -58,7 +55,6 @@ public class SparkBulkTagJob2 {
log.info("outputPath: {}", outputPath);
ProtoMap protoMappingParams = new Gson().fromJson(parser.get("pathMap"), ProtoMap.class);
;
log.info("pathMap: {}", new Gson().toJson(protoMappingParams));
final String resultClassName = parser.get("resultTableName");
@ -89,45 +85,6 @@ public class SparkBulkTagJob2 {
spark -> {
execBulkTag(spark, inputPath, outputPath, protoMappingParams, resultClazz, cc);
});
// runWithSparkSession(conf, isSparkSessionManaged,
// spark -> {
// if(isTest(parser)) {
// removeOutputDir(spark, outputPath);
// }
// if(saveGraph)
// execPropagation(spark, possibleUpdates, inputPath, outputPath,
// resultClazz);
// });
//
//
//
//
//
//
// sc.textFile(inputPath + "/publication")
// .map(item -> new ObjectMapper().readValue(item, Publication.class))
// .map(p -> resultTagger.enrichContextCriteria(p, cc, protoMappingParams))
// .map(p -> new ObjectMapper().writeValueAsString(p))
// .saveAsTextFile(outputPath+"/publication");
// sc.textFile(inputPath + "/dataset")
// .map(item -> new ObjectMapper().readValue(item, Dataset.class))
// .map(p -> resultTagger.enrichContextCriteria(p, cc, protoMappingParams))
// .map(p -> new ObjectMapper().writeValueAsString(p))
// .saveAsTextFile(outputPath+"/dataset");
// sc.textFile(inputPath + "/software")
// .map(item -> new ObjectMapper().readValue(item, Software.class))
// .map(p -> resultTagger.enrichContextCriteria(p, cc, protoMappingParams))
// .map(p -> new ObjectMapper().writeValueAsString(p))
// .saveAsTextFile(outputPath+"/software");
// sc.textFile(inputPath + "/otherresearchproduct")
// .map(item -> new ObjectMapper().readValue(item,
// OtherResearchProduct.class))
// .map(p -> resultTagger.enrichContextCriteria(p, cc, protoMappingParams))
// .map(p -> new ObjectMapper().writeValueAsString(p))
// .saveAsTextFile(outputPath+"/otherresearchproduct");
//
}
private static <R extends Result> void execBulkTag(
@ -139,28 +96,23 @@ public class SparkBulkTagJob2 {
CommunityConfiguration communityConfiguration) {
ResultTagger resultTagger = new ResultTagger();
Dataset<R> result = readPathEntity(spark, inputPath, resultClazz);
result
.map(
value -> resultTagger
readPath(spark, inputPath, resultClazz)
.map((MapFunction<R, R>) value -> resultTagger
.enrichContextCriteria(
value, communityConfiguration, protoMappingParams),
Encoders.bean(resultClazz))
.toJSON()
value, communityConfiguration, protoMappingParams),
Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.text(outputPath);
.json(outputPath);
}
private static <R extends Result> org.apache.spark.sql.Dataset<R> readPathEntity(
SparkSession spark, String inputEntityPath, Class<R> resultClazz) {
private static <R> Dataset<R> readPath(
SparkSession spark, String inputEntityPath, Class<R> clazz) {
return spark
.read()
.textFile(inputEntityPath)
.map(
(MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, resultClazz),
Encoders.bean(resultClazz));
.json(inputEntityPath)
.as(Encoders.bean(clazz));
}
}

View File

@ -106,7 +106,7 @@
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>bulkTagging-publication</name>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob2</class>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob</class>
<jar>dhp-bulktag-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
@ -134,7 +134,7 @@
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>bulkTagging-dataset</name>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob2</class>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob</class>
<jar>dhp-bulktag-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
@ -162,7 +162,7 @@
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>bulkTagging-orp</name>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob2</class>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob</class>
<jar>dhp-bulktag-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
@ -190,7 +190,7 @@
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>bulkTagging-software</name>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob2</class>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob</class>
<jar>dhp-bulktag-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}

View File

@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.bulktag.SparkBulkTagJob2;
import eu.dnetlib.dhp.bulktag.SparkBulkTagJob;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Publication;
@ -84,7 +84,7 @@ public class BulkTagJobTest {
@Test
public void noUpdatesTest() throws Exception {
SparkBulkTagJob2
SparkBulkTagJob
.main(
new String[] {
"-isTest",
@ -134,7 +134,7 @@ public class BulkTagJobTest {
@Test
public void bulktagBySubjectNoPreviousContextTest() throws Exception {
SparkBulkTagJob2
SparkBulkTagJob
.main(
new String[] {
"-isTest",
@ -240,7 +240,7 @@ public class BulkTagJobTest {
@Test
public void bulktagBySubjectPreviousContextNoProvenanceTest() throws Exception {
SparkBulkTagJob2
SparkBulkTagJob
.main(
new String[] {
"-isTest",
@ -332,7 +332,7 @@ public class BulkTagJobTest {
@Test
public void bulktagByDatasourceTest() throws Exception {
SparkBulkTagJob2
SparkBulkTagJob
.main(
new String[] {
"-isTest",
@ -415,7 +415,7 @@ public class BulkTagJobTest {
@Test
public void bulktagByZenodoCommunityTest() throws Exception {
SparkBulkTagJob2
SparkBulkTagJob
.main(
new String[] {
"-isTest",
@ -548,7 +548,7 @@ public class BulkTagJobTest {
@Test
public void bulktagBySubjectDatasourceTest() throws Exception {
SparkBulkTagJob2
SparkBulkTagJob
.main(
new String[] {
"-isTest",
@ -688,7 +688,7 @@ public class BulkTagJobTest {
@Test
public void bulktagBySubjectDatasourceZenodoCommunityTest() throws Exception {
SparkBulkTagJob2
SparkBulkTagJob
.main(
new String[] {
"-isTest",
@ -796,7 +796,7 @@ public class BulkTagJobTest {
@Test
public void bulktagDatasourcewithConstraintsTest() throws Exception {
SparkBulkTagJob2
SparkBulkTagJob
.main(
new String[] {
"-isTest",

View File

@ -1,12 +1,11 @@
package eu.dnetlib.dhp;
import java.io.IOException;
import java.util.*;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@ -67,6 +66,12 @@ public class PropagationConstant {
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String cfHbforResultQuery = "select distinct r.id, inst.collectedfrom.key cf, inst.hostedby.key hb "
+
"from result r " +
"lateral view explode(instance) i as inst " +
"where r.datainfo.deletedbyinference=false";
public static Country getCountry(String classid, String classname) {
Country nc = new Country();
nc.setClassid(classid);
@ -130,13 +135,6 @@ public class PropagationConstant {
return ret;
}
public static void createOutputDirs(String outputPath, FileSystem fs) throws IOException {
if (fs.exists(new Path(outputPath))) {
fs.delete(new Path(outputPath), true);
}
fs.mkdirs(new Path(outputPath));
}
public static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
@ -155,50 +153,17 @@ public class PropagationConstant {
.orElse(Boolean.FALSE);
}
public static void createCfHbforresult(SparkSession spark) {
String query;
// query = "SELECT id, inst.collectedfrom.key cf , inst.hostedby.key hb "
// + "FROM ( SELECT id, instance "
// + "FROM result "
// + " WHERE datainfo.deletedbyinference = false) ds "
// + "LATERAL VIEW EXPLODE(instance) i AS inst";
query = "select distinct r.id, inst.collectedfrom.key cf, inst.hostedby.key hb " +
"from result r " +
"lateral view explode(instance) i as inst " +
"where r.datainfo.deletedbyinference=false";
org.apache.spark.sql.Dataset<Row> cfhb = spark.sql(query);
public static void createCfHbforResult(SparkSession spark) {
org.apache.spark.sql.Dataset<Row> cfhb = spark.sql(cfHbforResultQuery);
cfhb.createOrReplaceTempView("cfhb");
}
public static <R extends Result> org.apache.spark.sql.Dataset<R> readPathEntity(
SparkSession spark, String inputEntityPath, Class<R> resultClazz) {
return spark
.read()
.textFile(inputEntityPath)
.map(
(MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, resultClazz),
Encoders.bean(resultClazz));
}
public static org.apache.spark.sql.Dataset<Relation> readRelations(
SparkSession spark, String inputPath) {
public static <R> Dataset<R> readPath(
SparkSession spark, String inputPath, Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map(
(MapFunction<String, Relation>) value -> OBJECT_MAPPER.readValue(value, Relation.class),
Encoders.bean(Relation.class));
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
public static org.apache.spark.sql.Dataset<ResultCommunityList> readResultCommunityList(
SparkSession spark, String possibleUpdatesPath) {
return spark
.read()
.textFile(possibleUpdatesPath)
.map(
value -> OBJECT_MAPPER.readValue(value, ResultCommunityList.class),
Encoders.bean(ResultCommunityList.class));
}
}

View File

@ -1,20 +0,0 @@
package eu.dnetlib.dhp;
import java.util.List;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
public class QueryInformationSystem {
private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType')"
+ " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri']"
+ " and $x//CONFIGURATION/context/param[./@name='status']/text() != 'hidden'"
+ " return $x//CONFIGURATION/context/@id/string()";
public static List<String> getCommunityList(final String isLookupUrl) throws ISLookUpException {
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
return isLookUp.quickSearchProfile(XQUERY);
}
}

View File

@ -13,6 +13,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -30,7 +31,6 @@ import eu.dnetlib.dhp.schema.oaf.*;
public class PrepareDatasourceCountryAssociation {
private static final Logger log = LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
@ -80,31 +80,10 @@ public class PrepareDatasourceCountryAssociation {
for (String i : whitelist) {
whitelisted += " OR id = '" + i + "'";
}
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
Dataset<Datasource> datasource = spark
.createDataset(
sc
.textFile(inputPath + "/datasource")
.map(item -> OBJECT_MAPPER.readValue(item, Datasource.class))
.rdd(),
Encoders.bean(Datasource.class));
Dataset<Relation> relation = spark
.createDataset(
sc
.textFile(inputPath + "/relation")
.map(item -> OBJECT_MAPPER.readValue(item, Relation.class))
.rdd(),
Encoders.bean(Relation.class));
Dataset<Organization> organization = spark
.createDataset(
sc
.textFile(inputPath + "/organization")
.map(item -> OBJECT_MAPPER.readValue(item, Organization.class))
.rdd(),
Encoders.bean(Organization.class));
Dataset<Datasource> datasource = readPath(spark, inputPath + "/datasource", Datasource.class);
Dataset<Relation> relation = readPath(spark, inputPath + "/relation", Relation.class);
Dataset<Organization> organization = readPath(spark, inputPath + "/organization", Organization.class);
datasource.createOrReplaceTempView("datasource");
relation.createOrReplaceTempView("relation");
@ -128,14 +107,15 @@ public class PrepareDatasourceCountryAssociation {
+ "JOIN (SELECT id, country "
+ " FROM organization "
+ " WHERE datainfo.deletedbyinference = false "
+ " AND length(country.classid)>0) o "
+ " AND length(country.classid) > 0) o "
+ "ON o.id = rel.target";
spark
.sql(query)
.as(Encoders.bean(DatasourceCountry.class))
.toJavaRDD()
.map(c -> OBJECT_MAPPER.writeValueAsString(c))
.saveAsTextFile(outputPath, GzipCodec.class);
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
}
}

View File

@ -4,31 +4,31 @@ package eu.dnetlib.dhp.countrypropagation;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.Arrays;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.*;
public class PrepareResultCountrySet {
private static final Logger log = LoggerFactory.getLogger(PrepareResultCountrySet.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String RESULT_COUNTRYSET_QUERY = "SELECT id resultId, collect_set(country) countrySet "
+ "FROM ( SELECT id, country "
+ "FROM datasource_country JOIN cfhb ON cf = dataSourceId "
+ "UNION ALL "
+ "SELECT id, country FROM datasource_country "
+ "JOIN cfhb ON hb = dataSourceId ) tmp "
+ "GROUP BY id";
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkCountryPropagationJob2.class
PrepareResultCountrySet.class
.getResourceAsStream(
"/eu/dnetlib/dhp/countrypropagation/input_prepareresultcountry_parameters.json"));
@ -42,6 +42,9 @@ public class PrepareResultCountrySet {
String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String datasourcecountrypath = parser.get("preparedInfoPath");
log.info("preparedInfoPath: {}", datasourcecountrypath);
@ -60,75 +63,36 @@ public class PrepareResultCountrySet {
getPotentialResultToUpdate(
spark,
inputPath,
outputPath,
datasourcecountrypath,
resultClazz);
});
}
private static <R extends Result> void getPotentialResultToUpdate(
SparkSession spark,
String inputPath,
String outputPath,
String datasourcecountrypath,
Class<R> resultClazz) {
Dataset<R> result = readPathEntity(spark, inputPath, resultClazz);
Dataset<R> result = readPath(spark, inputPath, resultClazz);
result.createOrReplaceTempView("result");
// log.info("number of results: {}", result.count());
createCfHbforresult(spark);
Dataset<DatasourceCountry> datasourcecountryassoc = readAssocDatasourceCountry(spark, datasourcecountrypath);
countryPropagationAssoc(spark, datasourcecountryassoc)
.map((MapFunction<ResultCountrySet, R>) value -> {
R ret = resultClazz.newInstance();
ret.setId(value.getResultId());
ret
.setCountry(
value
.getCountrySet()
.stream()
.map(c -> getCountry(c.getClassid(), c.getClassname()))
.collect(Collectors.toList()));
return ret;
}, Encoders.bean(resultClazz))
createCfHbforResult(spark);
Dataset<DatasourceCountry> datasource_country = readPath(spark, datasourcecountrypath, DatasourceCountry.class);
datasource_country.createOrReplaceTempView("datasource_country");
// log.info("datasource_country number : {}", datasource_country.count());
spark
.sql(RESULT_COUNTRYSET_QUERY)
.as(Encoders.bean(ResultCountrySet.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Append)
.json(inputPath);
.json(outputPath);
}
private static Dataset<ResultCountrySet> countryPropagationAssoc(
SparkSession spark,
Dataset<DatasourceCountry> datasource_country) {
// Dataset<DatasourceCountry> datasource_country = broadcast_datasourcecountryassoc.value();
datasource_country.createOrReplaceTempView("datasource_country");
log.info("datasource_country number : {}", datasource_country.count());
String query = "SELECT id resultId, collect_set(country) countrySet "
+ "FROM ( SELECT id, country "
+ "FROM datasource_country "
+ "JOIN cfhb "
+ " ON cf = dataSourceId "
+ "UNION ALL "
+ "SELECT id , country "
+ "FROM datasource_country "
+ "JOIN cfhb "
+ " ON hb = dataSourceId ) tmp "
+ "GROUP BY id";
Dataset<ResultCountrySet> potentialUpdates = spark
.sql(query)
.as(Encoders.bean(ResultCountrySet.class));
// log.info("potential update number : {}", potentialUpdates.count());
return potentialUpdates;
}
private static Dataset<DatasourceCountry> readAssocDatasourceCountry(
SparkSession spark, String relationPath) {
return spark
.read()
.textFile(relationPath)
.map(
value -> OBJECT_MAPPER.readValue(value, DatasourceCountry.class),
Encoders.bean(DatasourceCountry.class));
}
}

View File

@ -5,17 +5,11 @@ import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
@ -26,15 +20,13 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Country;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Result;
import scala.Tuple2;
public class SparkCountryPropagationJob3 {
public class SparkCountryPropagationJob {
private static final Logger log = LoggerFactory.getLogger(SparkCountryPropagationJob3.class);
private static final Logger log = LoggerFactory.getLogger(SparkCountryPropagationJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@ -42,7 +34,7 @@ public class SparkCountryPropagationJob3 {
String jsonConfiguration = IOUtils
.toString(
SparkCountryPropagationJob3.class
SparkCountryPropagationJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json"));
@ -53,8 +45,11 @@ public class SparkCountryPropagationJob3 {
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
String sourcePath = parser.get("sourcePath");
log.info("sourcePath: {}", sourcePath);
String preparedInfoPath = parser.get("preparedInfoPath");
log.info("preparedInfoPath: {}", preparedInfoPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
@ -76,7 +71,8 @@ public class SparkCountryPropagationJob3 {
isSparkSessionManaged,
spark -> execPropagation(
spark,
inputPath,
sourcePath,
preparedInfoPath,
outputPath,
resultClazz,
saveGraph));
@ -84,21 +80,26 @@ public class SparkCountryPropagationJob3 {
private static <R extends Result> void execPropagation(
SparkSession spark,
String inputPath,
String sourcePath,
String preparedInfoPath,
String outputPath,
Class<R> resultClazz,
boolean saveGraph) {
if (saveGraph) {
// updateResultTable(spark, potentialUpdates, inputPath, resultClazz, outputPath);
log.info("Reading Graph table from: {}", inputPath);
log.info("Reading Graph table from: {}", sourcePath);
Dataset<R> res = readPath(spark, sourcePath, resultClazz);
spark
log.info("Reading prepared info: {}", preparedInfoPath);
Dataset<ResultCountrySet> prepared = spark
.read()
.json(inputPath)
.as(Encoders.bean(resultClazz))
.groupByKey((MapFunction<R, String>) r -> r.getId(), Encoders.STRING())
.mapGroups(getCountryMergeFn(resultClazz), Encoders.bean(resultClazz))
.json(preparedInfoPath)
.as(Encoders.bean(ResultCountrySet.class));
res
.joinWith(prepared, res.col("id").equalTo(prepared.col("resultId")), "left_outer")
.map(getCountryMergeFn(), Encoders.bean(resultClazz))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
@ -106,37 +107,26 @@ public class SparkCountryPropagationJob3 {
}
}
private static <R extends Result> MapGroupsFunction<String, R, R> getCountryMergeFn(Class<R> resultClazz) {
return (MapGroupsFunction<String, R, R>) (key, values) -> {
R res = resultClazz.newInstance();
List<Country> countries = new ArrayList<>();
values.forEachRemaining(r -> {
res.mergeFrom(r);
countries.addAll(r.getCountry());
private static <R extends Result> MapFunction<Tuple2<R, ResultCountrySet>, R> getCountryMergeFn() {
return (MapFunction<Tuple2<R, ResultCountrySet>, R>) t -> {
Optional.ofNullable(t._2()).ifPresent(r -> {
t._1().getCountry().addAll(merge(t._1().getCountry(), r.getCountrySet()));
});
res
.setCountry(
countries
.stream()
.collect(
Collectors
.toMap(
Country::getClassid,
Function.identity(),
(c1, c2) -> {
if (Optional
.ofNullable(
c1.getDataInfo().getInferenceprovenance())
.isPresent()) {
return c2;
}
return c1;
}))
.values()
.stream()
.collect(Collectors.toList()));
return res;
return t._1();
};
}
private static List<Country> merge(List<Country> c1, List<CountrySbs> c2) {
HashSet<String> countries = c1
.stream()
.map(c -> c.getClassid())
.collect(Collectors.toCollection(HashSet::new));
return c2
.stream()
.filter(c -> !countries.contains(c.getClassid()))
.map(c -> getCountry(c.getClassid(), c.getClassname()))
.collect(Collectors.toList());
}
}

View File

@ -1,289 +0,0 @@
package eu.dnetlib.dhp.countrypropagation;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import static jdk.nashorn.internal.objects.NativeDebug.map;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
public class SparkCountryPropagationJob2 {
private static final Logger log = LoggerFactory.getLogger(SparkCountryPropagationJob2.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkCountryPropagationJob2.class
.getResourceAsStream(
"/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String datasourcecountrypath = parser.get("preparedInfoPath");
log.info("preparedInfoPath: {}", datasourcecountrypath);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase();
log.info("resultType: {}", resultType);
final String possibleUpdatesPath = datasourcecountrypath
.substring(0, datasourcecountrypath.lastIndexOf("/") + 1)
+ "possibleUpdates/" + resultType;
log.info("possibleUpdatesPath: {}", possibleUpdatesPath);
final Boolean saveGraph = Optional
.ofNullable(parser.get("saveGraph"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("saveGraph: {}", saveGraph);
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, possibleUpdatesPath);
execPropagation(
spark,
datasourcecountrypath,
inputPath,
outputPath,
resultClazz,
saveGraph, possibleUpdatesPath);
});
}
private static <R extends Result> void execPropagation(
SparkSession spark,
String datasourcecountrypath,
String inputPath,
String outputPath,
Class<R> resultClazz,
boolean saveGraph, String possilbeUpdatesPath) {
// final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
// Load file with preprocessed association datasource - country
Dataset<DatasourceCountry> datasourcecountryassoc = readAssocDatasourceCountry(spark, datasourcecountrypath);
// broadcasting the result of the preparation step
// Broadcast<Dataset<DatasourceCountry>> broadcast_datasourcecountryassoc =
// sc.broadcast(datasourcecountryassoc);
Dataset<ResultCountrySet> potentialUpdates = getPotentialResultToUpdate(
spark, inputPath, resultClazz, datasourcecountryassoc)
.as(Encoders.bean(ResultCountrySet.class));
potentialUpdates.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(possilbeUpdatesPath);
if (saveGraph) {
// updateResultTable(spark, potentialUpdates, inputPath, resultClazz, outputPath);
potentialUpdates = spark
.read()
.textFile(possilbeUpdatesPath)
.map(
(MapFunction<String, ResultCountrySet>) value -> OBJECT_MAPPER
.readValue(value, ResultCountrySet.class),
Encoders.bean(ResultCountrySet.class));
updateResultTable(spark, potentialUpdates, inputPath, resultClazz, outputPath);
}
}
private static <R extends Result> void updateResultTable(
SparkSession spark,
Dataset<ResultCountrySet> potentialUpdates,
String inputPath,
Class<R> resultClazz,
String outputPath) {
log.info("Reading Graph table from: {}", inputPath);
Dataset<R> result = readPathEntity(spark, inputPath, resultClazz);
Dataset<R> new_table = result
.joinWith(
potentialUpdates, result
.col("id")
.equalTo(potentialUpdates.col("resultId")),
"left_outer")
.map((MapFunction<Tuple2<R, ResultCountrySet>, R>) value -> {
R r = value._1();
Optional<ResultCountrySet> potentialNewCountries = Optional.ofNullable(value._2());
if (potentialNewCountries.isPresent()) {
HashSet<String> countries = r
.getCountry()
.stream()
.map(c -> c.getClassid())
.collect(Collectors.toCollection(HashSet::new));
r
.getCountry()
.addAll(
potentialNewCountries
.get()
.getCountrySet()
.stream()
.filter(c -> !countries.contains(c.getClassid()))
.map(c -> getCountry(c.getClassid(), c.getClassname()))
.collect(Collectors.toList()));
// Result res = new Result();
// res.setId(r.getId());
// List<Country> countryList = new ArrayList<>();
// for (CountrySbs country : potentialNewCountries
// .get()
// .getCountrySet()) {
// if (!countries.contains(country.getClassid())) {
// countryList
// .add(
// getCountry(
// country.getClassid(),
// country.getClassname()));
// }
// }
// res.setCountry(countryList);
// r.mergeFrom(res);
}
return r;
}, Encoders.bean(resultClazz));
// Dataset<Tuple2<String, R>> result_pair = result
// .map(
// r -> new Tuple2<>(r.getId(), r),
// Encoders.tuple(Encoders.STRING(), Encoders.bean(resultClazz)));
//
// Dataset<R> new_table = result_pair
// .joinWith(
// potentialUpdates,
// result_pair.col("_1").equalTo(potentialUpdates.col("resultId")),
// "left_outer")
// .map(
// (MapFunction<Tuple2<Tuple2<String, R>, ResultCountrySet>, R>) value -> {
// R r = value._1()._2();
// Optional<ResultCountrySet> potentialNewCountries = Optional.ofNullable(value._2());
// if (potentialNewCountries.isPresent()) {
// HashSet<String> countries = new HashSet<>();
// for (Qualifier country : r.getCountry()) {
// countries.add(country.getClassid());
// }
// Result res = new Result();
// res.setId(r.getId());
// List<Country> countryList = new ArrayList<>();
// for (CountrySbs country : potentialNewCountries
// .get()
// .getCountrySet()) {
// if (!countries.contains(country.getClassid())) {
// countryList
// .add(
// getCountry(
// country.getClassid(),
// country.getClassname()));
// }
// }
// res.setCountry(countryList);
// r.mergeFrom(res);
// }
// return r;
// },
// Encoders.bean(resultClazz));
log.info("Saving graph table to path: {}", outputPath);
log.info("number of saved recordsa: {}", new_table.count());
new_table.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(outputPath);
}
private static <R extends Result> Dataset<ResultCountrySet> getPotentialResultToUpdate(
SparkSession spark,
String inputPath,
Class<R> resultClazz,
Dataset<DatasourceCountry> datasourcecountryassoc) {
Dataset<R> result = readPathEntity(spark, inputPath, resultClazz);
result.createOrReplaceTempView("result");
// log.info("number of results: {}", result.count());
createCfHbforresult(spark);
return countryPropagationAssoc(spark, datasourcecountryassoc);
}
private static Dataset<ResultCountrySet> countryPropagationAssoc(
SparkSession spark,
Dataset<DatasourceCountry> datasource_country) {
// Dataset<DatasourceCountry> datasource_country = broadcast_datasourcecountryassoc.value();
datasource_country.createOrReplaceTempView("datasource_country");
log.info("datasource_country number : {}", datasource_country.count());
String query = "SELECT id resultId, collect_set(country) countrySet "
+ "FROM ( SELECT id, country "
+ "FROM datasource_country "
+ "JOIN cfhb "
+ " ON cf = dataSourceId "
+ "UNION ALL "
+ "SELECT id , country "
+ "FROM datasource_country "
+ "JOIN cfhb "
+ " ON hb = dataSourceId ) tmp "
+ "GROUP BY id";
Dataset<ResultCountrySet> potentialUpdates = spark
.sql(query)
.as(Encoders.bean(ResultCountrySet.class))
.map((MapFunction<ResultCountrySet, ResultCountrySet>) r -> {
final ArrayList<CountrySbs> c = r
.getCountrySet()
.stream()
.limit(100)
.collect(Collectors.toCollection(ArrayList::new));
r.setCountrySet(c);
return r;
}, Encoders.bean(ResultCountrySet.class));
// log.info("potential update number : {}", potentialUpdates.count());
return potentialUpdates;
}
private static Dataset<DatasourceCountry> readAssocDatasourceCountry(
SparkSession spark, String relationPath) {
return spark
.read()
.textFile(relationPath)
.map(
(MapFunction<String, DatasourceCountry>) value -> OBJECT_MAPPER
.readValue(value, DatasourceCountry.class),
Encoders.bean(DatasourceCountry.class));
}
}

View File

@ -2,10 +2,11 @@
package eu.dnetlib.dhp.orcidtoresultfromsemrel;
public class AutoritativeAuthor {
String name;
String surname;
String fullname;
String orcid;
private String name;
private String surname;
private String fullname;
private String orcid;
public String getName() {
return name;
@ -38,4 +39,5 @@ public class AutoritativeAuthor {
public void setOrcid(String orcid) {
this.orcid = orcid;
}
}

View File

@ -13,6 +13,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -27,17 +28,14 @@ import eu.dnetlib.dhp.schema.oaf.Result;
public class PrepareResultOrcidAssociationStep1 {
private static final Logger log = LoggerFactory.getLogger(PrepareResultOrcidAssociationStep1.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
String jsonConf = IOUtils
.toString(
SparkOrcidToResultFromSemRelJob3.class
PrepareResultOrcidAssociationStep1.class
.getResourceAsStream(
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_prepareorcidtoresult_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConf);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
@ -63,6 +61,15 @@ public class PrepareResultOrcidAssociationStep1 {
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
String inputRelationPath = inputPath + "/relation";
log.info("inputRelationPath: {}", inputRelationPath);
String inputResultPath = inputPath + "/" + resultType;
log.info("inputResultPath: {}", inputResultPath);
String outputResultPath = outputPath + "/" + resultType;
log.info("outputResultPath: {}", outputResultPath);
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
@ -71,39 +78,25 @@ public class PrepareResultOrcidAssociationStep1 {
removeOutputDir(spark, outputPath);
}
prepareInfo(
spark, inputPath, outputPath, resultClazz, resultType, allowedsemrel);
spark, inputRelationPath, inputResultPath, outputResultPath, resultClazz, allowedsemrel);
});
}
private static <R extends Result> void prepareInfo(
SparkSession spark,
String inputPath,
String outputPath,
String inputRelationPath,
String inputResultPath,
String outputResultPath,
Class<R> resultClazz,
String resultType,
List<String> allowedsemrel) {
// read the relation table and the table related to the result it is using
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
org.apache.spark.sql.Dataset<Relation> relation = spark
.createDataset(
sc
.textFile(inputPath + "/relation")
.map(item -> OBJECT_MAPPER.readValue(item, Relation.class))
.rdd(),
Encoders.bean(Relation.class));
Dataset<Relation> relation = readPath(spark, inputRelationPath, Relation.class);
relation.createOrReplaceTempView("relation");
log.info("Reading Graph table from: {}", inputPath + "/" + resultType);
Dataset<R> result = readPathEntity(spark, inputPath + "/" + resultType, resultClazz);
log.info("Reading Graph table from: {}", inputResultPath);
Dataset<R> result = readPath(spark, inputResultPath, resultClazz);
result.createOrReplaceTempView("result");
getPossibleResultOrcidAssociation(spark, allowedsemrel, outputPath + "/" + resultType);
}
private static void getPossibleResultOrcidAssociation(
SparkSession spark, List<String> allowedsemrel, String outputPath) {
String query = " select target resultId, author authorList"
+ " from (select id, collect_set(named_struct('name', name, 'surname', surname, 'fullname', fullname, 'orcid', orcid)) author "
+ " from ( "
@ -120,18 +113,13 @@ public class PrepareResultOrcidAssociationStep1 {
+ getConstraintList(" relclass = '", allowedsemrel)
+ ") rel_rel "
+ " on source = id";
spark
.sql(query)
.as(Encoders.bean(ResultOrcidList.class))
.toJavaRDD()
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(outputPath, GzipCodec.class);
// .toJSON()
// .write()
// .mode(SaveMode.Append)
// .option("compression","gzip")
// .text(outputPath)
// ;
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputResultPath);
}
}

View File

@ -59,10 +59,10 @@ public class PrepareResultOrcidAssociationStep2 {
private static void mergeInfo(SparkSession spark, String inputPath, String outputPath) {
Dataset<ResultOrcidList> resultOrcidAssoc = readAssocResultOrcidList(spark, inputPath + "/publication")
.union(readAssocResultOrcidList(spark, inputPath + "/dataset"))
.union(readAssocResultOrcidList(spark, inputPath + "/otherresearchproduct"))
.union(readAssocResultOrcidList(spark, inputPath + "/software"));
Dataset<ResultOrcidList> resultOrcidAssoc = readPath(spark, inputPath + "/publication", ResultOrcidList.class)
.union(readPath(spark, inputPath + "/dataset", ResultOrcidList.class))
.union(readPath(spark, inputPath + "/otherresearchproduct", ResultOrcidList.class))
.union(readPath(spark, inputPath + "/software", ResultOrcidList.class));
resultOrcidAssoc
.toJavaRDD()
@ -77,7 +77,6 @@ public class PrepareResultOrcidAssociationStep2 {
}
Set<String> orcid_set = new HashSet<>();
a.getAuthorList().stream().forEach(aa -> orcid_set.add(aa.getOrcid()));
b
.getAuthorList()
.stream()
@ -95,13 +94,4 @@ public class PrepareResultOrcidAssociationStep2 {
.saveAsTextFile(outputPath, GzipCodec.class);
}
private static Dataset<ResultOrcidList> readAssocResultOrcidList(
SparkSession spark, String relationPath) {
return spark
.read()
.textFile(relationPath)
.map(
value -> OBJECT_MAPPER.readValue(value, ResultOrcidList.class),
Encoders.bean(ResultOrcidList.class));
}
}

View File

@ -6,11 +6,11 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
@ -25,21 +25,19 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import scala.Tuple2;
public class SparkOrcidToResultFromSemRelJob3 {
private static final Logger log = LoggerFactory.getLogger(SparkOrcidToResultFromSemRelJob3.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public class SparkOrcidToResultFromSemRelJob {
private static final Logger log = LoggerFactory.getLogger(SparkOrcidToResultFromSemRelJob.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkOrcidToResultFromSemRelJob3.class
SparkOrcidToResultFromSemRelJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/input_orcidtoresult_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
@ -88,9 +86,9 @@ public class SparkOrcidToResultFromSemRelJob3 {
Class<R> resultClazz) {
// read possible updates (resultId and list of possible orcid to add
Dataset<ResultOrcidList> possible_updates = readAssocResultOrcidList(spark, possibleUpdatesPath);
Dataset<ResultOrcidList> possible_updates = readPath(spark, possibleUpdatesPath, ResultOrcidList.class);
// read the result we have been considering
Dataset<R> result = readPathEntity(spark, inputPath, resultClazz);
Dataset<R> result = readPath(spark, inputPath, resultClazz);
// make join result left_outer with possible updates
result
@ -98,38 +96,29 @@ public class SparkOrcidToResultFromSemRelJob3 {
possible_updates,
result.col("id").equalTo(possible_updates.col("resultId")),
"left_outer")
.map(
value -> {
R ret = value._1();
Optional<ResultOrcidList> rol = Optional.ofNullable(value._2());
if (rol.isPresent()) {
List<Author> toenrich_author = ret.getAuthor();
List<AutoritativeAuthor> autoritativeAuthors = rol.get().getAuthorList();
for (Author author : toenrich_author) {
if (!containsAllowedPid(author)) {
enrichAuthor(author, autoritativeAuthors);
}
}
}
return ret;
},
Encoders.bean(resultClazz))
.toJSON()
.map(authorEnrichFn(), Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.text(outputPath);
.json(outputPath);
}
private static Dataset<ResultOrcidList> readAssocResultOrcidList(
SparkSession spark, String relationPath) {
return spark
.read()
.textFile(relationPath)
.map(
value -> OBJECT_MAPPER.readValue(value, ResultOrcidList.class),
Encoders.bean(ResultOrcidList.class));
private static <R extends Result> MapFunction<Tuple2<R, ResultOrcidList>, R> authorEnrichFn() {
return (MapFunction<Tuple2<R, ResultOrcidList>, R>) value -> {
R ret = value._1();
Optional<ResultOrcidList> rol = Optional.ofNullable(value._2());
if (rol.isPresent()) {
List<Author> toenrich_author = ret.getAuthor();
List<AutoritativeAuthor> autoritativeAuthors = rol.get().getAuthorList();
for (Author author : toenrich_author) {
if (!containsAllowedPid(author)) {
enrichAuthor(author, autoritativeAuthors);
}
}
}
return ret;
};
}
private static void enrichAuthor(Author a, List<AutoritativeAuthor> au) {

View File

@ -25,7 +25,6 @@ import eu.dnetlib.dhp.schema.oaf.Relation;
public class PrepareProjectResultsAssociation {
private static final Logger log = LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
@ -61,8 +60,6 @@ public class PrepareProjectResultsAssociation {
conf,
isSparkSessionManaged,
spark -> {
// removeOutputDir(spark, potentialUpdatePath);
// removeOutputDir(spark, alreadyLinkedPath);
prepareResultProjProjectResults(
spark,
inputPath,
@ -78,28 +75,21 @@ public class PrepareProjectResultsAssociation {
String potentialUpdatePath,
String alreadyLinkedPath,
List<String> allowedsemrel) {
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
Dataset<Relation> relation = spark
.createDataset(
sc
.textFile(inputPath)
.map(item -> OBJECT_MAPPER.readValue(item, Relation.class))
.rdd(),
Encoders.bean(Relation.class));
Dataset<Relation> relation = readPath(spark, inputPath, Relation.class);
relation.createOrReplaceTempView("relation");
String query = "SELECT source, target "
String resproj_relation_query = "SELECT source, target "
+ " FROM relation "
+ " WHERE datainfo.deletedbyinference = false "
+ " AND relClass = '"
+ RELATION_RESULT_PROJECT_REL_CLASS
+ "'";
Dataset<Row> resproj_relation = spark.sql(query);
Dataset<Row> resproj_relation = spark.sql(resproj_relation_query);
resproj_relation.createOrReplaceTempView("resproj_relation");
query = "SELECT resultId, collect_set(projectId) projectSet "
String potential_update_query = "SELECT resultId, collect_set(projectId) projectSet "
+ "FROM ( "
+ "SELECT r1.target resultId, r2.target projectId "
+ " FROM (SELECT source, target "
@ -111,46 +101,26 @@ public class PrepareProjectResultsAssociation {
+ " ON r1.source = r2.source "
+ " ) tmp "
+ "GROUP BY resultId ";
// query =
// "SELECT projectId, collect_set(resId) resultSet "
// + "FROM ("
// + " SELECT r1.target resId, r2.target projectId "
// + " FROM (SELECT source, target "
// + " FROM relation "
// + " WHERE datainfo.deletedbyinference = false "
// + getConstraintList(" relClass = '", allowedsemrel)
// + ") r1"
// + " JOIN resproj_relation r2 "
// + " ON r1.source = r2.source "
// + " ) tmp "
// + "GROUP BY projectId ";
spark
.sql(query)
.sql(potential_update_query)
.as(Encoders.bean(ResultProjectSet.class))
// .toJSON()
// .write()
// .mode(SaveMode.Overwrite)
// .option("compression", "gzip")
// .text(potentialUpdatePath);
.toJavaRDD()
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(potentialUpdatePath, GzipCodec.class);
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(potentialUpdatePath);
query = "SELECT source resultId, collect_set(target) projectSet "
String result_projectset_query = "SELECT source resultId, collect_set(target) projectSet "
+ "FROM resproj_relation "
+ "GROUP BY source";
spark
.sql(query)
.sql(result_projectset_query)
.as(Encoders.bean(ResultProjectSet.class))
// .toJSON()
// .write()
// .mode(SaveMode.Overwrite)
// .option("compression", "gzip")
// .text(alreadyLinkedPath);
.toJavaRDD()
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(alreadyLinkedPath, GzipCodec.class);
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(alreadyLinkedPath);
}
}

View File

@ -0,0 +1,147 @@
package eu.dnetlib.dhp.projecttoresult;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.countrypropagation.PrepareDatasourceCountryAssociation;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;
public class SparkResultToProjectThroughSemRelJob {
private static final Logger log = LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkResultToProjectThroughSemRelJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String outputPath = parser.get("outputPath");
log.info("outputPath {}: ", outputPath);
final String potentialUpdatePath = parser.get("potentialUpdatePath");
log.info("potentialUpdatePath {}: ", potentialUpdatePath);
final String alreadyLinkedPath = parser.get("alreadyLinkedPath");
log.info("alreadyLinkedPath {}: ", alreadyLinkedPath);
final Boolean saveGraph = Boolean.valueOf(parser.get("saveGraph"));
log.info("saveGraph: {}", saveGraph);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
if (isTest(parser)) {
removeOutputDir(spark, outputPath);
}
execPropagation(
spark, outputPath, alreadyLinkedPath, potentialUpdatePath, saveGraph);
});
}
private static void execPropagation(
SparkSession spark,
String outputPath,
String alreadyLinkedPath,
String potentialUpdatePath,
Boolean saveGraph) {
Dataset<ResultProjectSet> toaddrelations = readPath(spark, potentialUpdatePath, ResultProjectSet.class);
Dataset<ResultProjectSet> alreadyLinked = readPath(spark, alreadyLinkedPath, ResultProjectSet.class);
if (saveGraph) {
toaddrelations
.joinWith(
alreadyLinked,
toaddrelations.col("resultId").equalTo(alreadyLinked.col("resultId")),
"left_outer")
.flatMap(mapRelationRn(), Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Append)
.option("compression", "gzip")
.json(outputPath);
}
}
private static FlatMapFunction<Tuple2<ResultProjectSet, ResultProjectSet>, Relation> mapRelationRn() {
return (FlatMapFunction<Tuple2<ResultProjectSet, ResultProjectSet>, Relation>) value -> {
List<Relation> new_relations = new ArrayList<>();
ResultProjectSet potential_update = value._1();
Optional<ResultProjectSet> already_linked = Optional.ofNullable(value._2());
if (already_linked.isPresent()) {
already_linked
.get()
.getProjectSet()
.stream()
.forEach(
(p -> {
if (potential_update
.getProjectSet()
.contains(p)) {
potential_update.getProjectSet().remove(p);
}
}));
}
String resId = potential_update.getResultId();
potential_update
.getProjectSet()
.stream()
.forEach(
projectId -> {
new_relations
.add(
getRelation(
resId,
projectId,
RELATION_RESULT_PROJECT_REL_CLASS,
RELATION_RESULTPROJECT_REL_TYPE,
RELATION_RESULTPROJECT_SUBREL_TYPE,
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
new_relations
.add(
getRelation(
projectId,
resId,
RELATION_PROJECT_RESULT_REL_CLASS,
RELATION_RESULTPROJECT_REL_TYPE,
RELATION_RESULTPROJECT_SUBREL_TYPE,
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
});
return new_relations.iterator();
};
}
}

View File

@ -1,159 +0,0 @@
package eu.dnetlib.dhp.projecttoresult;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.countrypropagation.PrepareDatasourceCountryAssociation;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class SparkResultToProjectThroughSemRelJob3 {
private static final Logger log = LoggerFactory.getLogger(PrepareDatasourceCountryAssociation.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkResultToProjectThroughSemRelJob3.class
.getResourceAsStream(
"/eu/dnetlib/dhp/projecttoresult/input_projecttoresult_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String outputPath = parser.get("outputPath");
log.info("outputPath {}: ", outputPath);
final String potentialUpdatePath = parser.get("potentialUpdatePath");
log.info("potentialUpdatePath {}: ", potentialUpdatePath);
final String alreadyLinkedPath = parser.get("alreadyLinkedPath");
log.info("alreadyLinkedPath {}: ", alreadyLinkedPath);
final Boolean saveGraph = Boolean.valueOf(parser.get("saveGraph"));
log.info("saveGraph: {}", saveGraph);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
if (isTest(parser)) {
removeOutputDir(spark, outputPath);
}
execPropagation(
spark, outputPath, alreadyLinkedPath, potentialUpdatePath, saveGraph);
});
}
private static void execPropagation(
SparkSession spark,
String outputPath,
String alreadyLinkedPath,
String potentialUpdatePath,
Boolean saveGraph) {
Dataset<ResultProjectSet> toaddrelations = readAssocResultProjects(spark, potentialUpdatePath);
Dataset<ResultProjectSet> alreadyLinked = readAssocResultProjects(spark, alreadyLinkedPath);
if (saveGraph) {
getNewRelations(alreadyLinked, toaddrelations)
.toJSON()
.write()
.mode(SaveMode.Append)
.option("compression", "gzip")
.text(outputPath);
}
}
private static Dataset<Relation> getNewRelations(
Dataset<ResultProjectSet> alreadyLinked, Dataset<ResultProjectSet> toaddrelations) {
return toaddrelations
.joinWith(
alreadyLinked,
toaddrelations.col("resultId").equalTo(alreadyLinked.col("resultId")),
"left_outer")
.flatMap(
value -> {
List<Relation> new_relations = new ArrayList<>();
ResultProjectSet potential_update = value._1();
Optional<ResultProjectSet> already_linked = Optional.ofNullable(value._2());
if (already_linked.isPresent()) {
already_linked
.get()
.getProjectSet()
.stream()
.forEach(
(p -> {
if (potential_update
.getProjectSet()
.contains(p)) {
potential_update.getProjectSet().remove(p);
}
}));
}
String resId = potential_update.getResultId();
potential_update
.getProjectSet()
.stream()
.forEach(
pId -> {
new_relations
.add(
getRelation(
resId,
pId,
RELATION_RESULT_PROJECT_REL_CLASS,
RELATION_RESULTPROJECT_REL_TYPE,
RELATION_RESULTPROJECT_SUBREL_TYPE,
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
new_relations
.add(
getRelation(
pId,
resId,
RELATION_PROJECT_RESULT_REL_CLASS,
RELATION_RESULTPROJECT_REL_TYPE,
RELATION_RESULTPROJECT_SUBREL_TYPE,
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
});
return new_relations.iterator();
},
Encoders.bean(Relation.class));
}
private static Dataset<ResultProjectSet> readAssocResultProjects(
SparkSession spark, String potentialUpdatePath) {
return spark
.read()
.textFile(potentialUpdatePath)
.map(
value -> OBJECT_MAPPER.readValue(value, ResultProjectSet.class),
Encoders.bean(ResultProjectSet.class));
}
}

View File

@ -8,6 +8,7 @@ import java.util.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -22,8 +23,6 @@ public class PrepareResultCommunitySet {
private static final Logger log = LoggerFactory.getLogger(PrepareResultCommunitySet.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
@ -32,7 +31,6 @@ public class PrepareResultCommunitySet {
"/eu/dnetlib/dhp/resulttocommunityfromorganization/input_preparecommunitytoresult_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
@ -69,7 +67,8 @@ public class PrepareResultCommunitySet {
String inputPath,
String outputPath,
OrganizationMap organizationMap) {
Dataset<Relation> relation = readRelations(spark, inputPath);
Dataset<Relation> relation = readPath(spark, inputPath, Relation.class);
relation.createOrReplaceTempView("relation");
String query = "SELECT result_organization.source resultId, result_organization.target orgId, org_set merges "
@ -88,46 +87,44 @@ public class PrepareResultCommunitySet {
+ " GROUP BY source) organization_organization "
+ "ON result_organization.target = organization_organization.source ";
org.apache.spark.sql.Dataset<ResultOrganizations> result_organizationset = spark
Dataset<ResultOrganizations> result_organizationset = spark
.sql(query)
.as(Encoders.bean(ResultOrganizations.class));
result_organizationset
.map(
value -> {
String rId = value.getResultId();
Optional<List<String>> orgs = Optional.ofNullable(value.getMerges());
String oTarget = value.getOrgId();
Set<String> communitySet = new HashSet<>();
if (organizationMap.containsKey(oTarget)) {
communitySet.addAll(organizationMap.get(oTarget));
}
if (orgs.isPresent())
// try{
for (String oId : orgs.get()) {
if (organizationMap.containsKey(oId)) {
communitySet.addAll(organizationMap.get(oId));
}
}
// }catch(Exception e){
//
// }
if (communitySet.size() > 0) {
ResultCommunityList rcl = new ResultCommunityList();
rcl.setResultId(rId);
ArrayList<String> communityList = new ArrayList<>();
communityList.addAll(communitySet);
rcl.setCommunityList(communityList);
return rcl;
}
return null;
},
Encoders.bean(ResultCommunityList.class))
.filter(r -> r != null)
.toJSON()
.map(mapResultCommunityFn(organizationMap), Encoders.bean(ResultCommunityList.class))
.filter(Objects::nonNull)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.text(outputPath);
.json(outputPath);
}
private static MapFunction<ResultOrganizations, ResultCommunityList> mapResultCommunityFn(
OrganizationMap organizationMap) {
return (MapFunction<ResultOrganizations, ResultCommunityList>) value -> {
String rId = value.getResultId();
Optional<List<String>> orgs = Optional.ofNullable(value.getMerges());
String oTarget = value.getOrgId();
Set<String> communitySet = new HashSet<>();
if (organizationMap.containsKey(oTarget)) {
communitySet.addAll(organizationMap.get(oTarget));
}
if (orgs.isPresent())
for (String oId : orgs.get()) {
if (organizationMap.containsKey(oId)) {
communitySet.addAll(organizationMap.get(oId));
}
}
if (communitySet.size() > 0) {
ResultCommunityList rcl = new ResultCommunityList();
rcl.setResultId(rId);
ArrayList<String> communityList = new ArrayList<>();
communityList.addAll(communitySet);
rcl.setCommunityList(communityList);
return rcl;
}
return null;
};
}
}

View File

@ -9,6 +9,8 @@ import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
@ -19,17 +21,16 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
public class SparkResultToCommunityFromOrganizationJob2 {
public class SparkResultToCommunityFromOrganizationJob {
private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityFromOrganizationJob2.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityFromOrganizationJob.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkResultToCommunityFromOrganizationJob2.class
SparkResultToCommunityFromOrganizationJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/resulttocommunityfromorganization/input_communitytoresult_parameters.json"));
@ -81,54 +82,56 @@ public class SparkResultToCommunityFromOrganizationJob2 {
String outputPath,
Class<R> resultClazz,
String possibleUpdatesPath) {
org.apache.spark.sql.Dataset<ResultCommunityList> possibleUpdates = readResultCommunityList(
spark, possibleUpdatesPath);
org.apache.spark.sql.Dataset<R> result = readPathEntity(spark, inputPath, resultClazz);
Dataset<ResultCommunityList> possibleUpdates = readPath(spark, possibleUpdatesPath, ResultCommunityList.class);
Dataset<R> result = readPath(spark, inputPath, resultClazz);
result
.joinWith(
possibleUpdates,
result.col("id").equalTo(possibleUpdates.col("resultId")),
"left_outer")
.map(
value -> {
R ret = value._1();
Optional<ResultCommunityList> rcl = Optional.ofNullable(value._2());
if (rcl.isPresent()) {
ArrayList<String> communitySet = rcl.get().getCommunityList();
List<String> contextList = ret
.getContext()
.stream()
.map(con -> con.getId())
.collect(Collectors.toList());
Result res = new Result();
res.setId(ret.getId());
List<Context> propagatedContexts = new ArrayList<>();
for (String cId : communitySet) {
if (!contextList.contains(cId)) {
Context newContext = new Context();
newContext.setId(cId);
newContext
.setDataInfo(
Arrays
.asList(
getDataInfo(
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID,
PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME)));
propagatedContexts.add(newContext);
}
}
res.setContext(propagatedContexts);
ret.mergeFrom(res);
}
return ret;
},
Encoders.bean(resultClazz))
.toJSON()
.map(resultCommunityFn(), Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.text(outputPath);
.json(outputPath);
}
private static <R extends Result> MapFunction<Tuple2<R, ResultCommunityList>, R> resultCommunityFn() {
return (MapFunction<Tuple2<R, ResultCommunityList>, R>) value -> {
R ret = value._1();
Optional<ResultCommunityList> rcl = Optional.ofNullable(value._2());
if (rcl.isPresent()) {
ArrayList<String> communitySet = rcl.get().getCommunityList();
List<String> contextList = ret
.getContext()
.stream()
.map(con -> con.getId())
.collect(Collectors.toList());
Result res = new Result();
res.setId(ret.getId());
List<Context> propagatedContexts = new ArrayList<>();
for (String cId : communitySet) {
if (!contextList.contains(cId)) {
Context newContext = new Context();
newContext.setId(cId);
newContext
.setDataInfo(
Arrays
.asList(
getDataInfo(
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_ID,
PROPAGATION_RESULT_COMMUNITY_ORGANIZATION_CLASS_NAME)));
propagatedContexts.add(newContext);
}
}
res.setContext(propagatedContexts);
ret.mergeFrom(res);
}
return ret;
};
}
}

View File

@ -8,29 +8,56 @@ import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.QueryInformationSystem;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
public class PrepareResultCommunitySetStep1 {
private static final Logger log = LoggerFactory.getLogger(PrepareResultCommunitySetStep1.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String COMMUNITY_LIST_XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType')"
+ " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri']"
+ " and $x//CONFIGURATION/context/param[./@name='status']/text() != 'hidden'"
+ " return $x//CONFIGURATION/context/@id/string()";
/**
* associates to each result the set of community contexts they are associated to; associates to each target of a
* relation with allowed semantics the set of community context it could possibly inherit from the source of the
* relation
*/
// TODO
private static final String RESULT_CONTEXT_QUERY_TEMPLATE = "select target resultId, community_context "
+ "from (select id, collect_set(co.id) community_context "
+ " from result "
+ " lateral view explode (context) c as co "
+ " where datainfo.deletedbyinference = false %s group by id) p "
+ " JOIN "
+ " (select source, target from relation "
+ " where datainfo.deletedbyinference = false %s ) r ON p.id = r.source";
/**
* a dataset for example could be linked to more than one publication. For each publication linked to that dataset
* the previous query will produce a row: targetId set of community context the target could possibly inherit with
* the following query there will be a single row for each result linked to more than one result of the result type
* currently being used
*/
// TODO
private static final String RESULT_COMMUNITY_LIST_QUERY = "select resultId , collect_set(co) communityList "
+ "from result_context "
+ "lateral view explode (community_context) c as co "
+ "where length(co) > 0 "
+ "group by resultId";
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
@ -64,7 +91,7 @@ public class PrepareResultCommunitySetStep1 {
final String isLookupUrl = parser.get("isLookUpUrl");
log.info("isLookupUrl: {}", isLookupUrl);
final List<String> communityIdList = QueryInformationSystem.getCommunityList(isLookupUrl);
final List<String> communityIdList = getCommunityList(isLookupUrl);
log.info("communityIdList: {}", new Gson().toJson(communityIdList));
final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase();
@ -98,78 +125,43 @@ public class PrepareResultCommunitySetStep1 {
Class<R> resultClazz,
String resultType,
List<String> communityIdList) {
// read the relation table and the table related to the result it is using
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
org.apache.spark.sql.Dataset<Relation> relation = spark
.createDataset(
sc
.textFile(inputPath + "/relation")
.map(item -> OBJECT_MAPPER.readValue(item, Relation.class))
.rdd(),
Encoders.bean(Relation.class));
final String inputResultPath = inputPath + "/" + resultType;
log.info("Reading Graph table from: {}", inputResultPath);
final String inputRelationPath = inputPath + "/relation";
log.info("Reading relation table from: {}", inputResultPath);
Dataset<Relation> relation = readPath(spark, inputRelationPath, Relation.class);
relation.createOrReplaceTempView("relation");
log.info("Reading Graph table from: {}", inputPath + "/" + resultType);
Dataset<R> result = readPathEntity(spark, inputPath + "/" + resultType, resultClazz);
Dataset<R> result = readPath(spark, inputResultPath, resultClazz);
result.createOrReplaceTempView("result");
getPossibleResultcommunityAssociation(
spark, allowedsemrel, outputPath + "/" + resultType, communityIdList);
}
final String outputResultPath = outputPath + "/" + resultType;
log.info("writing output results to: {}", outputResultPath);
private static void getPossibleResultcommunityAssociation(
SparkSession spark,
List<String> allowedsemrel,
String outputPath,
List<String> communityIdList) {
String resultContextQuery = String
.format(
RESULT_CONTEXT_QUERY_TEMPLATE,
getConstraintList(" co.id = '", communityIdList),
getConstraintList(" relClass = '", allowedsemrel));
String communitylist = getConstraintList(" co.id = '", communityIdList);
String semrellist = getConstraintList(" relClass = '", allowedsemrel);
/*
* associates to each result the set of community contexts they are associated to select id, collect_set(co.id)
* community_context " + " from result " + " lateral view explode (context) c as co " +
* " where datainfo.deletedbyinference = false "+ communitylist + " group by id associates to each target
* of a relation with allowed semantics the set of community context it could possibly inherit from the source
* of the relation
*/
String query = "Select target resultId, community_context "
+ "from (select id, collect_set(co.id) community_context "
+ " from result "
+ " lateral view explode (context) c as co "
+ " where datainfo.deletedbyinference = false "
+ communitylist
+ " group by id) p "
+ "JOIN "
+ "(select source, target "
+ "from relation "
+ "where datainfo.deletedbyinference = false "
+ semrellist
+ ") r "
+ "ON p.id = r.source";
org.apache.spark.sql.Dataset<Row> result_context = spark.sql(query);
Dataset<Row> result_context = spark.sql(resultContextQuery);
result_context.createOrReplaceTempView("result_context");
// ( target, (mes, dh-ch-, ni))
/*
* a dataset for example could be linked to more than one publication. For each publication linked to that
* dataset the previous query will produce a row: targetId set of community context the target could possibly
* inherit with the following query there will be a single row for each result linked to more than one result of
* the result type currently being used
*/
query = "select resultId , collect_set(co) communityList "
+ "from result_context "
+ "lateral view explode (community_context) c as co "
+ "where length(co) > 0 "
+ "group by resultId";
spark
.sql(query)
.sql(RESULT_COMMUNITY_LIST_QUERY)
.as(Encoders.bean(ResultCommunityList.class))
.toJavaRDD()
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(outputPath, GzipCodec.class);
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputResultPath);
}
public static List<String> getCommunityList(final String isLookupUrl) throws ISLookUpException {
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
return isLookUp.quickSearchProfile(COMMUNITY_LIST_XQUERY);
}
}

View File

@ -62,11 +62,11 @@ public class PrepareResultCommunitySetStep2 {
private static void mergeInfo(SparkSession spark, String inputPath, String outputPath) {
Dataset<ResultCommunityList> resultOrcidAssocCommunityList = readResultCommunityList(
spark, inputPath + "/publication")
.union(readResultCommunityList(spark, inputPath + "/dataset"))
.union(readResultCommunityList(spark, inputPath + "/otherresearchproduct"))
.union(readResultCommunityList(spark, inputPath + "/software"));
Dataset<ResultCommunityList> resultOrcidAssocCommunityList = readPath(
spark, inputPath + "/publication", ResultCommunityList.class)
.union(readPath(spark, inputPath + "/dataset", ResultCommunityList.class))
.union(readPath(spark, inputPath + "/otherresearchproduct", ResultCommunityList.class))
.union(readPath(spark, inputPath + "/software", ResultCommunityList.class));
resultOrcidAssocCommunityList
.toJavaRDD()
@ -80,9 +80,7 @@ public class PrepareResultCommunitySetStep2 {
return a;
}
Set<String> community_set = new HashSet<>();
a.getCommunityList().stream().forEach(aa -> community_set.add(aa));
b
.getCommunityList()
.stream()
@ -100,13 +98,4 @@ public class PrepareResultCommunitySetStep2 {
.saveAsTextFile(outputPath, GzipCodec.class);
}
private static Dataset<ResultCommunityList> readResultCommunityList(
SparkSession spark, String relationPath) {
return spark
.read()
.textFile(relationPath)
.map(
value -> OBJECT_MAPPER.readValue(value, ResultCommunityList.class),
Encoders.bean(ResultCommunityList.class));
}
}

View File

@ -9,30 +9,28 @@ import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ximpleware.extended.xpath.parser;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
public class SparkResultToCommunityThroughSemRelJob4 {
public class SparkResultToCommunityThroughSemRelJob {
private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityThroughSemRelJob4.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityThroughSemRelJob.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkResultToCommunityThroughSemRelJob4.class
SparkResultToCommunityThroughSemRelJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/resulttocommunityfromsemrel/input_communitytoresult_parameters.json"));
@ -87,58 +85,59 @@ public class SparkResultToCommunityThroughSemRelJob4 {
String preparedInfoPath,
Class<R> resultClazz) {
org.apache.spark.sql.Dataset<ResultCommunityList> possibleUpdates = readResultCommunityList(
spark, preparedInfoPath);
org.apache.spark.sql.Dataset<R> result = readPathEntity(spark, inputPath, resultClazz);
Dataset<ResultCommunityList> possibleUpdates = readPath(spark, preparedInfoPath, ResultCommunityList.class);
Dataset<R> result = readPath(spark, inputPath, resultClazz);
result
.joinWith(
possibleUpdates,
result.col("id").equalTo(possibleUpdates.col("resultId")),
"left_outer")
.map(
value -> {
R ret = value._1();
Optional<ResultCommunityList> rcl = Optional.ofNullable(value._2());
if (rcl.isPresent()) {
Set<String> context_set = new HashSet<>();
ret.getContext().stream().forEach(c -> context_set.add(c.getId()));
List<Context> contextList = rcl
.get()
.getCommunityList()
.stream()
.map(
c -> {
if (!context_set.contains(c)) {
Context newContext = new Context();
newContext.setId(c);
newContext
.setDataInfo(
Arrays
.asList(
getDataInfo(
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID,
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME)));
return newContext;
}
return null;
})
.filter(c -> c != null)
.collect(Collectors.toList());
Result r = new Result();
r.setId(ret.getId());
r.setContext(contextList);
ret.mergeFrom(r);
}
return ret;
},
Encoders.bean(resultClazz))
.toJSON()
.map(contextUpdaterFn(), Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.text(outputPath);
.json(outputPath);
}
private static <R extends Result> MapFunction<Tuple2<R, ResultCommunityList>, R> contextUpdaterFn() {
return (MapFunction<Tuple2<R, ResultCommunityList>, R>) value -> {
R ret = value._1();
Optional<ResultCommunityList> rcl = Optional.ofNullable(value._2());
if (rcl.isPresent()) {
Set<String> context_set = new HashSet<>();
ret.getContext().stream().forEach(c -> context_set.add(c.getId()));
List<Context> contextList = rcl
.get()
.getCommunityList()
.stream()
.map(
c -> {
if (!context_set.contains(c)) {
Context newContext = new Context();
newContext.setId(c);
newContext
.setDataInfo(
Arrays
.asList(
getDataInfo(
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_ID,
PROPAGATION_RESULT_COMMUNITY_SEMREL_CLASS_NAME)));
return newContext;
}
return null;
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
Result r = new Result();
r.setId(ret.getId());
r.setContext(contextList);
ret.mergeFrom(r);
}
return ret;
};
}
}

View File

@ -7,7 +7,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
@ -58,8 +58,7 @@ public class PrepareResultInstRepoAssociation {
isSparkSessionManaged,
spark -> {
readNeededResources(spark, inputPath);
prepareDatasourceOrganizationAssociations(
spark, datasourceOrganizationPath, alreadyLinkedPath);
prepareDatasourceOrganization(spark, datasourceOrganizationPath);
prepareAlreadyLinkedAssociation(spark, alreadyLinkedPath);
});
}
@ -77,45 +76,25 @@ public class PrepareResultInstRepoAssociation {
spark
.sql(query)
.as(Encoders.bean(ResultOrganizationSet.class))
// TODO retry to stick with datasets
.toJavaRDD()
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(alreadyLinkedPath, GzipCodec.class);
}
private static void readNeededResources(SparkSession spark, String inputPath) {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
org.apache.spark.sql.Dataset<Datasource> datasource = spark
.createDataset(
sc
.textFile(inputPath + "/datasource")
.map(item -> new ObjectMapper().readValue(item, Datasource.class))
.rdd(),
Encoders.bean(Datasource.class));
org.apache.spark.sql.Dataset<Relation> relation = spark
.createDataset(
sc
.textFile(inputPath + "/relation")
.map(item -> new ObjectMapper().readValue(item, Relation.class))
.rdd(),
Encoders.bean(Relation.class));
org.apache.spark.sql.Dataset<Organization> organization = spark
.createDataset(
sc
.textFile(inputPath + "/organization")
.map(item -> new ObjectMapper().readValue(item, Organization.class))
.rdd(),
Encoders.bean(Organization.class));
Dataset<Datasource> datasource = readPath(spark, inputPath + "/datasource", Datasource.class);
datasource.createOrReplaceTempView("datasource");
Dataset<Relation> relation = readPath(spark, inputPath + "/relation", Relation.class);
relation.createOrReplaceTempView("relation");
Dataset<Organization> organization = readPath(spark, inputPath + "/organization", Organization.class);
organization.createOrReplaceTempView("organization");
}
private static void prepareDatasourceOrganizationAssociations(
SparkSession spark, String datasourceOrganizationPath, String alreadyLinkedPath) {
private static void prepareDatasourceOrganization(
SparkSession spark, String datasourceOrganizationPath) {
String query = "SELECT source datasourceId, target organizationId "
+ "FROM ( SELECT id "
@ -135,10 +114,9 @@ public class PrepareResultInstRepoAssociation {
spark
.sql(query)
.as(Encoders.bean(DatasourceOrganization.class))
.toJSON()
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.text(datasourceOrganizationPath);
.json(datasourceOrganizationPath);
}
}

View File

@ -0,0 +1,193 @@
package eu.dnetlib.dhp.resulttoorganizationfrominstrepo;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
public class SparkResultToOrganizationFromIstRepoJob {
private static final Logger log = LoggerFactory.getLogger(SparkResultToOrganizationFromIstRepoJob.class);
private static final String RESULT_ORGANIZATIONSET_QUERY = "SELECT id resultId, collect_set(organizationId) organizationSet "
+ "FROM ( SELECT id, organizationId "
+ "FROM rels "
+ "JOIN cfhb "
+ " ON cf = datasourceId "
+ "UNION ALL "
+ "SELECT id , organizationId "
+ "FROM rels "
+ "JOIN cfhb "
+ " ON hb = datasourceId ) tmp "
+ "GROUP BY id";
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkResultToOrganizationFromIstRepoJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_propagationresulaffiliationfrominstrepo_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String datasourceorganization = parser.get("datasourceOrganizationPath");
log.info("datasourceOrganizationPath: {}", datasourceorganization);
final String alreadylinked = parser.get("alreadyLinkedPath");
log.info("alreadyLinkedPath: {}", alreadylinked);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final Boolean saveGraph = Optional
.ofNullable(parser.get("saveGraph"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("saveGraph: {}", saveGraph);
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
if (isTest(parser)) {
removeOutputDir(spark, outputPath);
}
if (saveGraph)
execPropagation(
spark,
datasourceorganization,
alreadylinked,
inputPath,
outputPath,
resultClazz);
});
}
private static void execPropagation(
SparkSession spark,
String datasourceorganization,
String alreadyLinkedPath,
String inputPath,
String outputPath,
Class<? extends Result> clazz) {
Dataset<DatasourceOrganization> ds_org = readPath(spark, datasourceorganization, DatasourceOrganization.class);
Dataset<ResultOrganizationSet> potentialUpdates = getPotentialRelations(spark, inputPath, clazz, ds_org);
Dataset<ResultOrganizationSet> alreadyLinked = readPath(spark, alreadyLinkedPath, ResultOrganizationSet.class);
potentialUpdates
.joinWith(
alreadyLinked,
potentialUpdates.col("resultId").equalTo(alreadyLinked.col("resultId")),
"left_outer")
.flatMap(createRelationFn(), Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Append)
.option("compression", "gzip")
.json(outputPath);
}
private static FlatMapFunction<Tuple2<ResultOrganizationSet, ResultOrganizationSet>, Relation> createRelationFn() {
return (FlatMapFunction<Tuple2<ResultOrganizationSet, ResultOrganizationSet>, Relation>) value -> {
List<Relation> new_relations = new ArrayList<>();
ResultOrganizationSet potential_update = value._1();
Optional<ResultOrganizationSet> already_linked = Optional.ofNullable(value._2());
List<String> organization_list = potential_update.getOrganizationSet();
if (already_linked.isPresent()) {
already_linked
.get()
.getOrganizationSet()
.stream()
.forEach(
rId -> {
if (organization_list.contains(rId)) {
organization_list.remove(rId);
}
});
}
String resultId = potential_update.getResultId();
organization_list
.stream()
.forEach(
orgId -> {
new_relations
.add(
getRelation(
orgId,
resultId,
RELATION_ORGANIZATION_RESULT_REL_CLASS,
RELATION_RESULTORGANIZATION_REL_TYPE,
RELATION_RESULTORGANIZATION_SUBREL_TYPE,
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID,
PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME));
new_relations
.add(
getRelation(
resultId,
orgId,
RELATION_RESULT_ORGANIZATION_REL_CLASS,
RELATION_RESULTORGANIZATION_REL_TYPE,
RELATION_RESULTORGANIZATION_SUBREL_TYPE,
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID,
PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME));
});
return new_relations.iterator();
};
}
private static <R extends Result> Dataset<ResultOrganizationSet> getPotentialRelations(
SparkSession spark,
String inputPath,
Class<R> resultClazz,
Dataset<DatasourceOrganization> ds_org) {
Dataset<R> result = readPath(spark, inputPath, resultClazz);
result.createOrReplaceTempView("result");
createCfHbforResult(spark);
ds_org.createOrReplaceTempView("rels");
return spark
.sql(RESULT_ORGANIZATIONSET_QUERY)
.as(Encoders.bean(ResultOrganizationSet.class));
}
}

View File

@ -1,232 +0,0 @@
package eu.dnetlib.dhp.resulttoorganizationfrominstrepo;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.util.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
public class SparkResultToOrganizationFromIstRepoJob2 {
private static final Logger log = LoggerFactory.getLogger(SparkResultToOrganizationFromIstRepoJob2.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkResultToOrganizationFromIstRepoJob2.class
.getResourceAsStream(
"/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_propagationresulaffiliationfrominstrepo_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String datasourceorganization = parser.get("datasourceOrganizationPath");
log.info("datasourceOrganizationPath: {}", datasourceorganization);
final String alreadylinked = parser.get("alreadyLinkedPath");
log.info("alreadyLinkedPath: {}", alreadylinked);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final Boolean saveGraph = Optional
.ofNullable(parser.get("saveGraph"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("saveGraph: {}", saveGraph);
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
if (isTest(parser)) {
removeOutputDir(spark, outputPath);
}
if (saveGraph)
execPropagation(
spark,
datasourceorganization,
alreadylinked,
inputPath,
outputPath,
resultClazz);
});
}
private static void execPropagation(
SparkSession spark,
String datasourceorganization,
String alreadylinked,
String inputPath,
String outputPath,
Class<? extends Result> resultClazz) {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
org.apache.spark.sql.Dataset<DatasourceOrganization> datasourceorganizationassoc = readAssocDatasourceOrganization(
spark, datasourceorganization);
// broadcasting the result of the preparation step
Broadcast<org.apache.spark.sql.Dataset<DatasourceOrganization>> broadcast_datasourceorganizationassoc = sc
.broadcast(datasourceorganizationassoc);
org.apache.spark.sql.Dataset<ResultOrganizationSet> potentialUpdates = getPotentialRelations(
spark,
inputPath,
resultClazz,
broadcast_datasourceorganizationassoc)
.as(Encoders.bean(ResultOrganizationSet.class));
getNewRelations(
spark
.read()
.textFile(alreadylinked)
.map(
value -> OBJECT_MAPPER
.readValue(
value, ResultOrganizationSet.class),
Encoders.bean(ResultOrganizationSet.class)),
potentialUpdates)
.toJSON()
.write()
.mode(SaveMode.Append)
.option("compression", "gzip")
.text(outputPath);
}
private static Dataset<Relation> getNewRelations(
Dataset<ResultOrganizationSet> alreadyLinked,
Dataset<ResultOrganizationSet> potentialUpdates) {
return potentialUpdates
.joinWith(
alreadyLinked,
potentialUpdates.col("resultId").equalTo(alreadyLinked.col("resultId")),
"left_outer")
.flatMap(
(FlatMapFunction<Tuple2<ResultOrganizationSet, ResultOrganizationSet>, Relation>) value -> {
List<Relation> new_relations = new ArrayList<>();
ResultOrganizationSet potential_update = value._1();
Optional<ResultOrganizationSet> already_linked = Optional.ofNullable(value._2());
List<String> organization_list = potential_update.getOrganizationSet();
if (already_linked.isPresent()) {
already_linked
.get()
.getOrganizationSet()
.stream()
.forEach(
rId -> {
if (organization_list.contains(rId)) {
organization_list.remove(rId);
}
});
}
String resultId = potential_update.getResultId();
organization_list
.stream()
.forEach(
orgId -> {
new_relations
.add(
getRelation(
orgId,
resultId,
RELATION_ORGANIZATION_RESULT_REL_CLASS,
RELATION_RESULTORGANIZATION_REL_TYPE,
RELATION_RESULTORGANIZATION_SUBREL_TYPE,
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID,
PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME));
new_relations
.add(
getRelation(
resultId,
orgId,
RELATION_RESULT_ORGANIZATION_REL_CLASS,
RELATION_RESULTORGANIZATION_REL_TYPE,
RELATION_RESULTORGANIZATION_SUBREL_TYPE,
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID,
PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME));
});
return new_relations.iterator();
},
Encoders.bean(Relation.class));
}
private static <R extends Result> org.apache.spark.sql.Dataset<ResultOrganizationSet> getPotentialRelations(
SparkSession spark,
String inputPath,
Class<R> resultClazz,
Broadcast<org.apache.spark.sql.Dataset<DatasourceOrganization>> broadcast_datasourceorganizationassoc) {
org.apache.spark.sql.Dataset<R> result = readPathEntity(spark, inputPath, resultClazz);
result.createOrReplaceTempView("result");
createCfHbforresult(spark);
return organizationPropagationAssoc(spark, broadcast_datasourceorganizationassoc);
}
private static org.apache.spark.sql.Dataset<DatasourceOrganization> readAssocDatasourceOrganization(
SparkSession spark, String datasourcecountryorganization) {
return spark
.read()
.textFile(datasourcecountryorganization)
.map(
value -> OBJECT_MAPPER.readValue(value, DatasourceOrganization.class),
Encoders.bean(DatasourceOrganization.class));
}
private static org.apache.spark.sql.Dataset<ResultOrganizationSet> organizationPropagationAssoc(
SparkSession spark,
Broadcast<org.apache.spark.sql.Dataset<DatasourceOrganization>> broadcast_datasourceorganizationassoc) {
org.apache.spark.sql.Dataset<DatasourceOrganization> datasourceorganization = broadcast_datasourceorganizationassoc
.value();
datasourceorganization.createOrReplaceTempView("rels");
String query = "SELECT id resultId, collect_set(organizationId) organizationSet "
+ "FROM ( SELECT id, organizationId "
+ "FROM rels "
+ "JOIN cfhb "
+ " ON cf = datasourceId "
+ "UNION ALL "
+ "SELECT id , organizationId "
+ "FROM rels "
+ "JOIN cfhb "
+ " ON hb = datasourceId ) tmp "
+ "GROUP BY id";
return spark.sql(query).as(Encoders.bean(ResultOrganizationSet.class));
}
}

View File

@ -5,6 +5,12 @@
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName":"out",
"paramLongName":"outputPath",
"paramDescription": "the output path",
"paramRequired": true
},
{
"paramName":"h",
"paramLongName":"hive_metastore_uris",

View File

@ -19,27 +19,22 @@
</parameters>
<start to="reset-outputpath"/>
<start to="reset_outputpath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="reset-outputpath">
<action name="reset_outputpath">
<fs>
<delete path='${workingDir}/preparedInfo'/>
<delete path='${workingDir}/publication'/>
<delete path='${workingDir}/dataset'/>
<delete path='${workingDir}/otherresearchproduct'/>
<delete path='${workingDir}/software'/>
<delete path='${outputPath}/relation'/>
<delete path='${outputPath}/dataset'/>
<delete path='${outputPath}/software'/>
<delete path='${outputPath}/publication'/>
<delete path='${outputPath}/otherresearchproduct'/>
<delete path='${outputPath}/project'/>
<delete path='${outputPath}/organization'/>
<delete path='${outputPath}/datasource'/>
<delete path="${outputPath}/relation"/>
<delete path="${outputPath}/dataset"/>
<delete path="${outputPath}/software"/>
<delete path="${outputPath}/publication"/>
<delete path="${outputPath}/otherresearchproduct"/>
<delete path="${outputPath}/project"/>
<delete path="${outputPath}/organization"/>
<delete path="${outputPath}/datasource"/>
</fs>
<ok to="copy_entities"/>
<error to="Kill"/>
@ -50,11 +45,8 @@
<path start="copy_organization"/>
<path start="copy_projects"/>
<path start="copy_datasources"/>
<path start="copy_publication"/>
<path start="copy_dataset"/>
<path start="copy_orp"/>
<path start="copy_software"/>
</fork>
<action name="copy_relation">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
@ -98,50 +90,6 @@
<error to="Kill"/>
</action>
<action name="copy_publication">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/publication</arg>
<arg>${nameNode}/${workingDir}/publication</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<action name="copy_dataset">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/dataset</arg>
<arg>${nameNode}/${workingDir}/dataset</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<action name="copy_orp">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/otherresearchproduct</arg>
<arg>${nameNode}/${workingDir}/otherresearchproduct</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<action name="copy_software">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/software</arg>
<arg>${nameNode}/${workingDir}/software</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<join name="copy_wait" to="prepare_datasource_country_association"/>
<action name="prepare_datasource_country_association">
@ -159,7 +107,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=300
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--whitelist</arg><arg>${whitelist}</arg>
@ -198,7 +146,8 @@
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/publication</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/publication</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
@ -227,7 +176,8 @@
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dataset</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/dataset</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
@ -256,7 +206,8 @@
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
@ -285,7 +236,8 @@
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/software</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--outputPath</arg><arg>${workingDir}/software</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo</arg>
@ -308,7 +260,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>countryPropagationForPublications</name>
<class>eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob3</class>
<class>eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -323,7 +275,8 @@
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/publication</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/publication</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
@ -337,7 +290,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>countryPropagationForDataset</name>
<class>eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob3</class>
<class>eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -352,7 +305,8 @@
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/dataset</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/dataset</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
@ -366,7 +320,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>countryPropagationForORP</name>
<class>eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob3</class>
<class>eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -381,7 +335,8 @@
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/otherresearchproduct</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
@ -395,7 +350,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>countryPropagationForSoftware</name>
<class>eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob3</class>
<class>eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -410,7 +365,8 @@
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/software</arg>
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--preparedInfoPath</arg><arg>${workingDir}/software</arg>
<arg>--saveGraph</arg><arg>${saveGraph}</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${outputPath}/software</arg>

View File

@ -253,7 +253,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>ORCIDPropagation-Publication</name>
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob3</class>
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -285,7 +285,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>ORCIDPropagation-Dataset</name>
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob3</class>
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -316,7 +316,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>ORCIDPropagation-ORP</name>
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob3</class>
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -347,7 +347,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>ORCIDPropagation-Software</name>
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob3</class>
<class>eu.dnetlib.dhp.orcidtoresultfromsemrel.SparkOrcidToResultFromSemRelJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}

View File

@ -166,7 +166,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>ProjectToResultPropagation</name>
<class>eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob3</class>
<class>eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}

View File

@ -127,7 +127,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>community2resultfromorganization-Publication</name>
<class>eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob2</class>
<class>eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -155,7 +155,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>community2resultfromorganization-Dataset</name>
<class>eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob2</class>
<class>eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -183,7 +183,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>community2resultfromorganization-ORP</name>
<class>eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob2</class>
<class>eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -211,7 +211,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>community2resultfromorganization-Software</name>
<class>eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob2</class>
<class>eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}

View File

@ -252,7 +252,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Result2CommunitySemRelPropagation-Publication</name>
<class>eu.dnetlib.dhp.resulttocommunityfromsemrel.SparkResultToCommunityThroughSemRelJob4</class>
<class>eu.dnetlib.dhp.resulttocommunityfromsemrel.SparkResultToCommunityThroughSemRelJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -280,7 +280,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Result2CommunitySemRelPropagation-Dataset</name>
<class>eu.dnetlib.dhp.resulttocommunityfromsemrel.SparkResultToCommunityThroughSemRelJob4</class>
<class>eu.dnetlib.dhp.resulttocommunityfromsemrel.SparkResultToCommunityThroughSemRelJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -308,7 +308,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Result2CommunitySemRelPropagation-ORP</name>
<class>eu.dnetlib.dhp.resulttocommunityfromsemrel.SparkResultToCommunityThroughSemRelJob4</class>
<class>eu.dnetlib.dhp.resulttocommunityfromsemrel.SparkResultToCommunityThroughSemRelJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -336,7 +336,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>Result2CommunitySemRelPropagation-Software</name>
<class>eu.dnetlib.dhp.resulttocommunityfromsemrel.SparkResultToCommunityThroughSemRelJob4</class>
<class>eu.dnetlib.dhp.resulttocommunityfromsemrel.SparkResultToCommunityThroughSemRelJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}

View File

@ -166,7 +166,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>resultToOrganizationFromInstRepoPropagationForPublications</name>
<class>eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob2</class>
<class>eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -196,7 +196,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>resultToOrganizationFromInstRepoPropagationForDataset</name>
<class>eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob2</class>
<class>eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -225,7 +225,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>resultToOrganizationFromInstRepoPropagationForORP</name>
<class>eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob2</class>
<class>eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
@ -255,7 +255,7 @@
<master>yarn</master>
<mode>cluster</mode>
<name>resultToOrganizationFromInstRepoPropagationForSoftware</name>
<class>eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob2</class>
<class>eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}

View File

@ -66,30 +66,25 @@ public class CountryPropagationJobTest {
@Test
public void testCountryPropagationSoftware() throws Exception {
SparkCountryPropagationJob2
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/countrypropagation/sample/software")
.getPath();
final String preparedInfoPath = getClass()
.getResource("/eu/dnetlib/dhp/countrypropagation/preparedInfo")
.getPath();
SparkCountryPropagationJob
.main(
new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-sourcePath",
getClass()
.getResource("/eu/dnetlib/dhp/countrypropagation/sample/software")
.getPath(),
"-hive_metastore_uris",
"",
"-saveGraph",
"true",
"-resultTableName",
"eu.dnetlib.dhp.schema.oaf.Software",
"-outputPath",
workingDir.toString() + "/software",
"-preparedInfoPath",
getClass()
.getResource("/eu/dnetlib/dhp/countrypropagation/preparedInfo")
.getPath(),
"--isSparkSessionManaged", Boolean.FALSE.toString(),
"--sourcePath", sourcePath,
"--hive_metastore_uris", "",
"-saveGraph", "true",
"-resultTableName", Software.class.getCanonicalName(),
"-outputPath", workingDir.toString() + "/software",
"-preparedInfoPath", preparedInfoPath
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Software> tmp = sc
.textFile(workingDir.toString() + "/software")

View File

@ -65,33 +65,27 @@ public class OrcidPropagationJobTest {
@Test
public void noUpdateTest() throws Exception {
SparkOrcidToResultFromSemRelJob3
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/noupdate")
.getPath();
final String possibleUpdatesPath = getClass()
.getResource(
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparedInfo/mergedOrcidAssoc")
.getPath();
SparkOrcidToResultFromSemRelJob
.main(
new String[] {
"-isTest",
Boolean.TRUE.toString(),
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-sourcePath",
getClass()
.getResource("/eu/dnetlib/dhp/orcidtoresultfromsemrel/sample/noupdate")
.getPath(),
"-hive_metastore_uris",
"",
"-saveGraph",
"true",
"-resultTableName",
"eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath",
workingDir.toString() + "/dataset",
"-possibleUpdatesPath",
getClass()
.getResource(
"/eu/dnetlib/dhp/orcidtoresultfromsemrel/preparedInfo/mergedOrcidAssoc")
.getPath()
"-isTest", Boolean.TRUE.toString(),
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-hive_metastore_uris", "",
"-saveGraph", "true",
"-resultTableName", Dataset.class.getCanonicalName(),
"-outputPath", workingDir.toString() + "/dataset",
"-possibleUpdatesPath", possibleUpdatesPath
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Dataset> tmp = sc
.textFile(workingDir.toString() + "/dataset")
@ -117,7 +111,7 @@ public class OrcidPropagationJobTest {
@Test
public void oneUpdateTest() throws Exception {
SparkOrcidToResultFromSemRelJob3
SparkOrcidToResultFromSemRelJob
.main(
new String[] {
"-isTest",
@ -182,7 +176,7 @@ public class OrcidPropagationJobTest {
@Test
public void twoUpdatesTest() throws Exception {
SparkOrcidToResultFromSemRelJob3
SparkOrcidToResultFromSemRelJob
.main(
new String[] {
"-isTest",

View File

@ -72,7 +72,7 @@ public class ProjectPropagationJobTest {
@Test
public void NoUpdateTest() throws Exception {
SparkResultToProjectThroughSemRelJob3
SparkResultToProjectThroughSemRelJob
.main(
new String[] {
"-isTest",
@ -115,7 +115,7 @@ public class ProjectPropagationJobTest {
*/
@Test
public void UpdateTenTest() throws Exception {
SparkResultToProjectThroughSemRelJob3
SparkResultToProjectThroughSemRelJob
.main(
new String[] {
"-isTest",
@ -194,7 +194,7 @@ public class ProjectPropagationJobTest {
*/
@Test
public void UpdateMixTest() throws Exception {
SparkResultToProjectThroughSemRelJob3
SparkResultToProjectThroughSemRelJob
.main(
new String[] {
"-isTest",

View File

@ -67,8 +67,8 @@ public class ResultToCommunityJobTest {
}
@Test
public void test1() throws Exception {
SparkResultToCommunityFromOrganizationJob2
public void testSparkResultToCommunityFromOrganizationJob() throws Exception {
SparkResultToCommunityFromOrganizationJob
.main(
new String[] {
"-isTest",

View File

@ -78,7 +78,7 @@ public class ResultToCommunityJobTest {
@Test
public void test1() throws Exception {
SparkResultToCommunityThroughSemRelJob4
SparkResultToCommunityThroughSemRelJob
.main(
new String[] {
"-isTest", Boolean.TRUE.toString(),

View File

@ -39,11 +39,11 @@ public class Result2OrganizationJobTest {
public static void beforeAll() throws IOException {
workingDir = Files
.createTempDirectory(
SparkResultToOrganizationFromIstRepoJob2.class.getSimpleName());
SparkResultToOrganizationFromIstRepoJob.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(SparkResultToOrganizationFromIstRepoJob2.class.getSimpleName());
conf.setAppName(SparkResultToOrganizationFromIstRepoJob.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
@ -54,7 +54,7 @@ public class Result2OrganizationJobTest {
spark = SparkSession
.builder()
.appName(SparkResultToOrganizationFromIstRepoJob2.class.getSimpleName())
.appName(SparkResultToOrganizationFromIstRepoJob.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@ -72,7 +72,7 @@ public class Result2OrganizationJobTest {
*/
@Test
public void NoUpdateTest() throws Exception {
SparkResultToOrganizationFromIstRepoJob2
SparkResultToOrganizationFromIstRepoJob
.main(
new String[] {
"-isTest",
@ -123,7 +123,7 @@ public class Result2OrganizationJobTest {
*/
@Test
public void UpdateNoMixTest() throws Exception {
SparkResultToOrganizationFromIstRepoJob2
SparkResultToOrganizationFromIstRepoJob
.main(
new String[] {
"-isTest",
@ -197,7 +197,7 @@ public class Result2OrganizationJobTest {
@Test
public void UpdateMixTest() throws Exception {
SparkResultToOrganizationFromIstRepoJob2
SparkResultToOrganizationFromIstRepoJob
.main(
new String[] {
"-isTest",