+ * All the characters from the "Latin-1 Supplement" and "Latin Extended-A"
+ * Unicode blocks are mapped to the "Basic Latin" block. Characters from other
+ * alphabets are generally left intact, although the decomposable ones may be
+ * affected by the procedure.
+ *
+ * @author Lukasz Bolikowski (bolo@icm.edu.pl)
+ *
+ * @author Łukasz Dumiszewski /just copied from coansys-commons/
+ *
+ */
+public final class DiacriticsRemover {
+
+ private static final Character[] from = {
+ 'Æ', 'Ð', 'Ø', 'Þ', 'ß', 'æ', 'ð', 'ø', 'þ', 'Đ', 'đ', 'Ħ',
+ 'ħ', 'ı', 'ĸ', 'Ł', 'ł', 'Ŋ', 'ŋ', 'Œ', 'œ', 'Ŧ', 'ŧ'};
+ private static final String[] to = {
+ "AE", "D", "O", "Y", "ss", "ae", "d", "o", "y", "D", "d", "H",
+ "h", "i", "q", "L", "l", "N", "n", "OE", "oe", "T", "t"};
+
+ private static Map<Character, String> lookup = buildLookup();
+
+
+ //------------------------ CONSTRUCTORS -------------------
+
+
+ private DiacriticsRemover() {}
+
+
+ //------------------------ LOGIC --------------------------
+
+
+ /**
+ * Removes diacritics from a text.
+ *
+ * @param text Text to process.
+ * @return Text without diacritics.
+ */
+ public static String removeDiacritics(String text) {
+ if (text == null) {
+ return null;
+ }
+
+ String tmp = Normalizer.normalize(text, Normalizer.Form.NFKD);
+
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < tmp.length(); i++) {
+ Character ch = tmp.charAt(i);
+ if (Character.getType(ch) == Character.NON_SPACING_MARK) {
+ continue;
+ }
+
+ if (lookup.containsKey(ch)) {
+ builder.append(lookup.get(ch));
+ } else {
+ builder.append(ch);
+ }
+ }
+
+ return builder.toString();
+ }
+
+
+ //------------------------ PRIVATE --------------------------
+
+ private static Map<Character, String> buildLookup() {
+ if (from.length != to.length) {
+ throw new IllegalStateException("'from' and 'to' arrays must have equal lengths");
+ }
+
+ Map<Character, String> _lookup = new HashMap<Character, String>();
+ for (int i = 0; i < from.length; i++) {
+ _lookup.put(from[i], to[i]);
+ }
+
+ return _lookup;
+ }
+}
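
A minimal usage sketch of the class above (the sample input and expected outputs are illustrative, derived from the NFKD step and the lookup table, not taken from this changeset):

    String plain = DiacriticsRemover.removeDiacritics("Łukasz Dąbrowski");
    // 'ą' decomposes under NFKD and its combining mark is dropped, while the
    // non-decomposable 'Ł' is mapped via the lookup table: "Lukasz Dabrowski"
    String none = DiacriticsRemover.removeDiacritics(null); // returns null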
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/string/LenientComparisonStringNormalizer.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/string/LenientComparisonStringNormalizer.java
new file mode 100644
index 000000000..bae64ae38
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/string/LenientComparisonStringNormalizer.java
@@ -0,0 +1,130 @@
+/*
+ * This file is part of CoAnSys project.
+ * Copyright (c) 2012-2015 ICM-UW
+ *
+ * CoAnSys is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * CoAnSys is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with CoAnSys. If not, see <http://www.gnu.org/licenses/>.
+ */
+package eu.dnetlib.dhp.common.string;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * An implementation of {@link StringNormalizer} that normalizes strings for non-strict comparisons
+ * in which one does not care about characters other than letters and digits or about differently written diacritics.
+ *
+ * @author Łukasz Dumiszewski
+ *
+ */
+public final class LenientComparisonStringNormalizer implements StringNormalizer, Serializable {
+
+
+ private static final long serialVersionUID = 1L;
+
+
+ private List<Character> whitelistCharacters;
+
+
+ //------------------------ CONSTRUCTORS --------------------------
+
+ public LenientComparisonStringNormalizer() {
+ this(ImmutableList.of());
+ }
+
+ /**
+ * @param whitelistCharacters - non alphanumeric characters that will not be removed
+ * during normalization
+ */
+ public LenientComparisonStringNormalizer(List<Character> whitelistCharacters) {
+ this.whitelistCharacters = whitelistCharacters;
+ }
+
+
+ //------------------------ LOGIC --------------------------
+
+
+
+ /**
+ * Normalizes the given value.
+ * The normalized strings are better suited for non-strict comparisons, in which one does NOT care about characters that are
+ * neither letters nor digits; about accidental spaces or different diacritics etc.
+ * This method:
+ * <ul>
+ * <li>Replaces all characters that are not letters or digits with spaces (except those on the whitelist characters list)</li>
+ * <li>Replaces white spaces with spaces</li>
+ * <li>Trims</li>
+ * <li>Compacts multi-space gaps to one-space gaps</li>
+ * <li>Removes diacritics</li>
+ * <li>Changes characters to lower case</li>
+ * </ul>
+ * Returns "" if the passed value is null or blank.
+ *
+ * @param value the string to normalize
+ * @see DiacriticsRemover#removeDiacritics(String)
+ */
+ public String normalize(String value) {
+
+ if (StringUtils.isBlank(value)) {
+ return "";
+ }
+
+ String result = value;
+
+ result = DiacriticsRemover.removeDiacritics(result);
+
+ result = removeNonLetterDigitCharacters(result);
+
+ result = result.toLowerCase();
+
+ result = result.trim().replaceAll(" +", " ");
+
+ return result;
+ }
+
+
+
+
+ //------------------------ PRIVATE --------------------------
+
+
+ private String removeNonLetterDigitCharacters(final String value) {
+
+ StringBuilder sb = new StringBuilder();
+
+ for (int i = 0; i < value.length(); ++i) {
+
+ char c = value.charAt(i);
+
+ if (Character.isLetterOrDigit(c) || whitelistCharacters.contains(c)) {
+ sb.append(c);
+ } else {
+ sb.append(' ');
+ }
+ }
+
+ return sb.toString();
+ }
+
+
+
+}
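
A usage sketch for the normalizer above, with an assumed whitelist of '-' (the sample string and its expected result are illustrative):

    StringNormalizer normalizer = new LenientComparisonStringNormalizer(ImmutableList.of('-'));
    // diacritics removed, '!' replaced with a space, case lowered, gaps compacted:
    String normalized = normalizer.normalize(" Żółta  łódź -- podwodna! ");
    // expected: "zolta lodz -- podwodna"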
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/string/StringNormalizer.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/string/StringNormalizer.java
new file mode 100644
index 000000000..6e28422bc
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/string/StringNormalizer.java
@@ -0,0 +1,16 @@
+package eu.dnetlib.dhp.common.string;
+
+/**
+ * String normalizer.
+ *
+ * @author Łukasz Dumiszewski
+ *
+ */
+public interface StringNormalizer {
+
+ /**
+ * Normalizes the given string value.
+ */
+ String normalize(String value);
+
+}
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/utils/AvroGsonFactory.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/utils/AvroGsonFactory.java
new file mode 100644
index 000000000..7fcc0506a
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/utils/AvroGsonFactory.java
@@ -0,0 +1,45 @@
+package eu.dnetlib.dhp.common.utils;
+
+import java.lang.reflect.Type;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParseException;
+
+/**
+ * Factory for gson object that supports serializing avro generated classes
+ *
+ * @author madryk
+ *
+ */
+public final class AvroGsonFactory {
+
+ //------------------------ CONSTRUCTORS -------------------
+
+
+ private AvroGsonFactory() {}
+
+
+ //------------------------ LOGIC --------------------------
+
+ public static Gson create() {
+ GsonBuilder builder = new GsonBuilder();
+
+ builder.registerTypeAdapter(CharSequence.class, new CharSequenceDeserializer());
+
+ return builder.create();
+ }
+
+ public static class CharSequenceDeserializer implements JsonDeserializer<CharSequence> {
+
+ @Override
+ public CharSequence deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context)
+ throws JsonParseException {
+ return json.getAsString();
+ }
+
+ }
+}
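
A sketch of how the factory above might be used; Fault is the Avro-generated class added later in this changeset, and the faultJson variable is assumed:

    Gson gson = AvroGsonFactory.create();
    // Avro-generated classes declare string fields as CharSequence; the registered
    // type adapter materializes them as String instances during deserialization.
    Fault fault = gson.fromJson(faultJson, Fault.class);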
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/utils/AvroUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/utils/AvroUtils.java
new file mode 100644
index 000000000..44dd218b5
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/utils/AvroUtils.java
@@ -0,0 +1,77 @@
+package eu.dnetlib.dhp.common.utils;
+
+import java.lang.reflect.Field;
+
+import org.apache.avro.Schema;
+
+/**
+ *
+ * @author Mateusz Kobos
+ *
+ */
+public final class AvroUtils {
+
+ public final static String primitiveTypePrefix = "org.apache.avro.Schema.Type.";
+
+
+ //------------------------ CONSTRUCTORS -------------------
+
+
+ private AvroUtils() {}
+
+
+ //------------------------ LOGIC --------------------------
+
+
+ /**
+ * For a given name of a class generated from an Avro schema returns
+ * the corresponding {@link Schema}.
+ *
+ * Apart from a name of a class you can also give a name of one of the enums
+ * defined in {@link org.apache.avro.Schema.Type}; in such a case an
+ * appropriate primitive type schema will be returned.
+ *
+ * @param typeName fully qualified name of a class generated from an Avro schema,
+ * e.g. {@code eu.dnetlib.dhp.common.avro.Person},
+ * or a fully qualified name of an enum constant defined by
+ * {@link org.apache.avro.Schema.Type},
+ * e.g. {@code org.apache.avro.Schema.Type.STRING}.
+ * @return Avro schema
+ */
+ public static Schema toSchema(String typeName) {
+ Schema schema = null;
+ if (typeName.startsWith(primitiveTypePrefix)) {
+ String shortName = typeName.substring(primitiveTypePrefix.length());
+ schema = getPrimitiveTypeSchema(shortName);
+ } else {
+ schema = getAvroClassSchema(typeName);
+ }
+ return schema;
+ }
+
+ private static Schema getPrimitiveTypeSchema(String shortName){
+ Schema.Type type = Schema.Type.valueOf(shortName);
+ return Schema.create(type);
+ }
+
+ private static Schema getAvroClassSchema(String className){
+ try {
+ Class<?> avroClass = Class.forName(className);
+ Field f = avroClass.getDeclaredField("SCHEMA$");
+ return (Schema) f.get(null);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(
+ "Class \""+className+"\" does not exist", e);
+ } catch (SecurityException e) {
+ throw new RuntimeException(e);
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException(e);
+ } catch (IllegalArgumentException e) {
+ throw new RuntimeException(e);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
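
An illustrative invocation of AvroUtils.toSchema for both supported inputs (the Fault class name refers to the schema added later in this changeset):

    Schema stringSchema = AvroUtils.toSchema("org.apache.avro.Schema.Type.STRING");
    Schema faultSchema = AvroUtils.toSchema("eu.dnetlib.dhp.audit.schemas.Fault");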
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/utils/ByteArrayUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/utils/ByteArrayUtils.java
new file mode 100644
index 000000000..152271ab7
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/utils/ByteArrayUtils.java
@@ -0,0 +1,45 @@
+package eu.dnetlib.dhp.common.utils;
+
+/**
+ * Byte array utility class.
+ * @author mhorst
+ *
+ */
+public final class ByteArrayUtils {
+
+ //------------------------ CONSTRUCTORS -------------------
+
+ private ByteArrayUtils() {}
+
+ //------------------------ LOGIC --------------------------
+
+ /**
+ * Does this byte array begin with match array content?
+ * @param source Byte array to examine
+ * @param match Byte array to locate in source
+ * @return true If the starting bytes are equal
+ */
+ public static boolean startsWith(byte[] source, byte[] match) {
+ return startsWith(source, 0, match);
+ }
+
+ /**
+ * Does this byte array begin with match array content?
+ * @param source Byte array to examine
+ * @param offset An offset into the source array
+ * @param match Byte array to locate in source
+ * @return true If the starting bytes are equal
+ */
+ public static boolean startsWith(byte[] source, int offset, byte[] match) {
+ if (match.length > (source.length - offset)) {
+ return false;
+ }
+ for (int i = 0; i < match.length; i++) {
+ if (source[offset + i] != match[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+}
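
An illustrative check against the gzip magic number using the utility above:

    byte[] source = {0x1f, (byte) 0x8b, 0x08, 0x00};
    byte[] gzipMagic = {0x1f, (byte) 0x8b};
    boolean isGzip = ByteArrayUtils.startsWith(source, gzipMagic); // true
    boolean atOffset = ByteArrayUtils.startsWith(source, 1, gzipMagic); // false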
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/utils/EmptyDatastoreVerifierProcess.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/utils/EmptyDatastoreVerifierProcess.java
new file mode 100644
index 000000000..1e6e04149
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/utils/EmptyDatastoreVerifierProcess.java
@@ -0,0 +1,89 @@
+package eu.dnetlib.dhp.common.utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.security.InvalidParameterException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import eu.dnetlib.dhp.common.java.PortBindings;
+import eu.dnetlib.dhp.common.java.Ports;
+import eu.dnetlib.dhp.common.java.Process;
+import eu.dnetlib.dhp.common.java.io.CloseableIterator;
+import eu.dnetlib.dhp.common.java.io.DataStore;
+import eu.dnetlib.dhp.common.java.io.FileSystemPath;
+import eu.dnetlib.dhp.common.java.porttype.AnyPortType;
+import eu.dnetlib.dhp.common.java.porttype.PortType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static eu.dnetlib.dhp.common.WorkflowRuntimeParameters.OOZIE_ACTION_OUTPUT_FILENAME;
+
+/**
+ * Simple process verifying whether given datastore is empty.
+ * @author mhorst
+ *
+ */
+public class EmptyDatastoreVerifierProcess implements Process {
+
+ public static final String INPUT_PORT_NAME = "input";
+
+ public static final String DEFAULT_ENCODING = "UTF-8";
+
+ public static final String OUTPUT_PROPERTY_IS_EMPTY = "isEmpty";
+
+ /**
+ * Ports handled by this module.
+ */
+ private final Ports ports;
+
+
+ // ------------------------ CONSTRUCTORS --------------------------
+
+ public EmptyDatastoreVerifierProcess() {
+// preparing ports
+ Map<String, PortType> input = new HashMap<String, PortType>();
+ input.put(INPUT_PORT_NAME, new AnyPortType());
+ Map<String, PortType> output = Collections.emptyMap();
+ ports = new Ports(input, output);
+ }
+
+ @Override
+ public Map<String, PortType> getInputPorts() {
+ return ports.getInput();
+ }
+
+ @Override
+ public Map<String, PortType> getOutputPorts() {
+ return ports.getOutput();
+ }
+
+ @Override
+ public void run(PortBindings portBindings, Configuration conf, Map<String, String> parameters) throws Exception {
+ if (!portBindings.getInput().containsKey(INPUT_PORT_NAME)) {
+ throw new InvalidParameterException("missing input port!");
+ }
+
+ try (CloseableIterator<?> closeableIt = getIterator(conf, portBindings.getInput().get(INPUT_PORT_NAME))) {
+ File file = new File(System.getProperty(OOZIE_ACTION_OUTPUT_FILENAME));
+ Properties props = new Properties();
+ props.setProperty(OUTPUT_PROPERTY_IS_EMPTY, Boolean.toString(!closeableIt.hasNext()));
+ try (OutputStream os = new FileOutputStream(file)) {
+ props.store(os, "");
+ }
+ }
+ }
+
+ /**
+ * Returns iterator over datastore.
+ */
+ protected CloseableIterator<?> getIterator(Configuration conf, Path path) throws IOException {
+ return DataStore.getReader(new FileSystemPath(FileSystem.get(conf), path));
+ }
+
+}
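
The isEmpty property written to the Oozie action output file can then drive a workflow decision node, e.g. (action and node names assumed):

    <decision name="check-empty">
        <switch>
            <case to="skip">${wf:actionData('verify-datastore')['isEmpty'] eq 'true'}</case>
            <default to="process"/>
        </switch>
    </decision>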
diff --git a/dhp-schemas/README.md b/dhp-schemas/README.md
new file mode 100644
index 000000000..473ad4cf1
--- /dev/null
+++ b/dhp-schemas/README.md
@@ -0,0 +1,3 @@
+Description of the project
+--------------------------
+This project defines **serialization schemas** of Avro data store files that are used to pass data between workflow nodes in the system.
diff --git a/dhp-schemas/pom.xml b/dhp-schemas/pom.xml
new file mode 100644
index 000000000..2c6e18f27
--- /dev/null
+++ b/dhp-schemas/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>eu.dnetlib.dhp</groupId>
+    <artifactId>dhp</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>dhp-schemas</artifactId>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>schema</goal>
+              <goal>idl-protocol</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/dhp-schemas/src/main/avro/eu/dnetlib/dhp/audit/Fault.avdl b/dhp-schemas/src/main/avro/eu/dnetlib/dhp/audit/Fault.avdl
new file mode 100644
index 000000000..3bce821a4
--- /dev/null
+++ b/dhp-schemas/src/main/avro/eu/dnetlib/dhp/audit/Fault.avdl
@@ -0,0 +1,29 @@
+@namespace("eu.dnetlib.dhp.audit.schemas")
+protocol DHP {
+
+ record Cause {
+// generic cause code, root exception class name when derived from exception
+ string code;
+// cause message
+ union { null , string } message = null;
+ }
+
+ record Fault {
+// input object identifier
+ string inputObjectId;
+// fault creation timestamp
+ long timestamp;
+// generic fault code, root exception class name when derived from exception
+ string code;
+// fault message
+ union { null , string } message = null;
+// stack trace
+ union { null , string } stackTrace = null;
+// fault causes, array is indexed with cause depth
+ union { null , array<Cause> } causes = null;
+// Other supplementary data related to specific type of fault.
+// See parameters description in oozie workflow.xml documentation of modules
+// that use this structure for information what exactly can be stored as supplementary data.
+ union { null , map<string> } supplementaryData = null;
+ }
+}
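
Once Avro code generation runs, a Fault record can be assembled with the generated builder, e.g. (the identifier and the exception variable e are illustrative):

    Fault fault = Fault.newBuilder()
        .setInputObjectId("oai:example.org:1234")
        .setTimestamp(System.currentTimeMillis())
        .setCode(e.getClass().getName())
        .setMessage(e.getMessage())
        .build();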
diff --git a/dhp-schemas/src/main/avro/eu/dnetlib/dhp/common/ReportEntry.avdl b/dhp-schemas/src/main/avro/eu/dnetlib/dhp/common/ReportEntry.avdl
new file mode 100644
index 000000000..99406b4f0
--- /dev/null
+++ b/dhp-schemas/src/main/avro/eu/dnetlib/dhp/common/ReportEntry.avdl
@@ -0,0 +1,16 @@
+@namespace("eu.dnetlib.dhp.common.schemas")
+protocol DHP{
+
+ enum ReportEntryType {
+ COUNTER, DURATION
+ }
+
+
+ record ReportEntry {
+
+ string key;
+ ReportEntryType type;
+ string value;
+
+ }
+}
diff --git a/dhp-schemas/src/main/avro/eu/dnetlib/dhp/importer/NativeRecord.avdl b/dhp-schemas/src/main/avro/eu/dnetlib/dhp/importer/NativeRecord.avdl
new file mode 100644
index 000000000..9ad5435fa
--- /dev/null
+++ b/dhp-schemas/src/main/avro/eu/dnetlib/dhp/importer/NativeRecord.avdl
@@ -0,0 +1,21 @@
+@namespace("eu.dnetlib.dhp.importer.schemas")
+protocol DHP {
+
+ enum RecordFormat {
+ XML, JSON
+ }
+
+ record ImportedRecord {
+
+ // record identifier
+ string id;
+
+ RecordFormat format;
+
+ // format name (OAF, OAI_DC, Datacite, etc) for which there is a parser implementation
+ string formatName;
+
+ // record body
+ string body;
+ }
+}
diff --git a/dhp-wf/dhp-wf-import/pom.xml b/dhp-wf/dhp-wf-import/pom.xml
new file mode 100644
index 000000000..6bf4ba825
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <parent>
+    <groupId>eu.dnetlib.dhp</groupId>
+    <artifactId>dhp-wf</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>dhp-wf-import</artifactId>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>dhp-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>dhp-common</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>dhp-schemas</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.googlecode.json-simple</groupId>
+      <artifactId>json-simple</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>eu.dnetlib</groupId>
+      <artifactId>dnet-objectstore-rmi</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>eu.dnetlib</groupId>
+      <artifactId>cnr-rmi-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>eu.dnetlib</groupId>
+      <artifactId>cnr-resultset-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>eu.dnetlib</groupId>
+      <artifactId>dnet-openaireplus-mapping-utils</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-context</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.cxf</groupId>
+      <artifactId>cxf-rt-frontend-jaxws</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_2.10</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_2.10</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.databricks</groupId>
+      <artifactId>spark-avro_2.10</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mongodb.spark</groupId>
+      <artifactId>mongo-spark-connector_2.10</artifactId>
+    </dependency>
+
+  </dependencies>
+
+</project>
diff --git a/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/DataFileRecordReceiver.java b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/DataFileRecordReceiver.java
new file mode 100644
index 000000000..214d6691d
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/DataFileRecordReceiver.java
@@ -0,0 +1,29 @@
+package eu.dnetlib.dhp.wf.importer;
+
+import java.io.IOException;
+
+import org.apache.avro.file.DataFileWriter;
+
+/**
+ * {@link DataFileWriter} based record receiver.
+ * @author mhorst
+ *
+ */
+public class DataFileRecordReceiver<T> implements RecordReceiver<T> {
+
+ private final DataFileWriter<T> writer;
+
+ /**
+ * Default constructor.
+ * @param writer writer the received records are appended to
+ */
+ public DataFileRecordReceiver(DataFileWriter<T> writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public void receive(T object) throws IOException {
+ this.writer.append(object);
+ }
+
+}
diff --git a/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/DataFileRecordReceiverWithCounter.java b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/DataFileRecordReceiverWithCounter.java
new file mode 100644
index 000000000..955f18065
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/DataFileRecordReceiverWithCounter.java
@@ -0,0 +1,50 @@
+package eu.dnetlib.dhp.wf.importer;
+
+import java.io.IOException;
+
+import org.apache.avro.file.DataFileWriter;
+
+/**
+ * {@link DataFileWriter} based record receiver with counter of
+ * received records.
+ *
+ * @author madryk
+ */
+public class DataFileRecordReceiverWithCounter<T> extends DataFileRecordReceiver<T> {
+
+ private long receivedCount = 0L;
+
+
+ //------------------------ CONSTRUCTORS --------------------------
+
+ /**
+ * Default constructor
+ *
+ * @param writer - writer of the received records
+ */
+ public DataFileRecordReceiverWithCounter(DataFileWriter<T> writer) {
+ super(writer);
+ }
+
+
+ //------------------------ GETTERS --------------------------
+
+ /**
+ * Returns number of received records
+ */
+ public long getReceivedCount() {
+ return receivedCount;
+ }
+
+
+ //------------------------ LOGIC --------------------------
+
+ /**
+ * Receives passed record and increments the counter.
+ */
+ @Override
+ public void receive(T record) throws IOException {
+ super.receive(record);
+ ++receivedCount;
+ }
+}
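
A sketch wiring the counting receiver above to a plain Avro writer (the output file name and the record variable are illustrative; ImportedRecord comes from the schemas added in this changeset):

    DataFileWriter<ImportedRecord> writer = new DataFileWriter<ImportedRecord>(
        new SpecificDatumWriter<ImportedRecord>(ImportedRecord.class));
    writer.create(ImportedRecord.SCHEMA$, new File("mdrecords.avro"));
    DataFileRecordReceiverWithCounter<ImportedRecord> receiver =
        new DataFileRecordReceiverWithCounter<ImportedRecord>(writer);
    receiver.receive(record);
    long written = receiver.getReceivedCount();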
diff --git a/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/ImportWorkflowRuntimeParameters.java b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/ImportWorkflowRuntimeParameters.java
new file mode 100644
index 000000000..40f673ee0
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/ImportWorkflowRuntimeParameters.java
@@ -0,0 +1,52 @@
+package eu.dnetlib.dhp.wf.importer;
+
+/**
+ * Import related workflow parameters.
+ * @author mhorst
+ *
+ */
+public final class ImportWorkflowRuntimeParameters {
+
+ // parameter names
+
+ public static final String IMPORT_INFERENCE_PROVENANCE_BLACKLIST = "import.inference.provenance.blacklist";
+ public static final String IMPORT_SKIP_DELETED_BY_INFERENCE = "import.skip.deleted.by.inference";
+ public static final String IMPORT_TRUST_LEVEL_THRESHOLD = "import.trust.level.threshold";
+ public static final String IMPORT_APPROVED_DATASOURCES_CSV = "import.approved.datasources.csv";
+ public static final String IMPORT_APPROVED_COLUMNFAMILIES_CSV = "import.approved.columnfamilies.csv";
+ public static final String IMPORT_MERGE_BODY_WITH_UPDATES = "import.merge.body.with.updates";
+ public static final String IMPORT_CONTENT_APPROVED_OBJECSTORES_CSV = "import.content.approved.objectstores.csv";
+ public static final String IMPORT_CONTENT_BLACKLISTED_OBJECSTORES_CSV = "import.content.blacklisted.objectstores.csv";
+
+ public static final String IMPORT_CONTENT_OBJECT_STORE_LOC = "import.content.object.store.location";
+ public static final String IMPORT_CONTENT_OBJECT_STORE_IDS_CSV = "import.content.object.store.ids.csv";
+ public static final String IMPORT_CONTENT_MAX_FILE_SIZE_MB = "import.content.max.file.size.mb";
+ public static final String IMPORT_CONTENT_CONNECTION_TIMEOUT = "import.content.connection.timeout";
+ public static final String IMPORT_CONTENT_READ_TIMEOUT = "import.content.read.timeout";
+
+ public static final String IMPORT_MDSTORE_IDS_CSV = "import.mdstore.ids.csv";
+ public static final String IMPORT_MDSTORE_SERVICE_LOCATION = "import.mdstore.service.location";
+ public static final String IMPORT_MDSTORE_RECORD_MAXLENGTH = "import.mdstore.record.maxlength";
+
+ public static final String IMPORT_ISLOOKUP_SERVICE_LOCATION = "import.islookup.service.location";
+ public static final String IMPORT_VOCABULARY_CODE = "import.vocabulary.code";
+ public static final String IMPORT_VOCABULARY_OUTPUT_FILENAME = "import.vocabulary.output.filename";
+
+ public static final String IMPORT_RESULT_SET_CLIENT_READ_TIMEOUT = "import.resultset.client.read.timeout";
+ public static final String IMPORT_RESULT_SET_CLIENT_CONNECTION_TIMEOUT = "import.resultset.client.connection.timeout";
+ public static final String IMPORT_RESULT_SET_PAGESIZE = "import.resultset.pagesize";
+
+
+ public static final String HBASE_ENCODING = "hbase.table.encoding";
+
+ public static final String IMPORT_FACADE_FACTORY_CLASS = "import.facade.factory.classname";
+
+ // default values
+
+ public static final String RESULTSET_READ_TIMEOUT_DEFAULT_VALUE = "60000";
+ public static final String RESULTSET_CONNECTION_TIMEOUT_DEFAULT_VALUE = "60000";
+ public static final String RESULTSET_PAGESIZE_DEFAULT_VALUE = "100";
+
+ private ImportWorkflowRuntimeParameters() {}
+
+}
diff --git a/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/RecordReceiver.java b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/RecordReceiver.java
new file mode 100644
index 000000000..c0a5e8950
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/RecordReceiver.java
@@ -0,0 +1,14 @@
+package eu.dnetlib.dhp.wf.importer;
+
+import java.io.IOException;
+
+/**
+ * Record receiver interface.
+ * @author mhorst
+ *
+ * @param <T> type of the received records
+ */
+public interface RecordReceiver<T> {
+
+ void receive(T object) throws IOException;
+}
diff --git a/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/AbstractResultSetAwareWebServiceFacade.java b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/AbstractResultSetAwareWebServiceFacade.java
new file mode 100644
index 000000000..0a3cd6fb4
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/AbstractResultSetAwareWebServiceFacade.java
@@ -0,0 +1,104 @@
+package eu.dnetlib.dhp.wf.importer.facade;
+
+import java.util.Map;
+
+import javax.xml.ws.BindingProvider;
+import javax.xml.ws.wsaddressing.W3CEndpointReferenceBuilder;
+
+import org.apache.log4j.Logger;
+
+import eu.dnetlib.enabling.tools.JaxwsServiceResolverImpl;
+
+/**
+ * Abstract class utilized by all WebService facades.
+ * @author mhorst
+ *
+ */
+public abstract class AbstractResultSetAwareWebServiceFacade<T> {
+
+ private final Logger log = Logger.getLogger(this.getClass());
+
+ /**
+ * Web service.
+ */
+ private final T service;
+
+ /**
+ * ResultSet read timeout.
+ */
+ private final long resultSetReadTimeout;
+
+ /**
+ * ResultSet connection timeout.
+ */
+ private final long resultSetConnectionTimeout;
+
+ /**
+ * ResultSet page size.
+ */
+ private final int resultSetPageSize;
+
+
+ //------------------------ CONSTRUCTORS -------------------
+
+ /**
+ * Instantiates underlying service.
+ * @param clazz webservice class
+ * @param serviceLocation webservice location
+ * @param serviceReadTimeout service read timeout
+ * @param serviceConnectionTimeout service connection timeout
+ * @param resultSetReadTimeout resultset read timeout
+ * @param resultSetConnectionTimeout resultset connection timeout
+ * @param resultSetPageSize resultset page size
+ */
+ protected AbstractResultSetAwareWebServiceFacade(Class<T> clazz, String serviceLocation,
+ long serviceReadTimeout, long serviceConnectionTimeout,
+ long resultSetReadTimeout, long resultSetConnectionTimeout, int resultSetPageSize) {
+ W3CEndpointReferenceBuilder eprBuilder = new W3CEndpointReferenceBuilder();
+ eprBuilder.address(serviceLocation);
+ this.service = new JaxwsServiceResolverImpl().getService(clazz, eprBuilder.build());
+ if (this.service instanceof BindingProvider) {
+ log.info(String.format("setting timeouts for %s: read timeout (%s) and connect timeout (%s)",
+ BindingProvider.class, serviceReadTimeout, serviceConnectionTimeout));
+ final Map<String, Object> requestContext = ((BindingProvider) service).getRequestContext();
+
+ // can't be sure about which will be used. Set them all.
+ requestContext.put("com.sun.xml.internal.ws.request.timeout", serviceReadTimeout);
+ requestContext.put("com.sun.xml.internal.ws.connect.timeout", serviceConnectionTimeout);
+
+ requestContext.put("com.sun.xml.ws.request.timeout", serviceReadTimeout);
+ requestContext.put("com.sun.xml.ws.connect.timeout", serviceConnectionTimeout);
+
+ requestContext.put("javax.xml.ws.client.receiveTimeout", serviceReadTimeout);
+ requestContext.put("javax.xml.ws.client.connectionTimeout", serviceConnectionTimeout);
+ }
+
+ this.resultSetReadTimeout = resultSetReadTimeout;
+ this.resultSetConnectionTimeout = resultSetConnectionTimeout;
+ this.resultSetPageSize = resultSetPageSize;
+ }
+
+
+ //------------------------ GETTERS -------------------------
+
+ public T getService() {
+ return service;
+ }
+
+
+ public long getResultSetReadTimeout() {
+ return resultSetReadTimeout;
+ }
+
+
+ public long getResultSetConnectionTimeout() {
+ return resultSetConnectionTimeout;
+ }
+
+
+ public int getResultSetPageSize() {
+ return resultSetPageSize;
+ }
+
+}
diff --git a/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/ISLookupFacade.java b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/ISLookupFacade.java
new file mode 100644
index 000000000..c156ae1cc
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/ISLookupFacade.java
@@ -0,0 +1,17 @@
+package eu.dnetlib.dhp.wf.importer.facade;
+
+/**
+ * ISLookup service facade.
+ *
+ * @author mhorst
+ *
+ */
+public interface ISLookupFacade {
+
+ /**
+ * Provides all profiles matching given query
+ * @param xPathQuery XPath query
+ */
+ Iterable<String> searchProfile(String xPathQuery) throws ServiceFacadeException;
+
+}
diff --git a/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/MDStoreFacade.java b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/MDStoreFacade.java
new file mode 100644
index 000000000..f50b02b98
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/MDStoreFacade.java
@@ -0,0 +1,17 @@
+package eu.dnetlib.dhp.wf.importer.facade;
+
+/**
+ * MDStore service facade.
+ *
+ * @author mhorst
+ *
+ */
+public interface MDStoreFacade {
+
+ /**
+ * Delivers all records for given MDStore identifier
+ * @param mdStoreId MDStore identifier
+ */
+ Iterable<String> deliverMDRecords(String mdStoreId) throws ServiceFacadeException;
+
+}
diff --git a/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/ObjectStoreFacade.java b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/ObjectStoreFacade.java
new file mode 100644
index 000000000..0e1aa19ef
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/ObjectStoreFacade.java
@@ -0,0 +1,19 @@
+package eu.dnetlib.dhp.wf.importer.facade;
+
+/**
+ * ObjectStore service facade.
+ *
+ * @author mhorst
+ *
+ */
+public interface ObjectStoreFacade {
+
+ /**
+ * Returns metadata records from given objectstore created in specified time range.
+ * @param objectStoreId object store identifier
+ * @param from from time in millis
+ * @param until until time in millis
+ */
+ Iterable<String> deliverObjects(String objectStoreId, long from, long until) throws ServiceFacadeException;
+
+}
diff --git a/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/ServiceFacadeException.java b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/ServiceFacadeException.java
new file mode 100644
index 000000000..9776306fa
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/ServiceFacadeException.java
@@ -0,0 +1,27 @@
+package eu.dnetlib.dhp.wf.importer.facade;
+
+/**
+ * Service facade generic exception.
+ *
+ * @author mhorst
+ *
+ */
+public class ServiceFacadeException extends Exception {
+
+ private static final long serialVersionUID = 0L;
+
+ //------------------------ CONSTRUCTORS -------------------
+
+ public ServiceFacadeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ServiceFacadeException(String message) {
+ super(message);
+ }
+
+ public ServiceFacadeException(Throwable cause) {
+ super(cause);
+ }
+
+}
diff --git a/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/ServiceFacadeFactory.java b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/ServiceFacadeFactory.java
new file mode 100644
index 000000000..94b9307c4
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/ServiceFacadeFactory.java
@@ -0,0 +1,20 @@
+package eu.dnetlib.dhp.wf.importer.facade;
+
+import java.util.Map;
+
+/**
+ * Generic service facade factory. All implementations must be instantiable with a no-argument constructor.
+ *
+ * @author mhorst
+ *
+ */
+public interface ServiceFacadeFactory<T> {
+
+ /**
+ * Creates service of given type configured with parameters.
+ *
+ * @param parameters service configuration
+ *
+ */
+ T instantiate(Map<String, String> parameters);
+}
diff --git a/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/ServiceFacadeUtils.java b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/ServiceFacadeUtils.java
new file mode 100644
index 000000000..53a76d761
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/ServiceFacadeUtils.java
@@ -0,0 +1,80 @@
+package eu.dnetlib.dhp.wf.importer.facade;
+
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.IMPORT_FACADE_FACTORY_CLASS;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.ImmutableMap;
+
+import eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters;
+
+/**
+ * Service facade utility methods.
+ * @author mhorst
+ *
+ */
+public final class ServiceFacadeUtils {
+
+ //------------------------ CONSTRUCTORS -------------------
+
+ private ServiceFacadeUtils() {}
+
+ //------------------------ LOGIC --------------------------
+
+ /**
+ * Instantiates service based on provided parameters.
+ *
+ * Service factory class name is mandatory and has to be provided as {@value ImportWorkflowRuntimeParameters#IMPORT_FACADE_FACTORY_CLASS} parameter.
+ * Other parameters will be used by the factory itself. The factory must be instantiable with a no-argument constructor.
+ *
+ * @param parameters set of parameters required for service instantiation
+ *
+ */
+ public static <T> T instantiate(Map<String, String> parameters) throws ServiceFacadeException {
+ String serviceFactoryClassName = parameters.get(IMPORT_FACADE_FACTORY_CLASS);
+ if (StringUtils.isBlank(serviceFactoryClassName)) {
+ throw new ServiceFacadeException("unknown service facade factory, no " + IMPORT_FACADE_FACTORY_CLASS + " parameter provided!");
+ }
+ try {
+ Class<?> clazz = Class.forName(serviceFactoryClassName);
+ Constructor<?> constructor = clazz.getConstructor();
+ @SuppressWarnings("unchecked")
+ ServiceFacadeFactory<T> serviceFactory = (ServiceFacadeFactory<T>) constructor.newInstance();
+ return serviceFactory.instantiate(parameters);
+ } catch (Exception e) {
+ throw new ServiceFacadeException("exception occurred while instantiating service by facade factory: " + IMPORT_FACADE_FACTORY_CLASS, e);
+ }
+
+ }
+
+ /**
+ * Instantiates service based on provided configuration.
+ *
+ * Service factory class name is mandatory and has to be provided as {@value ImportWorkflowRuntimeParameters#IMPORT_FACADE_FACTORY_CLASS} configuration entry.
+ * Other parameters will be used by the factory itself. The factory must be instantiable with a no-argument constructor.
+ *
+ * @param config set of configuration entries required for service instantiation
+ */
+ public static <T> T instantiate(Configuration config) throws ServiceFacadeException {
+ return instantiate(buildParameters(config));
+ }
+
+
+ // ------------------------ PRIVATE --------------------------
+
+ /**
+ * Converts configuration entries into plain map.
+ */
+ private static Map<String, String> buildParameters(Configuration config) {
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ for (Map.Entry<String, String> entry : config) {
+ builder.put(entry);
+ }
+ return builder.build();
+ }
+
+}
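
An illustrative parameter map for the instantiate method above (the service location URL is a placeholder):

    Map<String, String> parameters = ImmutableMap.of(
        IMPORT_FACADE_FACTORY_CLASS, WebServiceMDStoreFacadeFactory.class.getName(),
        IMPORT_MDSTORE_SERVICE_LOCATION, "http://localhost:8280/services/mdStore");
    MDStoreFacade mdStore = ServiceFacadeUtils.instantiate(parameters);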
diff --git a/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/WebServiceISLookupFacade.java b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/WebServiceISLookupFacade.java
new file mode 100644
index 000000000..7c787f2f8
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/WebServiceISLookupFacade.java
@@ -0,0 +1,55 @@
+package eu.dnetlib.dhp.wf.importer.facade;
+
+import java.util.Collections;
+
+import org.apache.log4j.Logger;
+
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+
+/**
+ * WebService based ISLookup facade.
+ *
+ * @author mhorst
+ *
+ */
+public class WebServiceISLookupFacade extends AbstractResultSetAwareWebServiceFacade<ISLookUpService> implements ISLookupFacade {
+
+ private static final Logger log = Logger.getLogger(WebServiceISLookupFacade.class);
+
+
+ //------------------------ CONSTRUCTORS -------------------
+
+ /**
+ * @param serviceLocation database service location
+ * @param serviceReadTimeout service read timeout
+ * @param serviceConnectionTimeout service connection timeout
+ * @param resultSetReadTimeout result set providing database results read timeout
+ * @param resultSetConnectionTimeout result set connection timeout
+ * @param resultSetPageSize result set data chunk size
+ */
+ public WebServiceISLookupFacade(String serviceLocation,
+ long serviceReadTimeout, long serviceConnectionTimeout,
+ long resultSetReadTimeout, long resultSetConnectionTimeout, int resultSetPageSize) {
+ super(ISLookUpService.class, serviceLocation,
+ serviceReadTimeout, serviceConnectionTimeout,
+ resultSetReadTimeout, resultSetConnectionTimeout, resultSetPageSize);
+ }
+
+ //------------------------ LOGIC --------------------------
+
+ @Override
+ public Iterable<String> searchProfile(String xPathQuery) throws ServiceFacadeException {
+ try {
+ return getService().quickSearchProfile(xPathQuery);
+ } catch (ISLookUpDocumentNotFoundException e) {
+ log.error("unable to find profile for query: " + xPathQuery, e);
+ return Collections.emptyList();
+ } catch (ISLookUpException e) {
+ throw new ServiceFacadeException("searching profiles in ISLookup failed with query '" + xPathQuery + "'", e);
+ }
+
+ }
+
+}
diff --git a/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/WebServiceISLookupFacadeFactory.java b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/WebServiceISLookupFacadeFactory.java
new file mode 100644
index 000000000..6557ead94
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/WebServiceISLookupFacadeFactory.java
@@ -0,0 +1,45 @@
+package eu.dnetlib.dhp.wf.importer.facade;
+
+import static eu.dnetlib.dhp.common.WorkflowRuntimeParameters.DNET_SERVICE_CLIENT_CONNECTION_TIMEOUT;
+import static eu.dnetlib.dhp.common.WorkflowRuntimeParameters.DNET_SERVICE_CLIENT_READ_TIMEOUT;
+import static eu.dnetlib.dhp.common.WorkflowRuntimeParameters.DNET_SERVICE_CONNECTION_TIMEOUT_DEFAULT_VALUE;
+import static eu.dnetlib.dhp.common.WorkflowRuntimeParameters.DNET_SERVICE_READ_TIMEOUT_DEFAULT_VALUE;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.IMPORT_ISLOOKUP_SERVICE_LOCATION;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.IMPORT_RESULT_SET_CLIENT_CONNECTION_TIMEOUT;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.IMPORT_RESULT_SET_CLIENT_READ_TIMEOUT;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.IMPORT_RESULT_SET_PAGESIZE;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.RESULTSET_CONNECTION_TIMEOUT_DEFAULT_VALUE;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.RESULTSET_PAGESIZE_DEFAULT_VALUE;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.RESULTSET_READ_TIMEOUT_DEFAULT_VALUE;
+
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+import eu.dnetlib.dhp.common.WorkflowRuntimeParameters;
+
+/**
+ * WebService ISLookup service facade factory.
+ *
+ * @author mhorst
+ *
+ */
+public class WebServiceISLookupFacadeFactory implements ServiceFacadeFactory<ISLookupFacade> {
+
+
+ //------------------------ LOGIC --------------------------
+
+ @Override
+ public ISLookupFacade instantiate(Map<String, String> parameters) {
+ Preconditions.checkArgument(parameters.containsKey(IMPORT_ISLOOKUP_SERVICE_LOCATION),
+ "unknown ISLookup service location: no parameter provided: '%s'", IMPORT_ISLOOKUP_SERVICE_LOCATION);
+
+ return new WebServiceISLookupFacade(parameters.get(IMPORT_ISLOOKUP_SERVICE_LOCATION),
+ Long.parseLong(WorkflowRuntimeParameters.getParamValue(DNET_SERVICE_CLIENT_READ_TIMEOUT, DNET_SERVICE_READ_TIMEOUT_DEFAULT_VALUE, parameters)),
+ Long.parseLong(WorkflowRuntimeParameters.getParamValue(DNET_SERVICE_CLIENT_CONNECTION_TIMEOUT, DNET_SERVICE_CONNECTION_TIMEOUT_DEFAULT_VALUE, parameters)),
+ Long.parseLong(WorkflowRuntimeParameters.getParamValue(IMPORT_RESULT_SET_CLIENT_READ_TIMEOUT, RESULTSET_READ_TIMEOUT_DEFAULT_VALUE, parameters)),
+ Long.parseLong(WorkflowRuntimeParameters.getParamValue(IMPORT_RESULT_SET_CLIENT_CONNECTION_TIMEOUT, RESULTSET_CONNECTION_TIMEOUT_DEFAULT_VALUE, parameters)),
+ Integer.parseInt(WorkflowRuntimeParameters.getParamValue(IMPORT_RESULT_SET_PAGESIZE, RESULTSET_PAGESIZE_DEFAULT_VALUE, parameters)));
+ }
+
+}
diff --git a/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/WebServiceMDStoreFacade.java b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/WebServiceMDStoreFacade.java
new file mode 100644
index 000000000..d37d020ed
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/WebServiceMDStoreFacade.java
@@ -0,0 +1,52 @@
+package eu.dnetlib.dhp.wf.importer.facade;
+
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+import eu.dnetlib.data.mdstore.MDStoreService;
+import eu.dnetlib.data.mdstore.MDStoreServiceException;
+import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
+import eu.dnetlib.enabling.tools.JaxwsServiceResolverImpl;
+
+/**
+ * WebService based MDStore facade.
+ *
+ * @author mhorst
+ *
+ */
+public class WebServiceMDStoreFacade extends AbstractResultSetAwareWebServiceFacade<MDStoreService> implements MDStoreFacade {
+
+
+ //------------------------ CONSTRUCTORS -------------------
+
+ /**
+ * @param serviceLocation MDStore webservice location
+ * @param serviceReadTimeout service read timeout
+ * @param serviceConnectionTimeout service connection timeout
+ * @param resultSetReadTimeout resultset read timeout
+ * @param resultSetConnectionTimeout result set connection timeout
+ * @param resultSetPageSize resultset page size
+ */
+ public WebServiceMDStoreFacade(String serviceLocation,
+ long serviceReadTimeout, long serviceConnectionTimeout,
+ long resultSetReadTimeout, long resultSetConnectionTimeout, int resultSetPageSize) {
+ super(MDStoreService.class, serviceLocation,
+ serviceReadTimeout, serviceConnectionTimeout,
+ resultSetReadTimeout, resultSetConnectionTimeout, resultSetPageSize);
+ }
+
+ //------------------------ LOGIC --------------------------
+
+ @Override
+ public Iterable<String> deliverMDRecords(String mdStoreId) throws ServiceFacadeException {
+ try {
+ W3CEndpointReference eprResult = getService().deliverMDRecords(mdStoreId, null, null, null);
+ ResultSetClientFactory rsFactory = new ResultSetClientFactory(
+ getResultSetPageSize(), getResultSetReadTimeout(), getResultSetConnectionTimeout());
+ rsFactory.setServiceResolver(new JaxwsServiceResolverImpl());
+ return rsFactory.getClient(eprResult);
+ } catch (MDStoreServiceException e) {
+ throw new ServiceFacadeException("delivering records for md store " + mdStoreId + " failed!", e);
+ }
+ }
+
+}
diff --git a/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/WebServiceMDStoreFacadeFactory.java b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/WebServiceMDStoreFacadeFactory.java
new file mode 100644
index 000000000..00bb0c3f7
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/WebServiceMDStoreFacadeFactory.java
@@ -0,0 +1,45 @@
+package eu.dnetlib.dhp.wf.importer.facade;
+
+import static eu.dnetlib.dhp.common.WorkflowRuntimeParameters.DNET_SERVICE_CLIENT_CONNECTION_TIMEOUT;
+import static eu.dnetlib.dhp.common.WorkflowRuntimeParameters.DNET_SERVICE_CLIENT_READ_TIMEOUT;
+import static eu.dnetlib.dhp.common.WorkflowRuntimeParameters.DNET_SERVICE_CONNECTION_TIMEOUT_DEFAULT_VALUE;
+import static eu.dnetlib.dhp.common.WorkflowRuntimeParameters.DNET_SERVICE_READ_TIMEOUT_DEFAULT_VALUE;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.IMPORT_MDSTORE_SERVICE_LOCATION;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.IMPORT_RESULT_SET_CLIENT_CONNECTION_TIMEOUT;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.IMPORT_RESULT_SET_CLIENT_READ_TIMEOUT;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.IMPORT_RESULT_SET_PAGESIZE;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.RESULTSET_CONNECTION_TIMEOUT_DEFAULT_VALUE;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.RESULTSET_PAGESIZE_DEFAULT_VALUE;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.RESULTSET_READ_TIMEOUT_DEFAULT_VALUE;
+
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+import eu.dnetlib.dhp.common.WorkflowRuntimeParameters;
+
+/**
+ * WebService MDStore service facade factory.
+ *
+ * @author mhorst
+ *
+ */
+public class WebServiceMDStoreFacadeFactory implements ServiceFacadeFactory<WebServiceMDStoreFacade> {
+
+
+ //------------------------ LOGIC --------------------------
+
+ @Override
+ public WebServiceMDStoreFacade instantiate(Map<String, String> parameters) {
+ Preconditions.checkArgument(parameters.containsKey(IMPORT_MDSTORE_SERVICE_LOCATION),
+ "unknown MDStore service location: no parameter provided: '%s'", IMPORT_MDSTORE_SERVICE_LOCATION);
+
+ return new WebServiceMDStoreFacade(parameters.get(IMPORT_MDSTORE_SERVICE_LOCATION),
+ Long.parseLong(WorkflowRuntimeParameters.getParamValue(DNET_SERVICE_CLIENT_READ_TIMEOUT, DNET_SERVICE_READ_TIMEOUT_DEFAULT_VALUE, parameters)),
+ Long.parseLong(WorkflowRuntimeParameters.getParamValue(DNET_SERVICE_CLIENT_CONNECTION_TIMEOUT, DNET_SERVICE_CONNECTION_TIMEOUT_DEFAULT_VALUE, parameters)),
+ Long.parseLong(WorkflowRuntimeParameters.getParamValue(IMPORT_RESULT_SET_CLIENT_READ_TIMEOUT, RESULTSET_READ_TIMEOUT_DEFAULT_VALUE, parameters)),
+ Long.parseLong(WorkflowRuntimeParameters.getParamValue(IMPORT_RESULT_SET_CLIENT_CONNECTION_TIMEOUT, RESULTSET_CONNECTION_TIMEOUT_DEFAULT_VALUE, parameters)),
+ Integer.parseInt(WorkflowRuntimeParameters.getParamValue(IMPORT_RESULT_SET_PAGESIZE, RESULTSET_PAGESIZE_DEFAULT_VALUE, parameters)));
+ }
+
+}
diff --git a/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/WebServiceObjectStoreFacade.java b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/WebServiceObjectStoreFacade.java
new file mode 100644
index 000000000..6e1aee80b
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/WebServiceObjectStoreFacade.java
@@ -0,0 +1,52 @@
+package eu.dnetlib.dhp.wf.importer.facade;
+
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+import eu.dnetlib.data.objectstore.rmi.ObjectStoreService;
+import eu.dnetlib.data.objectstore.rmi.ObjectStoreServiceException;
+import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
+import eu.dnetlib.enabling.tools.JaxwsServiceResolverImpl;
+
+/**
+ * WebService based ObjectStore facade.
+ *
+ * @author mhorst
+ *
+ */
+public class WebServiceObjectStoreFacade extends AbstractResultSetAwareWebServiceFacade<ObjectStoreService> implements ObjectStoreFacade {
+
+
+ //------------------------ CONSTRUCTORS -------------------
+
+ /**
+ * @param serviceLocation ObjectStore webservice location
+ * @param serviceReadTimeout service read timeout
+ * @param serviceConnectionTimeout service connection timeout
+ * @param resultSetReadTimeout resultset read timeout
+ * @param resultSetConnectionTimeout result set connection timeout
+ * @param resultSetPageSize resultset page size
+ */
+ public WebServiceObjectStoreFacade(String serviceLocation,
+ long serviceReadTimeout, long serviceConnectionTimeout,
+ long resultSetReadTimeout, long resultSetConnectionTimeout, int resultSetPageSize) {
+ super(ObjectStoreService.class, serviceLocation,
+ serviceReadTimeout, serviceConnectionTimeout,
+ resultSetReadTimeout, resultSetConnectionTimeout, resultSetPageSize);
+ }
+
+ //------------------------ LOGIC --------------------------
+
+ @Override
+ public Iterable<String> deliverObjects(String objectStoreId, long from, long until) throws ServiceFacadeException {
+ try {
+ W3CEndpointReference eprResult = getService().deliverObjects(objectStoreId, from, until);
+ ResultSetClientFactory rsFactory = new ResultSetClientFactory(
+ getResultSetPageSize(), getResultSetReadTimeout(), getResultSetConnectionTimeout());
+ rsFactory.setServiceResolver(new JaxwsServiceResolverImpl());
+ return rsFactory.getClient(eprResult);
+ } catch (ObjectStoreServiceException e) {
+ throw new ServiceFacadeException("delivering records for object store " + objectStoreId + " failed!", e);
+ }
+ }
+
+}
diff --git a/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/WebServiceObjectStoreFacadeFactory.java b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/WebServiceObjectStoreFacadeFactory.java
new file mode 100644
index 000000000..9c77c4546
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/facade/WebServiceObjectStoreFacadeFactory.java
@@ -0,0 +1,44 @@
+package eu.dnetlib.dhp.wf.importer.facade;
+
+import static eu.dnetlib.dhp.common.WorkflowRuntimeParameters.DNET_SERVICE_CLIENT_CONNECTION_TIMEOUT;
+import static eu.dnetlib.dhp.common.WorkflowRuntimeParameters.DNET_SERVICE_CLIENT_READ_TIMEOUT;
+import static eu.dnetlib.dhp.common.WorkflowRuntimeParameters.DNET_SERVICE_CONNECTION_TIMEOUT_DEFAULT_VALUE;
+import static eu.dnetlib.dhp.common.WorkflowRuntimeParameters.DNET_SERVICE_READ_TIMEOUT_DEFAULT_VALUE;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.IMPORT_CONTENT_OBJECT_STORE_LOC;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.IMPORT_RESULT_SET_CLIENT_CONNECTION_TIMEOUT;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.IMPORT_RESULT_SET_CLIENT_READ_TIMEOUT;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.IMPORT_RESULT_SET_PAGESIZE;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.RESULTSET_CONNECTION_TIMEOUT_DEFAULT_VALUE;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.RESULTSET_PAGESIZE_DEFAULT_VALUE;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.RESULTSET_READ_TIMEOUT_DEFAULT_VALUE;
+
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+import eu.dnetlib.dhp.common.WorkflowRuntimeParameters;
+
+/**
+ * WebService ObjectStore facade factory.
+ *
+ * @author mhorst
+ *
+ */
+public class WebServiceObjectStoreFacadeFactory implements ServiceFacadeFactory<WebServiceObjectStoreFacade> {
+
+ //------------------------ LOGIC --------------------------
+
+ @Override
+ public WebServiceObjectStoreFacade instantiate(Map<String, String> parameters) {
+ Preconditions.checkArgument(parameters.containsKey(IMPORT_CONTENT_OBJECT_STORE_LOC),
+ "unknown object store service location: no parameter provided: '%s'", IMPORT_CONTENT_OBJECT_STORE_LOC);
+
+ return new WebServiceObjectStoreFacade(parameters.get(IMPORT_CONTENT_OBJECT_STORE_LOC),
+ Long.parseLong(WorkflowRuntimeParameters.getParamValue(DNET_SERVICE_CLIENT_READ_TIMEOUT, DNET_SERVICE_READ_TIMEOUT_DEFAULT_VALUE, parameters)),
+ Long.parseLong(WorkflowRuntimeParameters.getParamValue(DNET_SERVICE_CLIENT_CONNECTION_TIMEOUT, DNET_SERVICE_CONNECTION_TIMEOUT_DEFAULT_VALUE, parameters)),
+ Long.parseLong(WorkflowRuntimeParameters.getParamValue(IMPORT_RESULT_SET_CLIENT_READ_TIMEOUT, RESULTSET_READ_TIMEOUT_DEFAULT_VALUE, parameters)),
+ Long.parseLong(WorkflowRuntimeParameters.getParamValue(IMPORT_RESULT_SET_CLIENT_CONNECTION_TIMEOUT, RESULTSET_CONNECTION_TIMEOUT_DEFAULT_VALUE, parameters)),
+ Integer.parseInt(WorkflowRuntimeParameters.getParamValue(IMPORT_RESULT_SET_PAGESIZE, RESULTSET_PAGESIZE_DEFAULT_VALUE, parameters)));
+ }
+
+}
diff --git a/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/mdrecord/MDRecordHandler.java b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/mdrecord/MDRecordHandler.java
new file mode 100644
index 000000000..5d61f06a5
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/mdrecord/MDRecordHandler.java
@@ -0,0 +1,94 @@
+package eu.dnetlib.dhp.wf.importer.mdrecord;
+
+import java.util.Stack;
+
+import org.xml.sax.Attributes;
+import org.xml.sax.SAXException;
+import org.xml.sax.helpers.DefaultHandler;
+
+import eu.dnetlib.dhp.common.InfoSpaceConstants;
+
+/**
+ * MDRecord handler extracting record identifier.
+ *
+ * @author mhorst
+ *
+ */
+public class MDRecordHandler extends DefaultHandler {
+
+ public static final String ELEM_OBJ_IDENTIFIER = "objIdentifier";
+
+ private static final String ELEM_HEADER = "header";
+
+ private Stack<String> parents;
+
+ private StringBuilder currentValue = new StringBuilder();
+
+ private String recordId;
+
+
+ // ------------------------ LOGIC --------------------------
+
+ @Override
+ public void startDocument() throws SAXException {
+ parents = new Stack<String>();
+ recordId = null;
+ }
+
+ @Override
+ public void startElement(String uri, String localName, String qName,
+ Attributes attributes) throws SAXException {
+ if (this.recordId == null) {
+ if (isWithinElement(localName, ELEM_OBJ_IDENTIFIER, ELEM_HEADER)) {
+// identifierType attribute is mandatory
+ this.currentValue = new StringBuilder();
+ }
+ this.parents.push(localName);
+ }
+ }
+
+ @Override
+ public void endElement(String uri, String localName, String qName)
+ throws SAXException {
+ if (this.recordId == null) {
+ this.parents.pop();
+ if (isWithinElement(localName, ELEM_OBJ_IDENTIFIER, ELEM_HEADER)) {
+ this.recordId = InfoSpaceConstants.ROW_PREFIX_RESULT + this.currentValue.toString().trim();
+ }
+// resetting current value;
+ this.currentValue = null;
+ }
+ }
+
+ @Override
+ public void endDocument() throws SAXException {
+ parents.clear();
+ parents = null;
+ }
+
+ @Override
+ public void characters(char[] ch, int start, int length)
+ throws SAXException {
+ if (this.currentValue!=null) {
+ this.currentValue.append(ch, start, length);
+ }
+ }
+
+ /**
+ * @return record identifier
+ */
+ public String getRecordId() {
+ return recordId;
+ }
+
+ // ------------------------ PRIVATE --------------------------
+
+ private boolean isWithinElement(String localName, String expectedElement, String expectedParent) {
+ return localName.equals(expectedElement) && !this.parents.isEmpty() &&
+ expectedParent.equals(this.parents.peek());
+ }
+
+
+}
+
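
The handler above is exercised by the importer that follows; in isolation the parsing sequence looks like this (the xmlRecord variable is assumed):

    SAXParser saxParser = SAXParserFactory.newInstance().newSAXParser();
    MDRecordHandler mdRecordHandler = new MDRecordHandler();
    saxParser.parse(new InputSource(new StringReader(xmlRecord)), mdRecordHandler);
    String recordId = mdRecordHandler.getRecordId();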
diff --git a/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/mdrecord/MDStoreRecordsImporter.java b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/mdrecord/MDStoreRecordsImporter.java
new file mode 100644
index 000000000..461093851
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/mdrecord/MDStoreRecordsImporter.java
@@ -0,0 +1,157 @@
+package eu.dnetlib.dhp.wf.importer.mdrecord;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import javax.xml.parsers.SAXParser;
+import javax.xml.parsers.SAXParserFactory;
+
+import com.google.common.base.Preconditions;
+import eu.dnetlib.dhp.common.WorkflowRuntimeParameters;
+import eu.dnetlib.dhp.common.counter.NamedCounters;
+import eu.dnetlib.dhp.common.counter.NamedCountersFileWriter;
+import eu.dnetlib.dhp.common.java.PortBindings;
+import eu.dnetlib.dhp.common.java.Process;
+import eu.dnetlib.dhp.common.java.io.DataStore;
+import eu.dnetlib.dhp.common.java.io.FileSystemPath;
+import eu.dnetlib.dhp.common.java.porttype.AvroPortType;
+import eu.dnetlib.dhp.common.java.porttype.PortType;
+import eu.dnetlib.dhp.importer.schemas.ImportedRecord;
+import eu.dnetlib.dhp.importer.schemas.RecordFormat;
+import eu.dnetlib.dhp.wf.importer.facade.MDStoreFacade;
+import eu.dnetlib.dhp.wf.importer.facade.ServiceFacadeUtils;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.xml.sax.InputSource;
+
+import static eu.dnetlib.dhp.common.WorkflowRuntimeParameters.OOZIE_ACTION_OUTPUT_FILENAME;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.IMPORT_MDSTORE_IDS_CSV;
+import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.IMPORT_MDSTORE_RECORD_MAXLENGTH;
+
+/**
+ * {@link MDStoreFacade}-based metadata records importer.
+ * @author mhorst
+ *
+ */
+public class MDStoreRecordsImporter implements Process {
+
+ protected static final String COUNTER_NAME_TOTAL = "TOTAL";
+
+ protected static final String COUNTER_NAME_SIZE_EXCEEDED = "SIZE_EXCEEDED";
+
+ protected static final String PORT_OUT_MDRECORDS = "mdrecords";
+
+ private static final Logger log = Logger.getLogger(MDStoreRecordsImporter.class);
+
+ private static final int progressLogInterval = 100000;
+
+ private final NamedCountersFileWriter countersWriter = new NamedCountersFileWriter();
+
+ private final Map<String, PortType> outputPorts = new HashMap<>();
+
+
+ //------------------------ CONSTRUCTORS -------------------
+
+ public MDStoreRecordsImporter() {
+ outputPorts.put(PORT_OUT_MDRECORDS, new AvroPortType(ImportedRecord.SCHEMA$));
+ }
+
+ //------------------------ LOGIC --------------------------
+
+ @Override
+ public Map<String, PortType> getInputPorts() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public Map<String, PortType> getOutputPorts() {
+ return outputPorts;
+ }
+
+ @Override
+ public void run(PortBindings portBindings, Configuration conf,
+ Map<String, String> parameters) throws Exception {
+
+ Preconditions.checkArgument(parameters.containsKey(IMPORT_MDSTORE_IDS_CSV),
+ "unknown mdstore identifier, required parameter '%s' is missing!", IMPORT_MDSTORE_IDS_CSV);
+ String mdStoreIdsCSV = parameters.get(IMPORT_MDSTORE_IDS_CSV);
+ int recordMaxLength = parameters.containsKey(IMPORT_MDSTORE_RECORD_MAXLENGTH)
+ ? Integer.parseInt(parameters.get(IMPORT_MDSTORE_RECORD_MAXLENGTH)) : Integer.MAX_VALUE;
+
+ NamedCounters counters = new NamedCounters(new String[] { COUNTER_NAME_TOTAL, COUNTER_NAME_SIZE_EXCEEDED });
+
+ if (StringUtils.isNotBlank(mdStoreIdsCSV) && !WorkflowRuntimeParameters.UNDEFINED_NONEMPTY_VALUE.equals(mdStoreIdsCSV)) {
+
+ String[] mdStoreIds = StringUtils.split(mdStoreIdsCSV, WorkflowRuntimeParameters.DEFAULT_CSV_DELIMITER);
+
+ try (DataFileWriter<ImportedRecord> recordWriter = getWriter(FileSystem.get(conf), portBindings)) {
+
+ MDStoreFacade mdStoreFacade = ServiceFacadeUtils.instantiate(parameters);
+
+ SAXParserFactory parserFactory = SAXParserFactory.newInstance();
+ parserFactory.setNamespaceAware(true);
+ SAXParser saxParser = parserFactory.newSAXParser();
+ MDRecordHandler mdRecordHandler = new MDRecordHandler();
+
+ long startTime = System.currentTimeMillis();
+ int currentCount = 0;
+
+ for (String mdStoreId : mdStoreIds) {
+ for (String mdRecord : mdStoreFacade.deliverMDRecords(mdStoreId)) {
+ if (!StringUtils.isEmpty(mdRecord)) {
+ if (mdRecord.length() <= recordMaxLength) {
+ saxParser.parse(new InputSource(new StringReader(mdRecord)), mdRecordHandler);
+ String recordId = mdRecordHandler.getRecordId();
+ if (StringUtils.isNotBlank(recordId)) {
+ recordWriter.append(
+ ImportedRecord.newBuilder()
+ .setId(recordId)
+ .setBody(mdRecord)
+ .setFormat(RecordFormat.XML)
+ .build());
+ counters.increment(COUNTER_NAME_TOTAL);
+ } else {
+ log.error("skipping, unable to extract identifier from record: " + mdRecord);
+ }
+ } else {
+ counters.increment(COUNTER_NAME_SIZE_EXCEEDED);
+ log.error("mdstore record maximum length (" + recordMaxLength + "): was exceeded: "
+ + mdRecord.length() + ", record content:\n" + mdRecord);
+ }
+
+ } else {
+ log.error("got empty metadata record from mdstore: " + mdStoreId);
+ }
+ currentCount++;
+ if (currentCount % progressLogInterval == 0) {
+ log.info("current progress: " + currentCount + ", last package of " + progressLogInterval
+ + " processed in " + ((System.currentTimeMillis() - startTime) / 1000) + " secs");
+ startTime = System.currentTimeMillis();
+ }
+ }
+ }
+ log.info("total number of processed records: " + currentCount);
+ }
+ }
+
+ if (counters.currentValue(COUNTER_NAME_TOTAL) == 0) {
+ log.warn("parsed 0 metadata records from mdstores: " + mdStoreIdsCSV);
+ }
+ countersWriter.writeCounters(counters, System.getProperty(OOZIE_ACTION_OUTPUT_FILENAME));
+
+ }
+
+ /**
+ * Provides a {@link DataFileWriter} consuming {@link ImportedRecord} records.
+ */
+ protected DataFileWriter<ImportedRecord> getWriter(FileSystem fs, PortBindings portBindings) throws IOException {
+ return DataStore.create(
+ new FileSystemPath(fs, portBindings.getOutput().get(PORT_OUT_MDRECORDS)), ImportedRecord.SCHEMA$);
+ }
+
+}
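For context, a hedged sketch of the parameter map this importer consumes (not part of the diff; the values are illustrative, and the `import.facade.factory.classname` key mirrors the workflow definition further below):

```java
package eu.dnetlib.dhp.wf.importer.mdrecord;

import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.IMPORT_MDSTORE_IDS_CSV;
import static eu.dnetlib.dhp.wf.importer.ImportWorkflowRuntimeParameters.IMPORT_MDSTORE_RECORD_MAXLENGTH;

import java.util.HashMap;
import java.util.Map;

// Illustrative example class, not part of this change.
public class MDStoreImportParametersExample {

    public static Map<String, String> buildParameters() {
        Map<String, String> parameters = new HashMap<>();
        // comma separated mdstore identifiers to be delivered by the facade
        parameters.put(IMPORT_MDSTORE_IDS_CSV, "md-store-id-1,md-store-id-2");
        // records longer than this are counted as SIZE_EXCEEDED and skipped
        parameters.put(IMPORT_MDSTORE_RECORD_MAXLENGTH, "500000");
        // facade factory consumed by ServiceFacadeUtils.instantiate(parameters)
        parameters.put("import.facade.factory.classname",
                "eu.dnetlib.dhp.wf.importer.facade.WebServiceMDStoreFacadeFactory");
        return parameters;
    }
}
```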
diff --git a/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/mdrecord/MongoRecordImporter.java b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/mdrecord/MongoRecordImporter.java
new file mode 100644
index 000000000..4776af22b
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/src/main/java/eu/dnetlib/dhp/wf/importer/mdrecord/MongoRecordImporter.java
@@ -0,0 +1,48 @@
+package eu.dnetlib.dhp.wf.importer.mdrecord;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import eu.dnetlib.dhp.common.java.PortBindings;
+import eu.dnetlib.dhp.common.java.Process;
+import eu.dnetlib.dhp.common.java.porttype.PortType;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Placeholder metadata records importer; the MongoDB-based import logic is not implemented yet.
+ */
+public class MongoRecordImporter implements Process {
+
+ private final Map<String, PortType> outputPorts = new HashMap<>();
+
+ @Override
+ public Map<String, PortType> getInputPorts() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public Map<String, PortType> getOutputPorts() {
+ return outputPorts;
+ }
+
+ @Override
+ public void run(final PortBindings portBindings, final Configuration conf, final Map<String, String> parameters) throws Exception {
+
+ /*
+ SparkSession spark = SparkSession.builder()
+ .master("local")
+ .appName("MongoSparkConnectorIntro")
+ .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
+ .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
+ .getOrCreate();
+
+ // Create a JavaSparkContext using the SparkSession's SparkContext object
+ JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
+
+ // More application logic would go here...
+
+ jsc.close();
+ */
+
+ }
+
+
+}
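For context, one possible completion of the commented-out fragment above (not part of the diff; it assumes the mongo-spark connector dependency is available, and the URI, app name, and example class name are illustrative):

```java
package eu.dnetlib.dhp.wf.importer.mdrecord;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;

// Illustrative example class, not part of this change.
public class MongoSparkReadExample {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("MongoRecordImporter")
                .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
                .getOrCreate();

        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        // loads the collection configured via spark.mongodb.input.uri
        JavaMongoRDD<Document> records = MongoSpark.load(jsc);
        System.out.println("record count: " + records.count());

        jsc.close();
    }
}
```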
diff --git a/dhp-wf/dhp-wf-import/src/main/resources/eu/dnetlib/dhp/wf/importer/mdrecord/oozie_app/workflow.xml b/dhp-wf/dhp-wf-import/src/main/resources/eu/dnetlib/dhp/wf/importer/mdrecord/oozie_app/workflow.xml
new file mode 100644
index 000000000..dee61af91
--- /dev/null
+++ b/dhp-wf/dhp-wf-import/src/main/resources/eu/dnetlib/dhp/wf/importer/mdrecord/oozie_app/workflow.xml
@@ -0,0 +1,124 @@
+<?xml version="1.0"?>
+<workflow-app xmlns="uri:oozie:workflow:0.4" name="import_mdrecord">
+
+    <parameters>
+        <property>
+            <name>mdstore_facade_factory_classname</name>
+            <value>eu.dnetlib.dhp.wf.importer.facade.WebServiceMDStoreFacadeFactory</value>
+            <description>ServiceFacadeFactory implementation class name producing eu.dnetlib.dhp.wf.importer.facade.WebServiceMDStoreFacade</description>
+        </property>
+        <property>
+            <name>mdstore_service_location</name>
+            <value>$UNDEFINED$</value>
+            <description>mdstore service location</description>
+        </property>
+        <property>
+            <name>mdstore_ids_csv</name>
+            <value>$UNDEFINED$</value>
+            <description>comma separated mdstore identifiers</description>
+        </property>
+        <property>
+            <name>mdstore_record_maxlength</name>
+            <value>500000</value>
+            <description>maximum allowed length of mdstore record</description>
+        </property>
+        <property>
+            <name>output</name>
+            <description>ImportedRecord avro datastore output holding mdrecords</description>
+        </property>
+        <property>
+            <name>output_report_root_path</name>
+            <description>base directory for storing reports</description>
+        </property>
+        <property>
+            <name>output_report_relative_path</name>
+            <value>import_mdrecord</value>
+            <description>directory for storing report (relative to output_report_root_path)</description>
+        </property>
+        <property>
+            <name>dnet_service_client_read_timeout</name>
+            <value>60000</value>
+            <description>DNet service client reading timeout (expressed in milliseconds)</description>
+        </property>
+        <property>
+            <name>dnet_service_client_connection_timeout</name>
+            <value>60000</value>
+            <description>DNet service client connection timeout (expressed in milliseconds)</description>
+        </property>
+        <property>
+            <name>resultset_client_read_timeout</name>
+            <value>60000</value>
+            <description>result set client reading timeout (expressed in milliseconds)</description>
+        </property>
+        <property>
+            <name>resultset_client_connection_timeout</name>
+            <value>60000</value>
+            <description>result set client connection timeout (expressed in milliseconds)</description>
+        </property>
+        <property>
+            <name>report_properties_prefix</name>
+            <value>import.mdrecord</value>
+            <description>report entry related to total number of imported records</description>
+        </property>
+    </parameters>
+
+    <global>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>mapreduce.job.queuename</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.queue.name</name>
+                <value>${oozieLauncherQueueName}</value>
+            </property>
+        </configuration>
+    </global>
+
+    <start to="mdrecord-importer"/>
+
+    <action name="mdrecord-importer">
+        <java>
+            <main-class>eu.dnetlib.dhp.common.java.ProcessWrapper</main-class>
+            <arg>eu.dnetlib.dhp.wf.importer.mdrecord.MDStoreRecordsImporter</arg>
+            <arg>-Pimport.mdstore.service.location=${mdstore_service_location}</arg>
+            <arg>-Pimport.mdstore.ids.csv=${mdstore_ids_csv}</arg>
+            <arg>-Pimport.mdstore.record.maxlength=${mdstore_record_maxlength}</arg>
+            <arg>-Pimport.resultset.client.read.timeout=${resultset_client_read_timeout}</arg>
+            <arg>-Pimport.resultset.client.connection.timeout=${resultset_client_connection_timeout}</arg>
+            <arg>-Pdnet.service.client.read.timeout=${dnet_service_client_read_timeout}</arg>
+            <arg>-Pdnet.service.client.connection.timeout=${dnet_service_client_connection_timeout}</arg>
+            <arg>-Pimport.facade.factory.classname=${mdstore_facade_factory_classname}</arg>
+            <arg>-Omdrecords=${output}</arg>
+            <capture-output/>
+        </java>
+        <ok to="report"/>
+        <error to="fail"/>
+    </action>
+
+    <action name="report">
+        <java>
+            <main-class>eu.dnetlib.dhp.common.java.ProcessWrapper</main-class>
+            <arg>eu.dnetlib.dhp.common.report.ReportGenerator</arg>
+            <arg>-Preport.${report_properties_prefix}.total=${wf:actionData('mdrecord-importer')['TOTAL']}</arg>
+            <arg>-Preport.${report_properties_prefix}.invalid.sizeExceeded=${wf:actionData('mdrecord-importer')['SIZE_EXCEEDED']}</arg>
+            <arg>-Oreport=${output_report_root_path}/${output_report_relative_path}</arg>
+        </java>
+        <ok to="end"/>
+        <error to="fail"/>
+    </action>
+
+    <kill name="fail">
+        <message>Unfortunately, the process failed -- error message:
+            [${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <end name="end"/>
+</workflow-app>
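For context, a hypothetical job.properties for submitting this workflow with the standard `oozie job -config job.properties -run` invocation (not part of the diff; all values are illustrative, and the property names mirror the parameters above):

```properties
# Hypothetical job.properties sketch; all values are illustrative.
nameNode=hdfs://namenode:8020
jobTracker=resourcemanager:8032
queueName=default
oozieLauncherQueueName=default
mdstore_service_location=http://services.example.org/mdstore
mdstore_ids_csv=md-store-id-1,md-store-id-2
output=/tmp/import/mdrecords
output_report_root_path=/tmp/import/reports
oozie.wf.application.path=${nameNode}/path/to/oozie_app
oozie.use.system.libpath=true
```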
[The remainder of the diff adds compiled build artifacts as GIT binary patches under dhp-wf/dhp-wf-import/target/classes/eu/dnetlib/dhp/wf/importer/: DataFileRecordReceiver.class, ImportWorkflowRuntimeParameters.class, facade/AbstractResultSetAwareWebServiceFacade.class, facade/ISLookupFacade.class, facade/MDStoreFacade.class, facade/ObjectStoreFacade.class, and facade/ServiceFacadeFactory.class. The base85-encoded binary payloads are not human-readable and the final patch is truncated in the source.]