1
0
Fork 0

code formatting

This commit is contained in:
Claudio Atzori 2023-10-06 12:31:17 +02:00
parent 73c49b8d26
commit eed9fe0902
3 changed files with 133 additions and 133 deletions

View File

@ -33,7 +33,7 @@ import scala.Tuple2;
public class GroupEntitiesSparkJob {
private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);
private static final Encoder<OafEntity> OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class);
private static final Encoder<OafEntity> OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class);
public static void main(String[] args) throws Exception {
@ -114,7 +114,7 @@ public class GroupEntitiesSparkJob {
Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC));
// pivot on "_1" (classname of the entity)
// created columns containing only entities of the same class
// created columns containing only entities of the same class
for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
String entity = e.getKey().name();
Class<? extends OafEntity> entityClass = e.getValue();

View File

@ -67,60 +67,60 @@ public class SparkPropagateRelation extends AbstractSparkAction {
log.info("graphOutputPath: '{}'", graphOutputPath);
Dataset<Relation> mergeRels = spark
.read()
.load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
.as(REL_BEAN_ENC);
.read()
.load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
.as(REL_BEAN_ENC);
// <mergedObjectID, dedupID>
Dataset<Row> idsToMerge = mergeRels
.where(col("relClass").equalTo(ModelConstants.MERGES))
.select(col("source").as("dedupID"), col("target").as("mergedObjectID"))
.distinct();
.where(col("relClass").equalTo(ModelConstants.MERGES))
.select(col("source").as("dedupID"), col("target").as("mergedObjectID"))
.distinct();
Dataset<Row> allRels = spark
.read()
.schema(REL_BEAN_ENC.schema())
.json(graphBasePath + "/relation");
.read()
.schema(REL_BEAN_ENC.schema())
.json(graphBasePath + "/relation");
Dataset<Relation> dedupedRels = allRels
.joinWith(idsToMerge, allRels.col("source").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
.joinWith(idsToMerge, col("_1.target").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
.select("_1._1", "_1._2.dedupID", "_2.dedupID")
.as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING()))
.map((MapFunction<Tuple3<Relation, String, String>, Relation>) t -> {
Relation rel = t._1();
String newSource = t._2();
String newTarget = t._3();
.joinWith(idsToMerge, allRels.col("source").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
.joinWith(idsToMerge, col("_1.target").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
.select("_1._1", "_1._2.dedupID", "_2.dedupID")
.as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING()))
.map((MapFunction<Tuple3<Relation, String, String>, Relation>) t -> {
Relation rel = t._1();
String newSource = t._2();
String newTarget = t._3();
if (rel.getDataInfo() == null) {
rel.setDataInfo(new DataInfo());
}
if (rel.getDataInfo() == null) {
rel.setDataInfo(new DataInfo());
}
if (newSource != null || newTarget != null) {
rel.getDataInfo().setDeletedbyinference(false);
if (newSource != null || newTarget != null) {
rel.getDataInfo().setDeletedbyinference(false);
if (newSource != null)
rel.setSource(newSource);
if (newSource != null)
rel.setSource(newSource);
if (newTarget != null)
rel.setTarget(newTarget);
}
if (newTarget != null)
rel.setTarget(newTarget);
}
return rel;
}, REL_BEAN_ENC);
return rel;
}, REL_BEAN_ENC);
// ids of records that are both not deletedbyinference and not invisible
Dataset<Row> ids = validIds(spark, graphBasePath);
// filter relations that point to valid records, can force them to be visible
Dataset<Relation> cleanedRels = dedupedRels
.join(ids, col("source").equalTo(ids.col("id")), "leftsemi")
.join(ids, col("target").equalTo(ids.col("id")), "leftsemi")
.as(REL_BEAN_ENC)
.map((MapFunction<Relation, Relation>) r -> {
r.getDataInfo().setInvisible(false);
return r;
}, REL_KRYO_ENC);
.join(ids, col("source").equalTo(ids.col("id")), "leftsemi")
.join(ids, col("target").equalTo(ids.col("id")), "leftsemi")
.as(REL_BEAN_ENC)
.map((MapFunction<Relation, Relation>) r -> {
r.getDataInfo().setInvisible(false);
return r;
}, REL_KRYO_ENC);
Dataset<Relation> distinctRels = cleanedRels
.groupByKey(

View File

@ -1,14 +1,14 @@
package eu.dnetlib.dhp.oa.graph.group;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.DHPUtils;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
@ -18,108 +18,108 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import static org.junit.jupiter.api.Assertions.assertEquals;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.DHPUtils;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class GroupEntitiesSparkJobTest {
private static SparkSession spark;
private static SparkSession spark;
private static ObjectMapper mapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
private static ObjectMapper mapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
private static Path workingDir;
private Path dataInputPath;
private static Path workingDir;
private Path dataInputPath;
private Path checkpointPath;
private Path checkpointPath;
private Path outputPath;
private Path outputPath;
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName());
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(GroupEntitiesSparkJob.class.getSimpleName());
SparkConf conf = new SparkConf();
conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName());
conf.setMaster("local");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
spark = SparkSession.builder().config(conf).getOrCreate();
}
SparkConf conf = new SparkConf();
conf.setAppName(GroupEntitiesSparkJob.class.getSimpleName());
conf.setMaster("local");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
spark = SparkSession.builder().config(conf).getOrCreate();
}
@BeforeEach
public void beforeEach() throws IOException, URISyntaxException {
dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI());
checkpointPath = workingDir.resolve("grouped_entity");
outputPath = workingDir.resolve("dispatched_entity");
}
@BeforeEach
public void beforeEach() throws IOException, URISyntaxException {
dataInputPath = Paths.get(ClassLoader.getSystemResource("eu/dnetlib/dhp/oa/graph/group").toURI());
checkpointPath = workingDir.resolve("grouped_entity");
outputPath = workingDir.resolve("dispatched_entity");
}
@AfterAll
public static void afterAll() throws IOException {
spark.stop();
FileUtils.deleteDirectory(workingDir.toFile());
}
@AfterAll
public static void afterAll() throws IOException {
spark.stop();
FileUtils.deleteDirectory(workingDir.toFile());
}
@Test
@Order(1)
void testGroupEntities() throws Exception {
GroupEntitiesSparkJob.main(new String[]{
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-graphInputPath",
dataInputPath.toString(),
"-checkpointPath",
checkpointPath.toString(),
"-outputPath",
outputPath.toString(),
"-filterInvisible",
Boolean.FALSE.toString()
});
@Test
@Order(1)
void testGroupEntities() throws Exception {
GroupEntitiesSparkJob.main(new String[] {
"-isSparkSessionManaged",
Boolean.FALSE.toString(),
"-graphInputPath",
dataInputPath.toString(),
"-checkpointPath",
checkpointPath.toString(),
"-outputPath",
outputPath.toString(),
"-filterInvisible",
Boolean.FALSE.toString()
});
Dataset<OafEntity> checkpointTable = spark
.read()
.load(checkpointPath.toString())
.selectExpr("COALESCE(*)")
.as(Encoders.kryo(OafEntity.class));
Dataset<OafEntity> checkpointTable = spark
.read()
.load(checkpointPath.toString())
.selectExpr("COALESCE(*)")
.as(Encoders.kryo(OafEntity.class));
assertEquals(
1,
checkpointTable
.filter(
(FilterFunction<OafEntity>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9"
.equals(r.getId()) &&
r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo")))
.count());
assertEquals(
1,
checkpointTable
.filter(
(FilterFunction<OafEntity>) r -> "50|doi_________::09821844208a5cd6300b2bfb13bca1b9"
.equals(r.getId()) &&
r.getCollectedfrom().stream().anyMatch(kv -> kv.getValue().equalsIgnoreCase("zenodo")))
.count());
Dataset<Result> output = spark
.read()
.textFile(
DHPUtils
.toSeq(
HdfsSupport
.listFiles(outputPath.toString(), spark.sparkContext().hadoopConfiguration())))
.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));
Dataset<Result> output = spark
.read()
.textFile(
DHPUtils
.toSeq(
HdfsSupport
.listFiles(outputPath.toString(), spark.sparkContext().hadoopConfiguration())))
.map((MapFunction<String, Result>) s -> mapper.readValue(s, Result.class), Encoders.bean(Result.class));
assertEquals(3, output.count());
assertEquals(
2,
output
.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
.filter((FilterFunction<String>) s -> s.equals("publication"))
.count());
assertEquals(
1,
output
.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
.filter((FilterFunction<String>) s -> s.equals("dataset"))
.count());
}
assertEquals(3, output.count());
assertEquals(
2,
output
.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
.filter((FilterFunction<String>) s -> s.equals("publication"))
.count());
assertEquals(
1,
output
.map((MapFunction<Result, String>) r -> r.getResulttype().getClassid(), Encoders.STRING())
.filter((FilterFunction<String>) s -> s.equals("dataset"))
.count());
}
}