
code formatting

Claudio Atzori 2023-11-23 16:33:24 +01:00
parent a0311e8a90
commit 1763d377ad
6 changed files with 274 additions and 275 deletions


@@ -18,7 +18,6 @@ package eu.dnetlib.pace.util;
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/*
 * Diff Match and Patch
 * Copyright 2018 The diff-match-patch Authors.


@@ -79,8 +79,8 @@ public class PrepareAffiliationRelationsTest {
            .getPath();
        String pubmedAffiliationRelationsPath = getClass()
            .getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json")
            .getPath();
        String outputPath = workingDir.toString() + "/actionSet";


@@ -31,94 +31,94 @@ import scala.Tuple2;
public class PrepareResultCommunitySet {

    private static final Logger log = LoggerFactory.getLogger(PrepareResultCommunitySet.class);

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                PrepareResultCommunitySet.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/resulttocommunityfromproject/input_preparecommunitytoresult_parameters.json"));

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        String inputPath = parser.get("sourcePath");
        log.info("inputPath: {}", inputPath);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        final boolean production = Boolean.valueOf(parser.get("production"));
        log.info("production: {}", production);

        final CommunityEntityMap projectsMap = Utils.getCommunityProjects(production);
        // log.info("projectsMap: {}", new Gson().toJson(projectsMap));

        SparkConf conf = new SparkConf();

        runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> {
                removeOutputDir(spark, outputPath);
                prepareInfo(spark, inputPath, outputPath, projectsMap);
            });
    }

    private static void prepareInfo(
        SparkSession spark,
        String inputPath,
        String outputPath,
        CommunityEntityMap projectMap) {

        final StructType structureSchema = new StructType()
            .add(
                "dataInfo", new StructType()
                    .add("deletedbyinference", DataTypes.BooleanType)
                    .add("invisible", DataTypes.BooleanType))
            .add("source", DataTypes.StringType)
            .add("target", DataTypes.StringType)
            .add("relClass", DataTypes.StringType);

        spark
            .read()
            .schema(structureSchema)
            .json(inputPath)
            .filter(
                "dataInfo.deletedbyinference != true " +
                    "and relClass == '" + ModelConstants.IS_PRODUCED_BY + "'")
            .select(
                new Column("source").as("resultId"),
                new Column("target").as("projectId"))
            .groupByKey((MapFunction<Row, String>) r -> (String) r.getAs("resultId"), Encoders.STRING())
            .mapGroups((MapGroupsFunction<String, Row, ResultProjectList>) (k, v) -> {
                ResultProjectList rpl = new ResultProjectList();
                rpl.setResultId(k);
                ArrayList<String> cl = new ArrayList<>();
                cl.addAll(projectMap.get(v.next().getAs("projectId")));
                v.forEachRemaining(r -> {
                    projectMap
                        .get(r.getAs("projectId"))
                        .forEach(c -> {
                            if (!cl.contains(c))
                                cl.add(c);
                        });
                });
                if (cl.size() == 0)
                    return null;
                rpl.setCommunityList(cl);
                return rpl;
            }, Encoders.bean(ResultProjectList.class))
            .filter(Objects::nonNull)
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath);
    }
}
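
For context on what the mapGroups step above computes: each result's IS_PRODUCED_BY relations are collapsed into a single, duplicate-free list of community identifiers looked up per project. A minimal stand-alone sketch of that aggregation, assuming the project-to-community mapping behaves like a plain Map<String, List<String>> (the exact CommunityEntityMap API is not shown in this diff; ids below are illustrative):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CommunityAggregationSketch {
    public static void main(String[] args) {
        // Hypothetical stand-in for the projectMap used by prepareInfo: project id -> community ids
        Map<String, List<String>> projectMap = Map.of(
            "40|project::0001", List.of("aurora"),
            "40|project::0002", List.of("aurora", "sdsn-gr"));

        // Projects linked to one result via IS_PRODUCED_BY relations
        List<String> projectIds = List.of("40|project::0001", "40|project::0002");

        // Same de-duplication rule as in mapGroups: add each community at most once
        List<String> communityList = new ArrayList<>();
        for (String projectId : projectIds) {
            for (String community : projectMap.getOrDefault(projectId, List.of())) {
                if (!communityList.contains(community))
                    communityList.add(community);
            }
        }

        // Results with an empty list are dropped (the job returns null and filters Objects::nonNull)
        System.out.println(communityList); // prints [aurora, sdsn-gr]
    }
}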


@@ -5,22 +5,22 @@ import java.io.Serializable;
import java.util.ArrayList;

public class ResultProjectList implements Serializable {
    private String resultId;
    private ArrayList<String> communityList;

    public String getResultId() {
        return resultId;
    }

    public void setResultId(String resultId) {
        this.resultId = resultId;
    }

    public ArrayList<String> getCommunityList() {
        return communityList;
    }

    public void setCommunityList(ArrayList<String> communityList) {
        this.communityList = communityList;
    }
}
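
As a quick check of the record shape this bean produces, serializing an instance with Jackson's ObjectMapper (the same mapper the tests use) yields the kind of JSON found in the prepared-info fixtures; the example class name is illustrative and the values are taken from the test resources quoted further below:

import java.util.ArrayList;

import com.fasterxml.jackson.databind.ObjectMapper;

public class ResultProjectListSerializationExample {
    public static void main(String[] args) throws Exception {
        ResultProjectList rpl = new ResultProjectList();
        rpl.setResultId("50|57a035e5b1ae::d5be548ca7ae489d762f893be67af52f");

        ArrayList<String> communities = new ArrayList<>();
        communities.add("aurora");
        rpl.setCommunityList(communities);

        // Expected output:
        // {"resultId":"50|57a035e5b1ae::d5be548ca7ae489d762f893be67af52f","communityList":["aurora"]}
        System.out.println(new ObjectMapper().writeValueAsString(rpl));
    }
}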


@@ -37,127 +37,127 @@ import scala.Tuple2;
 * @Date 11/10/23
 */
public class SparkResultToCommunityFromProject implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityFromProject.class);

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                SparkResultToCommunityFromProject.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/resulttocommunityfromproject/input_communitytoresult_parameters.json"));

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        String inputPath = parser.get("sourcePath");
        log.info("inputPath: {}", inputPath);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        final String possibleupdatespath = parser.get("preparedInfoPath");
        log.info("preparedInfoPath: {}", possibleupdatespath);

        SparkConf conf = new SparkConf();

        runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> {
                execPropagation(spark, inputPath, outputPath, possibleupdatespath);
            });
    }

    private static <R extends Result> void execPropagation(
        SparkSession spark,
        String inputPath,
        String outputPath,
        String possibleUpdatesPath) {

        Dataset<ResultProjectList> possibleUpdates = readPath(spark, possibleUpdatesPath, ResultProjectList.class);

        ModelSupport.entityTypes
            .keySet()
            .parallelStream()
            .forEach(e -> {
                if (ModelSupport.isResult(e)) {
                    removeOutputDir(spark, outputPath + e.name());
                    Class<R> resultClazz = ModelSupport.entityTypes.get(e);
                    Dataset<R> result = readPath(spark, inputPath + e.name(), resultClazz);

                    result
                        .joinWith(
                            possibleUpdates,
                            result.col("id").equalTo(possibleUpdates.col("resultId")),
                            "left_outer")
                        .map(resultCommunityFn(), Encoders.bean(resultClazz))
                        .write()
                        .mode(SaveMode.Overwrite)
                        .option("compression", "gzip")
                        .json(outputPath + e.name());
                }
            });
    }

    private static <R extends Result> MapFunction<Tuple2<R, ResultProjectList>, R> resultCommunityFn() {
        return value -> {
            R ret = value._1();
            Optional<ResultProjectList> rcl = Optional.ofNullable(value._2());
            if (rcl.isPresent()) {
                // ArrayList<String> communitySet = rcl.get().getCommunityList();
                List<String> contextList = ret
                    .getContext()
                    .stream()
                    .map(Context::getId)
                    .collect(Collectors.toList());

                @SuppressWarnings("unchecked")
                R res = (R) ret.getClass().newInstance();

                res.setId(ret.getId());
                List<Context> propagatedContexts = new ArrayList<>();
                for (String cId : rcl.get().getCommunityList()) {
                    if (!contextList.contains(cId)) {
                        Context newContext = new Context();
                        newContext.setId(cId);
                        newContext
                            .setDataInfo(
                                Arrays
                                    .asList(
                                        getDataInfo(
                                            PROPAGATION_DATA_INFO_TYPE,
                                            PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_ID,
                                            PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_NAME,
                                            ModelConstants.DNET_PROVENANCE_ACTIONS)));
                        propagatedContexts.add(newContext);
                    } else {
                        ret
                            .getContext()
                            .stream()
                            .filter(c -> c.getId().equals(cId))
                            .findFirst()
                            .get()
                            .getDataInfo()
                            .add(
                                getDataInfo(
                                    PROPAGATION_DATA_INFO_TYPE,
                                    PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_ID,
                                    PROPAGATION_RESULT_COMMUNITY_PROJECT_CLASS_NAME,
                                    ModelConstants.DNET_PROVENANCE_ACTIONS));
                    }
                }
                res.setContext(propagatedContexts);
                ret.mergeFrom(res);
            }
            return ret;
        };
    }
}
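
The branch inside resultCommunityFn decides, per community id, whether to create a new context on the result or only append a provenance entry to a context that is already there. A minimal sketch of that decision rule on plain strings (illustrative only; the real code works on eu.dnetlib.dhp.schema.oaf Context and DataInfo objects):

import java.util.ArrayList;
import java.util.List;

public class ContextMergeSketch {
    public static void main(String[] args) {
        // Context ids already present on the result
        List<String> contextList = List.of("netherlands");
        // Community ids proposed by the prepared info
        List<String> communityList = List.of("netherlands", "aurora");

        List<String> propagatedContexts = new ArrayList<>();
        for (String cId : communityList) {
            if (!contextList.contains(cId)) {
                // Not yet on the result: a new context carrying the propagation provenance is created
                propagatedContexts.add(cId);
            } else {
                // Already on the result: only the provenance entry is appended to the existing context
                System.out.println("append provenance to existing context: " + cId);
            }
        }

        // Only the newly created contexts are merged back into the result
        System.out.println("new contexts: " + propagatedContexts); // prints [aurora]
    }
}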


@@ -31,103 +31,103 @@ import eu.dnetlib.dhp.schema.oaf.Dataset;
public class ResultToCommunityJobTest {

    private static final Logger log = LoggerFactory.getLogger(ResultToCommunityJobTest.class);

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private static SparkSession spark;

    private static Path workingDir;

    @BeforeAll
    public static void beforeAll() throws IOException {
        workingDir = Files.createTempDirectory(ResultToCommunityJobTest.class.getSimpleName());
        log.info("using work dir {}", workingDir);

        SparkConf conf = new SparkConf();
        conf.setAppName(ResultToCommunityJobTest.class.getSimpleName());

        conf.setMaster("local[*]");
        conf.set("spark.driver.host", "localhost");
        conf.set("hive.metastore.local", "true");
        conf.set("spark.ui.enabled", "false");
        conf.set("spark.sql.warehouse.dir", workingDir.toString());
        conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());

        spark = SparkSession
            .builder()
            .appName(OrcidPropagationJobTest.class.getSimpleName())
            .config(conf)
            .getOrCreate();
    }

    @AfterAll
    public static void afterAll() throws IOException {
        FileUtils.deleteDirectory(workingDir.toFile());
        spark.stop();
    }

    @Test
    void testSparkResultToCommunityFromProjectJob() throws Exception {
        final String preparedInfoPath = getClass()
            .getResource("/eu/dnetlib/dhp/resulttocommunityfromproject/preparedInfo")
            .getPath();
        SparkResultToCommunityFromProject
            .main(
                new String[] {
                    "-isSparkSessionManaged", Boolean.FALSE.toString(),
                    "-sourcePath", getClass()
                        .getResource("/eu/dnetlib/dhp/resulttocommunityfromproject/sample/")
                        .getPath(),
                    "-outputPath", workingDir.toString() + "/",
                    "-preparedInfoPath", preparedInfoPath
                });

        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<Dataset> tmp = sc
            .textFile(workingDir.toString() + "/dataset")
            .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));

        Assertions.assertEquals(10, tmp.count());
        /**
         * {"resultId":"50|57a035e5b1ae::d5be548ca7ae489d762f893be67af52f","communityList":["aurora"]}
         * {"resultId":"50|57a035e5b1ae::a77232ffca9115fcad51c3503dbc7e3e","communityList":["aurora"]}
         * {"resultId":"50|57a035e5b1ae::803aaad4decab7e27cd4b52a1931b3a1","communityList":["sdsn-gr"]}
         * {"resultId":"50|57a035e5b1ae::a02e9e4087bca50687731ae5c765b5e1","communityList":["netherlands"]}
         */
        List<Context> context = tmp
            .filter(r -> r.getId().equals("50|57a035e5b1ae::d5be548ca7ae489d762f893be67af52f"))
            .first()
            .getContext();
        Assertions.assertTrue(context.stream().anyMatch(c -> containsResultCommunityProject(c)));

        context = tmp
            .filter(r -> r.getId().equals("50|57a035e5b1ae::a77232ffca9115fcad51c3503dbc7e3e"))
            .first()
            .getContext();
        Assertions.assertTrue(context.stream().anyMatch(c -> containsResultCommunityProject(c)));

        Assertions
            .assertEquals(
                0, tmp.filter(r -> r.getId().equals("50|57a035e5b1ae::803aaad4decab7e27cd4b52a1931b3a1")).count());

        Assertions
            .assertEquals(
                0, tmp.filter(r -> r.getId().equals("50|57a035e5b1ae::a02e9e4087bca50687731ae5c765b5e1")).count());

        Assertions
            .assertEquals(
                2, tmp.filter(r -> r.getContext().stream().anyMatch(c -> c.getId().equals("aurora"))).count());
    }

    private static boolean containsResultCommunityProject(Context c) {
        return c
            .getDataInfo()
            .stream()
            .anyMatch(di -> di.getProvenanceaction().getClassid().equals("result:community:project"));
    }
}