diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java index cf95711eb..19a0cb5c9 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java @@ -5,17 +5,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Splitter; import com.google.common.collect.Lists; import com.google.protobuf.InvalidProtocolBufferException; -import eu.dnetlib.actionmanager.actions.AtomicAction; +import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.data.proto.OafProtos; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.DataInfo; -import eu.dnetlib.dhp.schema.oaf.Oaf; -import eu.dnetlib.dhp.schema.oaf.Qualifier; -import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import org.apache.commons.codec.binary.Base64; + import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; @@ -84,59 +81,97 @@ public class TransformActions implements Serializable { log.info(String.format("transforming actions from '%s' to '%s'", sourcePath, targetDirectory)); sc.sequenceFile(sourcePath, Text.class, Text.class) - .mapToPair(a -> new Tuple2<>(a._1(), AtomicAction.fromJSON(a._2().toString()))) + .mapToPair(a -> new Tuple2<>(a._1(), eu.dnetlib.actionmanager.actions.AtomicAction.fromJSON(a._2().toString()))) .mapToPair(a -> new Tuple2<>(a._1(), transformAction(a._1().toString(), a._2()))) - + .filter(t -> StringUtils.isNotBlank(t._2().toString())) .saveAsHadoopFile(targetDirectory.toString(), Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); } } } - private Text transformAction(String atomicaActionId, AtomicAction aa) throws InvalidProtocolBufferException, JsonProcessingException { - + private Text transformAction(String atomicaActionId, eu.dnetlib.actionmanager.actions.AtomicAction aa) throws InvalidProtocolBufferException, JsonProcessingException { + final Text out = new Text(); final ObjectMapper mapper = new ObjectMapper(); if (aa.getTargetValue() != null && aa.getTargetValue().length > 0) { - Oaf oaf = ProtoConverter.convert(OafProtos.Oaf.parseFrom(aa.getTargetValue())); - aa.setTargetValue(mapper.writeValueAsString(oaf).getBytes()); + out.set(mapper.writeValueAsString(doTransform(aa))); } else { - if (atomicaActionId.contains("dedupSimilarity")) { - - final String[] splitId = atomicaActionId.split("@"); - - String source = splitId[0]; - String target = splitId[2]; - - String[] relSemantic = splitId[1].split("_"); - - Relation rel = new Relation(); - rel.setSource(source); - rel.setTarget(target); - rel.setRelType(relSemantic[0]); - rel.setSubRelType(relSemantic[1]); - rel.setRelClass(relSemantic[2]); - - DataInfo d = new DataInfo(); - d.setDeletedbyinference(false); - d.setInferenceprovenance("deduplication"); - d.setInferred(true); - d.setInvisible(false); - Qualifier provenanceaction = new Qualifier(); - - provenanceaction.setClassid("deduplication"); - provenanceaction.setClassname("deduplication"); - provenanceaction.setSchemeid("dnet:provenanceActions"); - provenanceaction.setSchemename("dnet:provenanceActions"); - - d.setProvenanceaction(provenanceaction); - - rel.setDataInfo(d); - - aa.setTargetValue(mapper.writeValueAsString(rel).getBytes()); + out.set(mapper.writeValueAsString(getRelationAtomicAction(atomicaActionId))); } } - return new Text(mapper.writeValueAsString(aa)); + return out; + } + + private AtomicAction getRelationAtomicAction(String atomicaActionId) { + final String[] splitId = atomicaActionId.split("@"); + + String source = splitId[0]; + String target = splitId[2]; + + String[] relSemantic = splitId[1].split("_"); + + Relation rel = new Relation(); + rel.setSource(source); + rel.setTarget(target); + rel.setRelType(relSemantic[0]); + rel.setSubRelType(relSemantic[1]); + rel.setRelClass(relSemantic[2]); + + DataInfo d = new DataInfo(); + d.setDeletedbyinference(false); + d.setInferenceprovenance("deduplication"); + d.setInferred(true); + d.setInvisible(false); + Qualifier provenanceaction = new Qualifier(); + + provenanceaction.setClassid("deduplication"); + provenanceaction.setClassname("deduplication"); + provenanceaction.setSchemeid("dnet:provenanceActions"); + provenanceaction.setSchemename("dnet:provenanceActions"); + + d.setProvenanceaction(provenanceaction); + + rel.setDataInfo(d); + + return new AtomicAction<>(Relation.class, rel); + } + + private AtomicAction doTransform(eu.dnetlib.actionmanager.actions.AtomicAction aa) throws InvalidProtocolBufferException { + final OafProtos.Oaf proto_oaf = OafProtos.Oaf.parseFrom(aa.getTargetValue()); + final Oaf oaf = ProtoConverter.convert(proto_oaf); + switch (proto_oaf.getKind()) { + case entity: + switch (proto_oaf.getEntity().getType()) { + case datasource: + return new AtomicAction<>(Datasource.class, (Datasource) oaf); + case organization: + return new AtomicAction<>(Organization.class, (Organization) oaf); + case project: + return new AtomicAction<>(Project.class, (Project) oaf); + case result: + final String resulttypeid = proto_oaf.getEntity().getResult().getMetadata().getResulttype().getClassid(); + switch (resulttypeid) { + case "publication": + return new AtomicAction<>(Publication.class, (Publication) oaf); + case "software": + return new AtomicAction<>(Software.class, (Software) oaf); + case "other": + return new AtomicAction<>(OtherResearchProduct.class, (OtherResearchProduct) oaf); + case "dataset": + return new AtomicAction<>(Dataset.class, (Dataset) oaf); + default: + // can be an update, where the resulttype is not specified + return new AtomicAction<>(Result.class, (Result) oaf); + } + default: + throw new IllegalArgumentException("invalid entity type: " + proto_oaf.getEntity().getType()); + } + case relation: + return new AtomicAction<>(Relation.class, (Relation) oaf); + default: + throw new IllegalArgumentException("invalid kind: " + proto_oaf.getKind()); + } } private String getTargetBaseDir(String isLookupUrl) throws ISLookUpException { diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/workflow.xml index ec2861a0e..ed01c8de4 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/workflow.xml @@ -54,12 +54,25 @@ + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + + - ${jobTracker} - ${nameNode} eu.dnetlib.dhp.migration.actions.MigrateActionSet -Dmapred.task.timeout=${distcp_task_timeout} -is${isLookupUrl} @@ -78,8 +91,6 @@ - ${jobTracker} - ${nameNode} yarn cluster transform_actions