forked from D-Net/dnet-hadoop
[UnresolvedEntities] Changed the way the unresolved entities are merged, because the new generic merge removed the dataInfo from the merged result. Also added dataInfo for subjects.
This commit is contained in:
parent dce7f5fea8
commit 73eba34d42
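The core of the change: both Prepare jobs now stamp each unresolved Result with an explicit provenance record so it cannot be lost downstream. Below is a minimal sketch of that record, built with the same OafMapperUtils calls that appear in the diff; the class and method names of the sketch itself are hypothetical, and the reading of the boolean flags is an assumption based on OafMapperUtils.dataInfo's signature, not something this diff states.

import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;

public class EnrichDataInfoSketch {

	// Mirrors the call in the diff: assumed flag order is
	// deletedbyinference=false, inferenceprovenance=null, inferred=true,
	// invisible=false, with an "enrich" provenance action and no trust value.
	static DataInfo enrichDataInfo() {
		return OafMapperUtils
			.dataInfo(
				false, null, true, false,
				OafMapperUtils
					.qualifier(
						ModelConstants.PROVENANCE_ENRICH,
						null,
						ModelConstants.DNET_PROVENANCE_ACTIONS,
						ModelConstants.DNET_PROVENANCE_ACTIONS),
				null);
	}

	public static void main(String[] args) {
		Result r = new Result();
		r.setDataInfo(enrichDataInfo());
		// prints the provenance action class id attached to the record
		System.out.println(r.getDataInfo().getProvenanceaction().getClassid());
	}
}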
PrepareFOSSparkJob.java

@@ -21,8 +21,10 @@ import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
 import eu.dnetlib.dhp.utils.DHPUtils;
 
 public class PrepareFOSSparkJob implements Serializable {

@@ -71,6 +73,7 @@ public class PrepareFOSSparkJob implements Serializable {
 			Result r = new Result();
 			FOSDataModel first = it.next();
 			r.setId(DHPUtils.generateUnresolvedIdentifier(k, DOI));
 
 			HashSet<String> level1 = new HashSet<>();
 			HashSet<String> level2 = new HashSet<>();
 			HashSet<String> level3 = new HashSet<>();

@@ -81,6 +84,19 @@ public class PrepareFOSSparkJob implements Serializable {
 			level2.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME, UPDATE_SUBJECT_FOS_CLASS_ID)));
 			level3.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME, UPDATE_SUBJECT_FOS_CLASS_ID)));
 			r.setSubject(sbjs);
+			r
+				.setDataInfo(
+					OafMapperUtils
+						.dataInfo(
+							false, null, true,
+							false,
+							OafMapperUtils
+								.qualifier(
+									ModelConstants.PROVENANCE_ENRICH,
+									null,
+									ModelConstants.DNET_PROVENANCE_ACTIONS,
+									ModelConstants.DNET_PROVENANCE_ACTIONS),
+							null));
 			return r;
 		}, Encoders.bean(Result.class))
 		.write()
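The commit message also mentions dataInfo for subjects, but the getSubject helper (defined in the Constants class) is not part of this diff. The following is a plausible, hypothetical sketch of a subject carrying its own provenance, assuming the six-argument OafMapperUtils.structuredProperty from the schema utilities and the DNET_SUBJECT_TYPOLOGIES scheme; both the signature and the scheme choice are assumptions here, not confirmed by the diff.

// Hypothetical sketch; getSubject's real body lives elsewhere and may differ.
static StructuredProperty getSubjectSketch(String value, String classid, String classname,
	String provenanceClassId) {
	return OafMapperUtils
		.structuredProperty(
			value, classid, classname,
			// assumed subject scheme
			ModelConstants.DNET_SUBJECT_TYPOLOGIES, ModelConstants.DNET_SUBJECT_TYPOLOGIES,
			// per-subject provenance, shaped like the Result-level dataInfo above
			OafMapperUtils
				.dataInfo(
					false, null, true, false,
					OafMapperUtils
						.qualifier(
							provenanceClassId, provenanceClassId,
							ModelConstants.DNET_PROVENANCE_ACTIONS,
							ModelConstants.DNET_PROVENANCE_ACTIONS),
					null));
}

In the diff it is invoked as getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME, UPDATE_SUBJECT_FOS_CLASS_ID), so the last argument is the provenance class id distinguishing FOS from SDG subjects.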
PrepareSDGSparkJob.java

@@ -21,8 +21,10 @@ import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
 import eu.dnetlib.dhp.utils.DHPUtils;
 
 public class PrepareSDGSparkJob implements Serializable {

@@ -78,6 +80,19 @@ public class PrepareSDGSparkJob implements Serializable {
 					s -> sbjs
 						.add(getSubject(s.getSbj(), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID)));
 			r.setSubject(sbjs);
+			r
+				.setDataInfo(
+					OafMapperUtils
+						.dataInfo(
+							false, null, true,
+							false,
+							OafMapperUtils
+								.qualifier(
+									ModelConstants.PROVENANCE_ENRICH,
+									null,
+									ModelConstants.DNET_PROVENANCE_ACTIONS,
+									ModelConstants.DNET_PROVENANCE_ACTIONS),
+							null));
 			return r;
 		}, Encoders.bean(Result.class))
 		.write()
SparkSaveUnresolved.java

@@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.actionmanager.Constants.*;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.io.Serializable;
 import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;

@@ -67,7 +68,19 @@ public class SparkSaveUnresolved implements Serializable {
 			.groupByKey((MapFunction<Result, String>) Result::getId, Encoders.STRING())
 			.mapGroups((MapGroupsFunction<String, Result, Result>) (k, it) -> {
 				Result ret = it.next();
-				it.forEachRemaining(r -> ret.mergeFrom(r));
+				it.forEachRemaining(r -> {
+					if (r.getInstance() != null) {
+						ret.setInstance(r.getInstance());
+					}
+					if (r.getSubject() != null) {
+						if (ret.getSubject() != null)
+							ret.getSubject().addAll(r.getSubject());
+						else
+							ret.setSubject(r.getSubject());
+					}
+
+					// ret.mergeFrom(r)
+				});
 				return ret;
 			}, Encoders.bean(Result.class))
 			.write()
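The replaced mergeFrom call was the source of the bug named in the commit message: the generic merge dropped the dataInfo set by the Prepare jobs. Below is the same field-by-field merge extracted into a standalone helper for clarity; the class and method names are ours, but the body is exactly the lambda above. The design choice it encodes: last writer wins for instance, union for subjects, and every other field, dataInfo included, is never touched, so the value set upstream survives the groupByKey/mapGroups step.

import eu.dnetlib.dhp.schema.oaf.Result;

public class MergeSketch {

	// Field-by-field merge as used in SparkSaveUnresolved.mapGroups:
	// instance is overwritten when present, subjects are accumulated,
	// all remaining fields (including dataInfo) are left as-is on ret.
	static Result mergeUnresolved(Result ret, Result r) {
		if (r.getInstance() != null) {
			ret.setInstance(r.getInstance());
		}
		if (r.getSubject() != null) {
			if (ret.getSubject() != null)
				ret.getSubject().addAll(r.getSubject());
			else
				ret.setSubject(r.getSubject());
		}
		return ret;
	}
}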
PrepareTest.java

@@ -146,6 +146,11 @@ public class PrepareTest {
 				.get(0)
 				.getValue());
 
+		final String doi2 = "unresolved::10.3390/s18072310::doi";
+
+		Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi2)).count());
+		Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals(doi2)).collect().get(0).getInstance().size());
+
 	}
 
 	@Test

@@ -259,59 +264,61 @@ public class PrepareTest {
 			.collect()
 			.contains("8. Economic growth"));
 
 	}
 
+	@Test
+	void test3() throws Exception {
+		final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_fos_results_20_12_2021.csv.gz";
+
+		final String outputPath = workingDir.toString() + "/fos.json";
+		GetFOSSparkJob
+			.main(
+				new String[] {
+					"--isSparkSessionManaged", Boolean.FALSE.toString(),
+					"--sourcePath", sourcePath,
+
+					"-outputPath", outputPath
+
+				});
+
+		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+		JavaRDD<FOSDataModel> tmp = sc
+			.textFile(outputPath)
+			.map(item -> OBJECT_MAPPER.readValue(item, FOSDataModel.class));
+
+		tmp.foreach(t -> Assertions.assertTrue(t.getDoi() != null));
+		tmp.foreach(t -> Assertions.assertTrue(t.getLevel1() != null));
+		tmp.foreach(t -> Assertions.assertTrue(t.getLevel2() != null));
+		tmp.foreach(t -> Assertions.assertTrue(t.getLevel3() != null));
+		Assertions.assertEquals(32, tmp.filter(row -> row.getDataInfo() != null).count());
+
+	}
+
+	@Test
+	void test4() throws Exception {
+		final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_sdg_results_20_12_21.csv.gz";
+
+		final String outputPath = workingDir.toString() + "/sdg.json";
+		GetSDGSparkJob
+			.main(
+				new String[] {
+					"--isSparkSessionManaged", Boolean.FALSE.toString(),
+					"--sourcePath", sourcePath,
+
+					"-outputPath", outputPath
+
+				});
+
+		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+		JavaRDD<SDGDataModel> tmp = sc
+			.textFile(outputPath)
+			.map(item -> OBJECT_MAPPER.readValue(item, SDGDataModel.class));
+
+		tmp.foreach(t -> Assertions.assertTrue(t.getDoi() != null));
+		tmp.foreach(t -> Assertions.assertTrue(t.getSbj() != null));
+
+	}
-	// @Test
-	// void test3() throws Exception {
-	// final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_fos_results_20_12_2021.csv.gz";
-	//
-	// final String outputPath = workingDir.toString() + "/fos.json";
-	// GetFOSSparkJob
-	// .main(
-	// new String[] {
-	// "--isSparkSessionManaged", Boolean.FALSE.toString(),
-	// "--sourcePath", sourcePath,
-	//
-	// "-outputPath", outputPath
-	//
-	// });
-	//
-	// final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-	//
-	// JavaRDD<FOSDataModel> tmp = sc
-	// .textFile(outputPath)
-	// .map(item -> OBJECT_MAPPER.readValue(item, FOSDataModel.class));
-	//
-	// tmp.foreach(t -> Assertions.assertTrue(t.getDoi() != null));
-	// tmp.foreach(t -> Assertions.assertTrue(t.getLevel1() != null));
-	// tmp.foreach(t -> Assertions.assertTrue(t.getLevel2() != null));
-	// tmp.foreach(t -> Assertions.assertTrue(t.getLevel3() != null));
-	//
-	// }
-	//
-	// @Test
-	// void test4() throws Exception {
-	// final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_sdg_results_20_12_21.csv.gz";
-	//
-	// final String outputPath = workingDir.toString() + "/sdg.json";
-	// GetSDGSparkJob
-	// .main(
-	// new String[] {
-	// "--isSparkSessionManaged", Boolean.FALSE.toString(),
-	// "--sourcePath", sourcePath,
-	//
-	// "-outputPath", outputPath
-	//
-	// });
-	//
-	// final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-	//
-	// JavaRDD<SDGDataModel> tmp = sc
-	// .textFile(outputPath)
-	// .map(item -> OBJECT_MAPPER.readValue(item, SDGDataModel.class));
-	//
-	// tmp.foreach(t -> Assertions.assertTrue(t.getDoi() != null));
-	// tmp.foreach(t -> Assertions.assertTrue(t.getSbj() != null));
-	//
-	// }
 }
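For reference, the identifier these tests assert against is the unresolved id produced in PrepareFOSSparkJob via DHPUtils.generateUnresolvedIdentifier. The exact output shape below is inferred from the test fixture, not from the method body, and the assumption that the DOI constant resolves to the string "doi" is ours.

// Inferred from the fixture "unresolved::10.3390/s18072310::doi".
String id = DHPUtils.generateUnresolvedIdentifier("10.3390/s18072310", "doi");
// expected: id equals "unresolved::10.3390/s18072310::doi"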
ProduceTest.java

@@ -196,6 +196,9 @@ public class ProduceTest {
 		final String doi = "unresolved::10.3390/s18072310::doi";
 		JavaRDD<Result> tmp = getResultJavaRDD();
 
+		tmp
+			.filter(row -> row.getId().equals(doi))
+			.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
 		Assertions
 			.assertEquals(
 				3, tmp