forked from D-Net/dnet-hadoop
adopting dhp-schemas:8.0.1 to support Author's rawAffiliationString(s). Improved graph2hive implementation
This commit is contained in:
parent d5867a1992
commit 62ff843334
@@ -112,7 +112,7 @@ public class SparkAtomicActionUsageJob implements Serializable {
 			.joinWith(datasource, resultModel.col("datasourceId").equalTo(datasource.col("id")), "left")
 			.map((MapFunction<Tuple2<UsageStatsResultModel, Datasource>, UsageStatsResultModel>) t2 -> {
 				UsageStatsResultModel usrm = t2._1();
-				if(Optional.ofNullable(t2._2()).isPresent())
+				if (Optional.ofNullable(t2._2()).isPresent())
 					usrm.setDatasourceId(usrm.getDatasourceId() + "||" + t2._2().getOfficialname().getValue());
 				else
 					usrm.setDatasourceId(usrm.getDatasourceId() + "||NO_MATCH_FOUND");
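The Optional guard in this hunk is load-bearing: with a "left" joinWith, the right-hand element of the tuple is null for every result whose datasourceId found no match. A minimal self-contained sketch of the same pattern, with hypothetical Usage and Source beans standing in for UsageStatsResultModel and Datasource; the Optional.map(...).orElse(...) form is an equivalent way to write the if/else above:

import java.util.Arrays;
import java.util.Optional;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class LeftJoinSketch {

	// hypothetical stand-in beans, not part of the dnet-hadoop codebase
	public static class Usage implements java.io.Serializable {
		private String datasourceId;
		public String getDatasourceId() { return datasourceId; }
		public void setDatasourceId(String v) { datasourceId = v; }
	}

	public static class Source implements java.io.Serializable {
		private String id;
		private String officialname;
		public String getId() { return id; }
		public void setId(String v) { id = v; }
		public String getOfficialname() { return officialname; }
		public void setOfficialname(String v) { officialname = v; }
	}

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

		Usage u = new Usage();
		u.setDatasourceId("ds1");
		Source s = new Source();
		s.setId("ds2");
		s.setOfficialname("Some Registry");

		Dataset<Usage> usages = spark.createDataset(Arrays.asList(u), Encoders.bean(Usage.class));
		Dataset<Source> sources = spark.createDataset(Arrays.asList(s), Encoders.bean(Source.class));

		usages
			// "left" joinWith keeps every usage row; unmatched rows carry null on the right
			.joinWith(sources, usages.col("datasourceId").equalTo(sources.col("id")), "left")
			.map((MapFunction<Tuple2<Usage, Source>, Usage>) t2 -> {
				Usage usage = t2._1();
				String name = Optional
					.ofNullable(t2._2()) // null here: "ds1" has no match in sources
					.map(Source::getOfficialname)
					.orElse("NO_MATCH_FOUND");
				usage.setDatasourceId(usage.getDatasourceId() + "||" + name);
				return usage;
			}, Encoders.bean(Usage.class))
			.show(false); // prints ds1||NO_MATCH_FOUND

		spark.stop();
	}
}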
@@ -407,10 +407,9 @@ object DataciteToOAFTransformation {
         )
       }
       if (c.affiliation.isDefined)
-        a.setAffiliation(
+        a.setRawAffiliationString(
           c.affiliation.get
             .filter(af => af.nonEmpty)
-            .map(af => OafMapperUtils.field(af, dataInfo))
             .asJava
         )
       a.setRank(idx + 1)
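Dropping the .map(af => OafMapperUtils.field(af, dataInfo)) step is the visible effect of the schema bump: per the commit message, dhp-schemas 8.0.1 moves the Author affiliation from a List of Field&lt;String&gt; wrappers (setAffiliation) to a plain List&lt;String&gt; (setRawAffiliationString), so the strings no longer need a DataInfo-carrying wrapper. A short sketch of the new shape, with the setter/getter names taken from this diff and everything else assumed:

import java.util.Arrays;

import eu.dnetlib.dhp.schema.oaf.Author;

public class RawAffiliationSketch {
	public static void main(String[] args) {
		Author a = new Author();

		// before (7.0.1): each value had to be wrapped, e.g.
		// OafMapperUtils.field("ISTI-CNR", dataInfo)
		// after (8.0.1): plain strings go straight in
		a.setRawAffiliationString(Arrays.asList("ISTI-CNR"));

		// and come straight back out, with no .getValue() unwrapping
		String first = a.getRawAffiliationString().get(0);
		System.out.println(first); // ISTI-CNR
	}
}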
@@ -313,7 +313,7 @@ case object ConversionUtil {
       if (f.author.DisplayName.isDefined)
         a.setFullname(f.author.DisplayName.get)
       if (f.affiliation != null)
-        a.setAffiliation(List(asField(f.affiliation)).asJava)
+        a.setRawAffiliationString(List(f.affiliation).asJava)
       a.setPid(
         List(
           createSP(
@@ -386,7 +386,7 @@ case object ConversionUtil {
       a.setFullname(f.author.DisplayName.get)

       if (f.affiliation != null)
-        a.setAffiliation(List(asField(f.affiliation)).asJava)
+        a.setRawAffiliationString(List(f.affiliation).asJava)

       a.setPid(
         List(
@@ -9,10 +9,7 @@ import java.util.Optional;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -25,8 +22,6 @@ public class GraphHiveTableImporterJob {

 	private static final Logger log = LoggerFactory.getLogger(GraphHiveTableImporterJob.class);

-	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
 	public static void main(String[] args) throws Exception {

 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
@@ -74,7 +69,12 @@ public class GraphHiveTableImporterJob {
 	private static <T extends Oaf> void loadGraphTable(SparkSession spark, String inputPath, String hiveDbName,
 		Class<T> clazz, int numPartitions) {

-		Dataset<String> dataset = spark.read().textFile(inputPath);
+		final Encoder<T> clazzEncoder = Encoders.bean(clazz);
+
+		Dataset<Row> dataset = spark
+			.read()
+			.schema(clazzEncoder.schema())
+			.json(inputPath);

 		if (numPartitions > 0) {
 			log.info("repartitioning {} to {} partitions", clazz.getSimpleName(), numPartitions);
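This hunk is the heart of the improved graph2hive implementation: instead of reading raw text and deserializing every line with Jackson (the .map(...) removed in the next hunk), the job hands Spark's native JSON reader an explicit schema derived from the bean encoder, which also spares the datasource its usual schema-inference pass over the input. A runnable sketch of the before/after, using a hypothetical Person bean in place of the Oaf entity classes:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonLoadSketch {

	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	// hypothetical stand-in for the Oaf entity beans
	public static class Person implements java.io.Serializable {
		private String name;
		public String getName() { return name; }
		public void setName(String v) { name = v; }
	}

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
		String inputPath = args[0]; // a JSON-lines file, one record per line

		Encoder<Person> enc = Encoders.bean(Person.class);

		// before: read lines, then one Jackson readValue call per record on the executors
		Dataset<Person> viaJackson = spark
			.read()
			.textFile(inputPath)
			.map((MapFunction<String, Person>) s -> OBJECT_MAPPER.readValue(s, Person.class), enc);

		// after: the JSON datasource parses natively against the bean-derived schema,
		// skipping both the per-record Jackson step and the schema-inference pass
		Dataset<Row> viaReader = spark
			.read()
			.schema(enc.schema())
			.json(inputPath);

		System.out.println(viaJackson.count() + " / " + viaReader.count());
		spark.stop();
	}
}

With the dataset already typed as rows, the subsequent write (next hunk) can drop its deserialization step and save directly.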
@@ -82,7 +82,6 @@ public class GraphHiveTableImporterJob {
 		}

 		dataset
-			.map((MapFunction<String, T>) s -> OBJECT_MAPPER.readValue(s, clazz), Encoders.bean(clazz))
 			.write()
 			.mode(SaveMode.Overwrite)
 			.saveAsTable(tableIdentifier(hiveDbName, clazz));
@@ -94,7 +94,7 @@ public class OdfToOafMapper extends AbstractMdRecordToOafMapper {
 				author.setFullname(String.format("%s, %s", author.getSurname(), author.getName()));
 			}

-			author.setAffiliation(prepareListFields(n, "./*[local-name()='affiliation']", info));
+			author.setRawAffiliationString(prepareListString(n, "./*[local-name()='affiliation']"));
 			author.setPid(preparePids(n, info));
 			author.setRank(pos++);
 			res.add(author);
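prepareListFields wrapped each matched node in a Field&lt;String&gt; carrying the provenance info; its replacement prepareListString only needs the node text, so the info argument disappears. The helper's body is not part of this diff; the following is a hedged reconstruction of what such a helper presumably does, written against the dom4j API these mappers are built on:

import java.util.ArrayList;
import java.util.List;

import org.dom4j.Node;

// hypothetical reconstruction; the real helper lives outside this diff
public static List<String> prepareListString(final Node node, final String xpath) {
	final List<String> res = new ArrayList<>();
	// collect the trimmed text of every node matched by the XPath
	for (final Object o : node.selectNodes(xpath)) {
		final String s = ((Node) o).getText().trim();
		if (!s.isEmpty()) {
			res.add(s);
		}
	}
	// a plain List<String> is exactly the shape rawAffiliationString expects
	return res;
}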
@@ -73,14 +73,10 @@ public class GraphHiveImporterJobTest {
 		GraphHiveImporterJob
 			.main(
 				new String[] {
-					"-isSparkSessionManaged",
-					Boolean.FALSE.toString(),
-					"-inputPath",
-					getClass().getResource("/eu/dnetlib/dhp/oa/graph/sample").getPath(),
-					"-hiveMetastoreUris",
-					"",
-					"-hiveDbName",
-					dbName
+					"--isSparkSessionManaged", Boolean.FALSE.toString(),
+					"--inputPath", getClass().getResource("/eu/dnetlib/dhp/oa/graph/sample").getPath(),
+					"--hiveMetastoreUris", "",
+					"--hiveDbName", dbName
 				});

 		ModelSupport.oafTypes
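The test arguments collapse from one array entry per token to GNU-style --name value pairs, one pair per line; either way the parser receives the same flat String[]. A toy sketch of consuming such a pair list (hand-rolled for illustration; the real parsing is ArgumentApplicationParser's job, and the /tmp/sample path is a placeholder):

import java.util.HashMap;
import java.util.Map;

public class ArgsSketch {
	public static void main(String[] args) {
		String[] testArgs = {
			"--isSparkSessionManaged", Boolean.FALSE.toString(),
			"--inputPath", "/tmp/sample",
			"--hiveMetastoreUris", "",
			"--hiveDbName", "test_db"
		};

		// flag/value entries alternate, so pairing them up is a two-step walk
		Map<String, String> parsed = new HashMap<>();
		for (int i = 0; i + 1 < testArgs.length; i += 2) {
			parsed.put(testArgs[i].replaceFirst("^-+", ""), testArgs[i + 1]);
		}
		System.out.println(parsed.get("hiveDbName")); // test_db
	}
}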
@@ -406,15 +406,15 @@ class MappersTest {
 		assertEquals("Baracchini", author.get().getSurname());
 		assertEquals("Theo", author.get().getName());

-		assertEquals(1, author.get().getAffiliation().size());
-		final Optional<Field<String>> opAff = author
+		assertEquals(1, author.get().getRawAffiliationString().size());
+		final Optional<String> opAff = author
 			.get()
-			.getAffiliation()
+			.getRawAffiliationString()
 			.stream()
 			.findFirst();
 		assertTrue(opAff.isPresent());
-		final Field<String> affiliation = opAff.get();
-		assertEquals("ISTI-CNR", affiliation.getValue());
+		final String affiliation = opAff.get();
+		assertEquals("ISTI-CNR", affiliation);

 		assertFalse(d.getSubject().isEmpty());
 		assertFalse(d.getInstance().isEmpty());
pom.xml
@@ -937,7 +937,7 @@
 		<commons.logging.version>1.1.3</commons.logging.version>
 		<commons-validator.version>1.7</commons-validator.version>
 		<dateparser.version>1.0.7</dateparser.version>
-		<dhp-schemas.version>[7.0.1]</dhp-schemas.version>
+		<dhp-schemas.version>[8.0.1]</dhp-schemas.version>
 		<dhp.cdh.version>cdh5.9.2</dhp.cdh.version>
 		<dhp.commons.lang.version>3.5</dhp.commons.lang.version>
 		<dhp.guava.version>11.0.2</dhp.guava.version>
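Worth noting: the brackets in [8.0.1] are Maven version-range syntax. A single bracketed version is a hard requirement for exactly that release, whereas a bare 8.0.1 would only be a soft preference that dependency mediation could override.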