forked from D-Net/dnet-hadoop
wip: large refactoring
This commit is contained in:
parent
934c1846f8
commit
d04610480a
|
@ -120,7 +120,7 @@ public class GroupEntitiesSparkJob {
|
|||
|
||||
private Entity mergeAndGet(Entity b, Entity a) {
|
||||
if (Objects.nonNull(a) && Objects.nonNull(b)) {
|
||||
return MergeUtils.merge(b, a);
|
||||
return MergeUtils.merge(b, a, true);
|
||||
}
|
||||
return Objects.isNull(a) ? b : a;
|
||||
}
|
||||
|
|
|
@ -21,8 +21,12 @@ import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
|
|||
public class MergeUtils {
|
||||
|
||||
public static <T extends Oaf> T merge(final T left, final T right) {
|
||||
return merge(left, right, false);
|
||||
}
|
||||
|
||||
public static <T extends Oaf> T merge(final T left, final T right, boolean checkDelegatedAuthority) {
|
||||
if (sameClass(left, right, Entity.class)) {
|
||||
return mergeEntities(left, right);
|
||||
return mergeEntities(left, right, checkDelegatedAuthority);
|
||||
} else if (sameClass(left, right, Relation.class)) {
|
||||
return mergeRelation(left, right);
|
||||
} else {
|
||||
|
@ -34,9 +38,9 @@ public class MergeUtils {
|
|||
}
|
||||
}
|
||||
|
||||
private static <T extends Oaf> T mergeEntities(T left, T right) {
|
||||
private static <T extends Oaf> T mergeEntities(T left, T right, boolean checkDelegatedAuthority) {
|
||||
if (sameClass(left, right, Result.class)) {
|
||||
if (!left.getClass().equals(right.getClass())) {
|
||||
if (!left.getClass().equals(right.getClass()) || checkDelegatedAuthority) {
|
||||
return mergeResultsOfDifferentTypes(left, right);
|
||||
}
|
||||
return mergeResult(left, right);
|
||||
|
@ -265,16 +269,16 @@ public class MergeUtils {
|
|||
if (enrich.getOaiprovenance() != null && trustCompareResult < 0)
|
||||
mergedResult.setOaiprovenance(enrich.getOaiprovenance());
|
||||
|
||||
if (isSubClass(mergedResult, Publication.class)) {
|
||||
if (sameClass(mergedResult, enrich, Publication.class)) {
|
||||
return (T) mergePublication(mergedResult, enrich);
|
||||
}
|
||||
if (isSubClass(mergedResult, Dataset.class)) {
|
||||
if (sameClass(mergedResult, enrich, Dataset.class)) {
|
||||
return (T) mergeDataset(mergedResult, enrich);
|
||||
}
|
||||
if (isSubClass(mergedResult, OtherResearchProduct.class)) {
|
||||
if (sameClass(mergedResult, enrich, OtherResearchProduct.class)) {
|
||||
return (T) mergeORP(mergedResult, enrich);
|
||||
}
|
||||
if (isSubClass(mergedResult, Software.class)) {
|
||||
if (sameClass(mergedResult, enrich, Software.class)) {
|
||||
return (T) mergeSoftware(mergedResult, enrich);
|
||||
}
|
||||
|
||||
|
@ -888,11 +892,11 @@ public class MergeUtils {
|
|||
.compare(
|
||||
Optional
|
||||
.ofNullable(a.getDataInfo())
|
||||
.map(DataInfo::getTrust)
|
||||
.map(EntityDataInfo::getTrust)
|
||||
.orElse(0f),
|
||||
Optional
|
||||
.ofNullable(b.getDataInfo())
|
||||
.map(DataInfo::getTrust)
|
||||
.map(EntityDataInfo::getTrust)
|
||||
.orElse(0f));
|
||||
}
|
||||
|
||||
|
|
|
@ -363,7 +363,7 @@ public class OafMapperUtils {
|
|||
final Entity entity,
|
||||
final String validationDate) {
|
||||
|
||||
final List<Provenance> provenance = getProvenance(entity.getCollectedfrom(), entity.getDataInfo());
|
||||
final List<Provenance> provenance = getProvenance(entity.getCollectedfrom(), fromEntityDataInfo(entity.getDataInfo()));
|
||||
return getRelation(
|
||||
source, target, relType, subRelType, relClass, provenance, validationDate, null);
|
||||
}
|
||||
|
@ -434,4 +434,13 @@ public class OafMapperUtils {
|
|||
.orElse(""))
|
||||
.orElse("");
|
||||
}
|
||||
|
||||
public static DataInfo fromEntityDataInfo(EntityDataInfo entityDataInfo) {
|
||||
DataInfo dataInfo = new DataInfo();
|
||||
dataInfo.setTrust(entityDataInfo.getTrust());
|
||||
dataInfo.setInferenceprovenance(entityDataInfo.getInferenceprovenance());
|
||||
dataInfo.setInferred(entityDataInfo.getInferred());
|
||||
dataInfo.setProvenanceaction(entityDataInfo.getProvenanceaction());
|
||||
return dataInfo;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -80,7 +80,7 @@ public class MergeUtilsTest {
|
|||
assertEquals(1, d2.getCollectedfrom().size());
|
||||
assertTrue(cfId(d2.getCollectedfrom()).contains(ModelConstants.ZENODO_OD_ID));
|
||||
|
||||
Result res = MergeUtils.merge(d1, d2);
|
||||
Result res = MergeUtils.merge(d1, d2, true);
|
||||
|
||||
assertEquals(d2, res);
|
||||
}
|
||||
|
@ -93,7 +93,7 @@ public class MergeUtilsTest {
|
|||
assertEquals(1, d2.getCollectedfrom().size());
|
||||
assertTrue(cfId(d2.getCollectedfrom()).contains(ModelConstants.ZENODO_OD_ID));
|
||||
|
||||
Result res = MergeUtils.merge(p1, d2);
|
||||
Result res = MergeUtils.merge(p1, d2, true);
|
||||
|
||||
assertEquals(d2, res);
|
||||
}
|
||||
|
|
|
@ -55,11 +55,8 @@ public class Constants {
|
|||
null,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS));
|
||||
|
||||
|
||||
public static final DataInfo Bip_DATA_INFO3 = OafMapperUtils
|
||||
.dataInfo(
|
||||
false,
|
||||
false,
|
||||
0.8f,
|
||||
UPDATE_DATA_INFO_TYPE,
|
||||
false,
|
||||
|
@ -68,31 +65,6 @@ public class Constants {
|
|||
UPDATE_MEASURE_BIP_CLASS_ID,
|
||||
UPDATE_CLASS_NAME,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS));
|
||||
public static final EntityDataInfo Bip_DATA_INFO2 = OafMapperUtils
|
||||
.dataInfo(
|
||||
false,
|
||||
false,
|
||||
0.8f,
|
||||
UPDATE_DATA_INFO_TYPE,
|
||||
true,
|
||||
OafMapperUtils
|
||||
.qualifier(
|
||||
UPDATE_MEASURE_BIP_CLASS_ID,
|
||||
UPDATE_CLASS_NAME,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS));
|
||||
|
||||
public static final EntityDataInfo Bip_DATA_INFO = OafMapperUtils
|
||||
.dataInfo(
|
||||
false,
|
||||
false,
|
||||
0.8f, //TODO check
|
||||
UPDATE_DATA_INFO_TYPE,
|
||||
true,
|
||||
OafMapperUtils
|
||||
.qualifier(
|
||||
ModelConstants.PROVENANCE_ENRICH,
|
||||
null,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS));
|
||||
|
||||
private Constants() {
|
||||
}
|
||||
|
|
|
@ -129,20 +129,13 @@ public class CreateActionSetSparkJob implements Serializable {
|
|||
|
||||
List<Relation> relationList = new ArrayList<>();
|
||||
|
||||
String citing = ID_PREFIX
|
||||
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue(PidType.doi.toString(), value.getCiting()));
|
||||
final String cited = ID_PREFIX
|
||||
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue(PidType.doi.toString(), value.getCited()));
|
||||
|
||||
String citing = asOpenAireId(value.getCiting());
|
||||
final String cited = asOpenAireId(value.getCited());
|
||||
if (!citing.equals(cited)) {
|
||||
relationList.add(getRelation(citing, cited));
|
||||
|
||||
if (duplicate && value.getCiting().endsWith(".refs")) {
|
||||
citing = ID_PREFIX + IdentifierFactory
|
||||
.md5(
|
||||
CleaningFunctions
|
||||
.normalizePidValue(
|
||||
"doi", value.getCiting().substring(0, value.getCiting().indexOf(".refs"))));
|
||||
citing = asOpenAireId(value.getCiting());
|
||||
relationList.add(getRelation(citing, cited));
|
||||
}
|
||||
}
|
||||
|
@ -150,6 +143,13 @@ public class CreateActionSetSparkJob implements Serializable {
|
|||
return relationList;
|
||||
}
|
||||
|
||||
private static String asOpenAireId(String value) {
|
||||
return IdentifierFactory.idFromPid(
|
||||
"50", PidType.doi.toString(),
|
||||
CleaningFunctions.normalizePidValue(PidType.doi.toString(), value),
|
||||
true);
|
||||
}
|
||||
|
||||
public static Relation getRelation(
|
||||
String source,
|
||||
String target) {
|
||||
|
|
|
@ -73,6 +73,7 @@ object DataciteModelConstants {
|
|||
val SUBJ_CLASS = "keywords"
|
||||
val DATACITE_NAME = "Datacite"
|
||||
val dataInfo: EntityDataInfo = dataciteDataInfo(0.9f)
|
||||
val relDataInfo = OafMapperUtils.fromEntityDataInfo(dataInfo);
|
||||
|
||||
val DATACITE_COLLECTED_FROM: KeyValue =
|
||||
OafMapperUtils.keyValue(ModelConstants.DATACITE_ID, DATACITE_NAME)
|
||||
|
|
|
@ -279,11 +279,6 @@ object DataciteToOAFTransformation {
|
|||
|
||||
}
|
||||
|
||||
def createDNetTargetIdentifier(pid: String, pidType: String, idPrefix: String): String = {
|
||||
val f_part = s"$idPrefix|${pidType.toLowerCase}".padTo(15, '_')
|
||||
s"$f_part::${IdentifierFactory.md5(pid.toLowerCase)}"
|
||||
}
|
||||
|
||||
def generateOAFDate(dt: String, q: Qualifier): StructuredProperty = {
|
||||
OafMapperUtils.structuredProperty(dt, q)
|
||||
}
|
||||
|
@ -313,7 +308,7 @@ object DataciteToOAFTransformation {
|
|||
val p = match_pattern.get._2
|
||||
val grantId = m.matcher(awardUri).replaceAll("$2")
|
||||
val targetId = s"$p${DHPUtils.md5(grantId)}"
|
||||
List(generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo))
|
||||
List(generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, relDataInfo))
|
||||
} else
|
||||
List()
|
||||
|
||||
|
@ -357,7 +352,7 @@ object DataciteToOAFTransformation {
|
|||
result.setPid(List(pid).asJava)
|
||||
|
||||
// This identifiere will be replaced in a second moment using the PID logic generation
|
||||
result.setId(OafMapperUtils.createOpenaireId(50, s"datacite____::$doi", true))
|
||||
result.setId(IdentifierFactory.createOpenaireId(50, s"datacite____::$doi", true))
|
||||
result.setOriginalId(List(doi).asJava)
|
||||
|
||||
val d = new Date(dateOfCollection * 1000)
|
||||
|
@ -386,7 +381,7 @@ object DataciteToOAFTransformation {
|
|||
)
|
||||
else null
|
||||
if (ni.nameIdentifier != null && ni.nameIdentifier.isDefined) {
|
||||
OafMapperUtils.authorPid(ni.nameIdentifier.get, q, dataInfo)
|
||||
OafMapperUtils.authorPid(ni.nameIdentifier.get, q, relDataInfo)
|
||||
} else
|
||||
null
|
||||
|
||||
|
@ -501,7 +496,7 @@ object DataciteToOAFTransformation {
|
|||
SUBJ_CLASS,
|
||||
SUBJ_CLASS,
|
||||
ModelConstants.DNET_SUBJECT_TYPOLOGIES,
|
||||
dataInfo
|
||||
relDataInfo
|
||||
)
|
||||
)
|
||||
.asJava
|
||||
|
@ -635,7 +630,7 @@ object DataciteToOAFTransformation {
|
|||
.map(r => {
|
||||
val rel = new Relation
|
||||
|
||||
rel.setProvenance(Lists.newArrayList(OafMapperUtils.getProvenance(DATACITE_COLLECTED_FROM, dataInfo)))
|
||||
rel.setProvenance(Lists.newArrayList(OafMapperUtils.getProvenance(DATACITE_COLLECTED_FROM, relDataInfo)))
|
||||
|
||||
val subRelType = subRelTypeMapping(r.relationType).relType
|
||||
rel.setRelType(REL_TYPE_VALUE)
|
||||
|
|
|
@ -2,7 +2,7 @@ package eu.dnetlib.dhp.sx.bio
|
|||
|
||||
import com.google.common.collect.Lists
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.{GraphCleaningFunctions, OafMapperUtils}
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.{GraphCleaningFunctions, IdentifierFactory, OafMapperUtils}
|
||||
import eu.dnetlib.dhp.schema.oaf._
|
||||
import org.json4s.DefaultFormats
|
||||
import org.json4s.JsonAST.{JField, JObject, JString}
|
||||
|
@ -152,7 +152,7 @@ object BioDBToOAF {
|
|||
d.setDataInfo(DATA_INFO)
|
||||
|
||||
val nsPrefix = input.pidType.toLowerCase.padTo(12, '_')
|
||||
d.setId(OafMapperUtils.createOpenaireId(50, s"$nsPrefix::${input.pid.toLowerCase}", true))
|
||||
d.setId(IdentifierFactory.createOpenaireId(50, s"$nsPrefix::${input.pid.toLowerCase}", true))
|
||||
|
||||
if (input.tilte != null && input.tilte.nonEmpty)
|
||||
d.setTitle(
|
||||
|
@ -233,7 +233,7 @@ object BioDBToOAF {
|
|||
)
|
||||
|
||||
d.setDataInfo(DATA_INFO)
|
||||
d.setId(OafMapperUtils.createOpenaireId(50, s"uniprot_____::$pid", true))
|
||||
d.setId(IdentifierFactory.createOpenaireId(50, s"uniprot_____::$pid", true))
|
||||
d.setCollectedfrom(List(collectedFromMap("uniprot")).asJava)
|
||||
|
||||
val title: String = (json \ "title").extractOrElse[String](null)
|
||||
|
@ -424,7 +424,7 @@ object BioDBToOAF {
|
|||
|
||||
d.setCollectedfrom(List(collectedFromMap("pdb")).asJava)
|
||||
d.setDataInfo(DATA_INFO)
|
||||
d.setId(OafMapperUtils.createOpenaireId(50, s"pdb_________::$pdb", true))
|
||||
d.setId(IdentifierFactory.createOpenaireId(50, s"pdb_________::$pdb", true))
|
||||
d.setOriginalId(List(pdb).asJava)
|
||||
|
||||
val title = (json \ "title").extractOrElse[String](null)
|
||||
|
@ -532,7 +532,7 @@ object BioDBToOAF {
|
|||
|
||||
val nsPrefix = input.targetPidType.toLowerCase.padTo(12, '_')
|
||||
|
||||
d.setId(OafMapperUtils.createOpenaireId(50, s"$nsPrefix::${input.targetPid.toLowerCase}", true))
|
||||
d.setId(IdentifierFactory.createOpenaireId(50, s"$nsPrefix::${input.targetPid.toLowerCase}", true))
|
||||
d.setOriginalId(List(input.targetPid.toLowerCase).asJava)
|
||||
|
||||
d.setPid(
|
||||
|
|
|
@ -34,6 +34,8 @@ object PubMedToOaf {
|
|||
ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER
|
||||
)
|
||||
|
||||
val REL_DATAINFO = OafMapperUtils.fromEntityDataInfo(ENTITY_DATAINFO)
|
||||
|
||||
val collectedFrom: KeyValue =
|
||||
OafMapperUtils.keyValue(ModelConstants.EUROPE_PUBMED_CENTRAL_ID, "Europe PubMed Central")
|
||||
|
||||
|
@ -259,7 +261,7 @@ object PubMedToOaf {
|
|||
SUBJ_CLASS,
|
||||
SUBJ_CLASS,
|
||||
ModelConstants.DNET_SUBJECT_TYPOLOGIES,
|
||||
ENTITY_DATAINFO
|
||||
REL_DATAINFO
|
||||
)
|
||||
)(collection.breakOut)
|
||||
if (subjects != null)
|
||||
|
|
|
@ -78,17 +78,12 @@ public class SparkAtomicActionScoreJobTest {
|
|||
SparkAtomicActionScoreJob
|
||||
.main(
|
||||
new String[] {
|
||||
"-isSparkSessionManaged",
|
||||
Boolean.FALSE.toString(),
|
||||
"-inputPath",
|
||||
|
||||
bipScoresPath,
|
||||
|
||||
"-outputPath",
|
||||
workingDir.toString() + "/actionSet"
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-inputPath", bipScoresPath,
|
||||
"-outputPath", workingDir.toString() + "/actionSet"
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Result> tmp = sc
|
||||
.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
|
||||
|
|
|
@ -304,7 +304,6 @@ public class ProduceTest {
|
|||
SparkSaveUnresolved.main(new String[] {
|
||||
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"--sourcePath", workingDir.toString() + "/work",
|
||||
|
||||
"-outputPath", workingDir.toString() + "/unresolved"
|
||||
|
||||
});
|
||||
|
|
|
@ -8,6 +8,7 @@ import java.nio.file.Files;
|
|||
import java.nio.file.Path;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.common.ModelSupport;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.spark.SparkConf;
|
||||
|
@ -100,7 +101,7 @@ public class CreateOpenCitationsASTest {
|
|||
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
|
||||
.map(aa -> ((Relation) aa.getPayload()));
|
||||
|
||||
assertEquals(62, tmp.count());
|
||||
assertEquals(31, tmp.count());
|
||||
|
||||
// tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
|
||||
|
||||
|
@ -132,10 +133,7 @@ public class CreateOpenCitationsASTest {
|
|||
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
|
||||
.map(aa -> ((Relation) aa.getPayload()));
|
||||
|
||||
assertEquals(46, tmp.count());
|
||||
|
||||
// tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r)));
|
||||
|
||||
assertEquals(23, tmp.count());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -200,7 +198,7 @@ public class CreateOpenCitationsASTest {
|
|||
tmp.foreach(r -> {
|
||||
final DataInfo dataInfo = r.getProvenance().get(0).getDataInfo();
|
||||
assertEquals(false, dataInfo.getInferred());
|
||||
assertEquals("0.91", dataInfo.getTrust());
|
||||
assertEquals(0.91f, dataInfo.getTrust());
|
||||
assertEquals(
|
||||
CreateActionSetSparkJob.OPENCITATIONS_CLASSID, dataInfo.getProvenanceaction().getClassid());
|
||||
assertEquals(
|
||||
|
@ -240,9 +238,8 @@ public class CreateOpenCitationsASTest {
|
|||
assertEquals("citation", r.getSubRelType());
|
||||
assertEquals("resultResult", r.getRelType());
|
||||
});
|
||||
assertEquals(23, tmp.count());
|
||||
assertEquals(23, tmp.filter(r -> r.getRelClass().equals("Cites")).count());
|
||||
assertEquals(23, tmp.filter(r -> r.getRelClass().equals("IsCitedBy")).count());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -281,17 +278,17 @@ public class CreateOpenCitationsASTest {
|
|||
@Test
|
||||
void testRelationsSourceTargetCouple() throws Exception {
|
||||
final String doi1 = "50|doi_________::"
|
||||
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s10854-015-3684-x"));
|
||||
+ ModelSupport.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s10854-015-3684-x"));
|
||||
final String doi2 = "50|doi_________::"
|
||||
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1111/j.1551-2916.2008.02408.x"));
|
||||
+ ModelSupport.md5(CleaningFunctions.normalizePidValue("doi", "10.1111/j.1551-2916.2008.02408.x"));
|
||||
final String doi3 = "50|doi_________::"
|
||||
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s10854-014-2114-9"));
|
||||
+ ModelSupport.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s10854-014-2114-9"));
|
||||
final String doi4 = "50|doi_________::"
|
||||
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1016/j.ceramint.2013.09.069"));
|
||||
+ ModelSupport.md5(CleaningFunctions.normalizePidValue("doi", "10.1016/j.ceramint.2013.09.069"));
|
||||
final String doi5 = "50|doi_________::"
|
||||
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s10854-009-9913-4"));
|
||||
+ ModelSupport.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s10854-009-9913-4"));
|
||||
final String doi6 = "50|doi_________::"
|
||||
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1016/0038-1098(72)90370-5"));
|
||||
+ ModelSupport.md5(CleaningFunctions.normalizePidValue("doi", "10.1016/0038-1098(72)90370-5"));
|
||||
|
||||
String inputPath = getClass()
|
||||
.getResource(
|
||||
|
@ -318,7 +315,7 @@ public class CreateOpenCitationsASTest {
|
|||
|
||||
JavaRDD<Relation> check = tmp.filter(r -> r.getSource().equals(doi1) || r.getTarget().equals(doi1));
|
||||
|
||||
assertEquals(10, check.count());
|
||||
assertEquals(5, check.count());
|
||||
|
||||
check.foreach(r -> {
|
||||
if (r.getSource().equals(doi2) || r.getSource().equals(doi3) || r.getSource().equals(doi4) ||
|
||||
|
|
|
@ -73,7 +73,7 @@ public class SparkAtomicActionCountJobTest {
|
|||
|
||||
SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Result> tmp = sc
|
||||
.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
|
||||
|
|
|
@ -18,6 +18,7 @@ import org.apache.hadoop.io.compress.GzipCodec;
|
|||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.Function2;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -135,7 +136,7 @@ public class GenerateEntitiesApplication {
|
|||
save(
|
||||
inputRdd
|
||||
.mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
|
||||
.reduceByKey(MergeUtils::merge)
|
||||
.reduceByKey((Function2<Oaf, Oaf, Oaf>) (v1, v2) -> MergeUtils.merge(v1, v2, true))
|
||||
.map(Tuple2::_2),
|
||||
targetPath);
|
||||
break;
|
||||
|
|
Loading…
Reference in New Issue