2020-04-20 15:24:33 +02:00
|
|
|
package eu.dnetlib.dhp.actionmanager.migration;
|
|
|
|
|
|
|
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
|
|
|
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
|
import com.google.common.base.Splitter;
|
|
|
|
import com.google.common.collect.Lists;
|
|
|
|
import com.google.protobuf.InvalidProtocolBufferException;
|
|
|
|
import eu.dnetlib.data.proto.OafProtos;
|
|
|
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|
|
|
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
|
|
|
import eu.dnetlib.dhp.schema.oaf.*;
|
|
|
|
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
|
|
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
|
|
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
|
|
|
import java.io.IOException;
|
|
|
|
import java.io.Serializable;
|
|
|
|
import java.util.LinkedList;
|
2020-04-20 16:45:21 +02:00
|
|
|
import java.util.Objects;
|
2020-04-20 15:24:33 +02:00
|
|
|
import java.util.Optional;
|
|
|
|
import org.apache.commons.io.IOUtils;
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
|
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
|
|
|
|
import org.apache.spark.SparkConf;
|
|
|
|
import org.apache.spark.api.java.JavaSparkContext;
|
|
|
|
import org.apache.spark.sql.SparkSession;
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
import scala.Tuple2;
|
|
|
|
|
|
|
|
public class TransformActions implements Serializable {
|
|
|
|
|
2020-04-27 14:45:40 +02:00
|
|
|
private static final Logger log = LoggerFactory.getLogger(TransformActions.class);
|
2020-04-20 15:24:33 +02:00
|
|
|
|
2020-04-27 14:45:40 +02:00
|
|
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
2020-04-20 15:24:33 +02:00
|
|
|
|
2020-04-27 14:45:40 +02:00
|
|
|
private static final String SEPARATOR = "/";
|
2020-04-20 15:24:33 +02:00
|
|
|
|
2020-04-27 14:45:40 +02:00
|
|
|
public static void main(String[] args) throws Exception {
|
|
|
|
final ArgumentApplicationParser parser =
|
|
|
|
new ArgumentApplicationParser(
|
|
|
|
IOUtils.toString(
|
|
|
|
MigrateActionSet.class.getResourceAsStream(
|
|
|
|
"/eu/dnetlib/dhp/actionmanager/migration/transform_actionsets_parameters.json")));
|
|
|
|
parser.parseArgument(args);
|
2020-04-20 15:24:33 +02:00
|
|
|
|
2020-04-27 14:45:40 +02:00
|
|
|
Boolean isSparkSessionManaged =
|
|
|
|
Optional.ofNullable(parser.get("isSparkSessionManaged"))
|
|
|
|
.map(Boolean::valueOf)
|
|
|
|
.orElse(Boolean.TRUE);
|
|
|
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
2020-04-20 15:24:33 +02:00
|
|
|
|
2020-04-27 14:45:40 +02:00
|
|
|
final String isLookupUrl = parser.get("isLookupUrl");
|
|
|
|
log.info("isLookupUrl: {}", isLookupUrl);
|
2020-04-20 15:24:33 +02:00
|
|
|
|
2020-04-27 14:45:40 +02:00
|
|
|
final String inputPaths = parser.get("inputPaths");
|
2020-04-20 15:24:33 +02:00
|
|
|
|
2020-04-27 14:45:40 +02:00
|
|
|
if (StringUtils.isBlank(inputPaths)) {
|
|
|
|
throw new RuntimeException("empty inputPaths");
|
|
|
|
}
|
|
|
|
log.info("inputPaths: {}", inputPaths);
|
2020-04-20 15:24:33 +02:00
|
|
|
|
2020-04-27 14:45:40 +02:00
|
|
|
final String targetBaseDir = getTargetBaseDir(isLookupUrl);
|
2020-04-20 15:24:33 +02:00
|
|
|
|
2020-04-27 14:45:40 +02:00
|
|
|
SparkConf conf = new SparkConf();
|
2020-04-20 15:24:33 +02:00
|
|
|
|
2020-04-27 14:45:40 +02:00
|
|
|
runWithSparkSession(
|
|
|
|
conf, isSparkSessionManaged, spark -> transformActions(inputPaths, targetBaseDir, spark));
|
|
|
|
}
|
2020-04-20 15:24:33 +02:00
|
|
|
|
2020-04-27 14:45:40 +02:00
|
|
|
private static void transformActions(String inputPaths, String targetBaseDir, SparkSession spark)
|
|
|
|
throws IOException {
|
|
|
|
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
|
|
|
final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
|
2020-04-20 15:24:33 +02:00
|
|
|
|
2020-04-27 14:45:40 +02:00
|
|
|
for (String sourcePath : Lists.newArrayList(Splitter.on(",").split(inputPaths))) {
|
2020-04-20 15:24:33 +02:00
|
|
|
|
2020-04-27 14:45:40 +02:00
|
|
|
LinkedList<String> pathQ = Lists.newLinkedList(Splitter.on(SEPARATOR).split(sourcePath));
|
2020-04-20 15:24:33 +02:00
|
|
|
|
2020-04-27 14:45:40 +02:00
|
|
|
final String rawset = pathQ.pollLast();
|
|
|
|
final String actionSetDirectory = pathQ.pollLast();
|
2020-04-20 15:24:33 +02:00
|
|
|
|
2020-04-27 14:45:40 +02:00
|
|
|
final Path targetDirectory =
|
|
|
|
new Path(targetBaseDir + SEPARATOR + actionSetDirectory + SEPARATOR + rawset);
|
2020-04-20 15:24:33 +02:00
|
|
|
|
2020-04-27 14:45:40 +02:00
|
|
|
if (fs.exists(targetDirectory)) {
|
|
|
|
log.info("found target directory '{}", targetDirectory);
|
|
|
|
fs.delete(targetDirectory, true);
|
|
|
|
log.info("deleted target directory '{}", targetDirectory);
|
|
|
|
}
|
2020-04-20 15:24:33 +02:00
|
|
|
|
2020-04-27 14:45:40 +02:00
|
|
|
log.info("transforming actions from '{}' to '{}'", sourcePath, targetDirectory);
|
|
|
|
|
|
|
|
sc.sequenceFile(sourcePath, Text.class, Text.class)
|
|
|
|
.map(a -> eu.dnetlib.actionmanager.actions.AtomicAction.fromJSON(a._2().toString()))
|
|
|
|
.map(TransformActions::doTransform)
|
|
|
|
.filter(Objects::nonNull)
|
|
|
|
.mapToPair(
|
|
|
|
a -> new Tuple2<>(a.getClazz().toString(), OBJECT_MAPPER.writeValueAsString(a)))
|
|
|
|
.mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2())))
|
|
|
|
.saveAsNewAPIHadoopFile(
|
|
|
|
targetDirectory.toString(),
|
|
|
|
Text.class,
|
|
|
|
Text.class,
|
|
|
|
SequenceFileOutputFormat.class,
|
|
|
|
sc.hadoopConfiguration());
|
2020-04-20 15:24:33 +02:00
|
|
|
}
|
2020-04-27 14:45:40 +02:00
|
|
|
}
|
2020-04-20 15:24:33 +02:00
|
|
|
|
2020-04-27 14:45:40 +02:00
|
|
|
private static AtomicAction doTransform(eu.dnetlib.actionmanager.actions.AtomicAction aa)
|
|
|
|
throws InvalidProtocolBufferException {
|
2020-04-20 16:45:21 +02:00
|
|
|
|
2020-04-27 14:45:40 +02:00
|
|
|
// dedup similarity relations had empty target value, don't migrate them
|
|
|
|
if (aa.getTargetValue().length == 0) {
|
|
|
|
return null;
|
2020-04-20 15:24:33 +02:00
|
|
|
}
|
2020-04-27 14:45:40 +02:00
|
|
|
final OafProtos.Oaf proto_oaf = OafProtos.Oaf.parseFrom(aa.getTargetValue());
|
|
|
|
final Oaf oaf = ProtoConverter.convert(proto_oaf);
|
|
|
|
switch (proto_oaf.getKind()) {
|
|
|
|
case entity:
|
|
|
|
switch (proto_oaf.getEntity().getType()) {
|
|
|
|
case datasource:
|
|
|
|
return new AtomicAction<>(Datasource.class, (Datasource) oaf);
|
|
|
|
case organization:
|
|
|
|
return new AtomicAction<>(Organization.class, (Organization) oaf);
|
|
|
|
case project:
|
|
|
|
return new AtomicAction<>(Project.class, (Project) oaf);
|
|
|
|
case result:
|
|
|
|
final String resulttypeid =
|
|
|
|
proto_oaf.getEntity().getResult().getMetadata().getResulttype().getClassid();
|
|
|
|
switch (resulttypeid) {
|
|
|
|
case "publication":
|
|
|
|
return new AtomicAction<>(Publication.class, (Publication) oaf);
|
|
|
|
case "software":
|
|
|
|
return new AtomicAction<>(Software.class, (Software) oaf);
|
|
|
|
case "other":
|
|
|
|
return new AtomicAction<>(OtherResearchProduct.class, (OtherResearchProduct) oaf);
|
|
|
|
case "dataset":
|
|
|
|
return new AtomicAction<>(Dataset.class, (Dataset) oaf);
|
|
|
|
default:
|
|
|
|
// can be an update, where the resulttype is not specified
|
|
|
|
return new AtomicAction<>(Result.class, (Result) oaf);
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
throw new IllegalArgumentException(
|
|
|
|
"invalid entity type: " + proto_oaf.getEntity().getType());
|
|
|
|
}
|
|
|
|
case relation:
|
|
|
|
return new AtomicAction<>(Relation.class, (Relation) oaf);
|
|
|
|
default:
|
|
|
|
throw new IllegalArgumentException("invalid kind: " + proto_oaf.getKind());
|
2020-04-20 15:24:33 +02:00
|
|
|
}
|
2020-04-27 14:45:40 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
private static String getTargetBaseDir(String isLookupUrl) throws ISLookUpException {
|
|
|
|
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
|
|
|
String XQUERY =
|
|
|
|
"collection('/db/DRIVER/ServiceResources/ActionManagerServiceResourceType')//SERVICE_PROPERTIES/PROPERTY[@key = 'basePath']/@value/string()";
|
|
|
|
return isLookUp.getResourceProfileByQuery(XQUERY);
|
|
|
|
}
|
2020-04-20 15:24:33 +02:00
|
|
|
}
|