diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java
index 2a31b264c4..a85afaf258 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/GroupEntitiesSparkJob.java
@@ -14,7 +14,7 @@ import java.util.stream.Collectors;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.ReduceFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -135,10 +135,10 @@ public class GroupEntitiesSparkJob {
 					.applyCoarVocabularies(entity, vocs),
 				OAFENTITY_KRYO_ENC)
 			.groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
-			.reduceGroups((ReduceFunction<OafEntity>) MergeUtils::checkedMerge)
+			.mapGroups((MapGroupsFunction<String, OafEntity, OafEntity>) MergeUtils::mergeGroup, OAFENTITY_KRYO_ENC)
 			.map(
-				(MapFunction<Tuple2<String, OafEntity>, Tuple2<String, OafEntity>>) t -> new Tuple2<>(
-					t._2().getClass().getName(), t._2()),
+				(MapFunction<OafEntity, Tuple2<String, OafEntity>>) t -> new Tuple2<>(
+					t.getClass().getName(), t),
 				Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC));
 
 		// pivot on "_1" (classname of the entity)
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java
index 316891faf5..c95c31c512 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/MergeUtils.java
@@ -5,7 +5,11 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.commons.lang3.ObjectUtils.firstNonNull;
 
 import java.text.ParseException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
 import java.util.*;
 import java.util.function.BinaryOperator;
 import java.util.function.Function;
@@ -19,6 +23,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import com.github.sisyphsu.dateparser.DateParserUtils;
 import com.google.common.base.Joiner;
 
+import eu.dnetlib.dhp.oa.merge.AuthorMerger;
 import eu.dnetlib.dhp.schema.common.AccessRightComparator;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
@@ -26,6 +31,37 @@ import eu.dnetlib.dhp.schema.oaf.*;
 
 public class MergeUtils {
 
+	public static <T extends Oaf> T mergeGroup(String s, Iterator<T> oafEntityIterator) {
+		TreeSet<T> sortedEntities = new TreeSet<>((o1, o2) -> {
+			int res = 0;
+
+			if (o1.getDataInfo() != null && o2.getDataInfo() != null) {
+				res = o1.getDataInfo().getTrust().compareTo(o2.getDataInfo().getTrust());
+			}
+
+			if (res == 0) {
+				if (o1 instanceof Result && o2 instanceof Result) {
+					return ResultTypeComparator.INSTANCE.compare((Result) o1, (Result) o2);
+				}
+			}
+
+			return res;
+		});
+
+		while (oafEntityIterator.hasNext()) {
+			sortedEntities.add(oafEntityIterator.next());
+		}
+
+		T merged = sortedEntities.descendingIterator().next();
+
+		Iterator<T> it = sortedEntities.descendingIterator();
+		while (it.hasNext()) {
+			merged = checkedMerge(merged, it.next());
+		}
+
+		return merged;
+	}
+
 	public static <T extends Oaf> T checkedMerge(final T left, final T right) {
 		return (T) merge(left, right, false);
 	}
@@ -34,7 +70,7 @@ public class MergeUtils {
 		return merge(left, right, false);
 	}
 
-	public static Oaf merge(final Oaf left, final Oaf right, boolean checkDelegatedAuthority) {
+	static Oaf merge(final Oaf left, final Oaf right, boolean checkDelegatedAuthority) {
 		if (sameClass(left, right, OafEntity.class)) {
 			return mergeEntities(left, right, checkDelegatedAuthority);
 		} else if (sameClass(left, right, Relation.class)) {
@@ -190,7 +226,7 @@ public class MergeUtils {
 			.concat(h.stream(), l.stream())
 			.filter(Objects::nonNull)
 			.distinct()
-			.collect(Collectors.toMap(keyExtractor, v -> v, merger))
+			.collect(Collectors.toMap(keyExtractor, v -> v, merger, LinkedHashMap::new))
 			.values());
 	}
 
@@ -226,7 +262,7 @@ public class MergeUtils {
 	}
 
 	// TODO review
-	private static List<KeyValue> mergeKeyValue(List<KeyValue> left, List<KeyValue> right, int trust) {
+	private static List<KeyValue> mergeByKey(List<KeyValue> left, List<KeyValue> right, int trust) {
 		if (trust < 0) {
 			List<KeyValue> s = left;
 			left = right;
@@ -268,8 +304,7 @@ public class MergeUtils {
 	 */
 	private static <T extends Oaf> T mergeOafFields(T merged, T enrich, int trust) {
 
-		// TODO: union of all values, but what does it mean with KeyValue pairs???
-		merged.setCollectedfrom(mergeKeyValue(merged.getCollectedfrom(), enrich.getCollectedfrom(), trust));
+		merged.setCollectedfrom(mergeByKey(merged.getCollectedfrom(), enrich.getCollectedfrom(), trust));
 		merged.setDataInfo(chooseDataInfo(merged.getDataInfo(), enrich.getDataInfo(), trust));
 		merged.setLastupdatetimestamp(max(merged.getLastupdatetimestamp(), enrich.getLastupdatetimestamp()));
 
@@ -289,16 +324,13 @@ public class MergeUtils {
 
 		merged.setOriginalId(unionDistinctListOfString(merged.getOriginalId(), enrich.getOriginalId()));
 		merged.setPid(unionDistinctLists(merged.getPid(), enrich.getPid(), trust));
-		// dateofcollection mettere today quando si fa merge
-		merged.setDateofcollection(chooseString(merged.getDateofcollection(), enrich.getDateofcollection(), trust));
-		// setDateoftransformation mettere vuota in dedup, nota per Claudio
+		merged.setDateofcollection(LocalDateTime.now().toString());
 		merged
 			.setDateoftransformation(
 				chooseString(merged.getDateoftransformation(), enrich.getDateoftransformation(), trust));
-		// TODO: was missing in OafEntity.merge
 		merged.setExtraInfo(unionDistinctLists(merged.getExtraInfo(), enrich.getExtraInfo(), trust));
-		// oaiprovenanze da mettere a null quando si genera merge
-		merged.setOaiprovenance(chooseReference(merged.getOaiprovenance(), enrich.getOaiprovenance(), trust));
+		// When merging records OAI provenance becomes null
+		merged.setOaiprovenance(null);
 		merged.setMeasures(unionDistinctLists(merged.getMeasures(), enrich.getMeasures(), trust));
 
 		return merged;
@@ -330,7 +362,7 @@ public class MergeUtils {
 		}
 
 		// TODO keyvalue merge
-		merge.setProperties(mergeKeyValue(merge.getProperties(), enrich.getProperties(), trust));
+		merge.setProperties(mergeByKey(merge.getProperties(), enrich.getProperties(), trust));
 
 		return merge;
 	}
@@ -345,73 +377,70 @@ public class MergeUtils {
 			merge.setProcessingchargecurrency(enrich.getProcessingchargecurrency());
 		}
 
-		// author = usare la stessa logica che in dedup
-		merge.setAuthor(chooseReference(merge.getAuthor(), enrich.getAuthor(), trust));
-		// il primo che mi arriva secondo l'ordinamento per priorita'
-		merge.setResulttype(chooseReference(merge.getResulttype(), enrich.getResulttype(), trust));
-		// gestito come il resulttype perche' e' un subtype
-		merge.setMetaResourceType(chooseReference(merge.getMetaResourceType(), enrich.getMetaResourceType(), trust));
-		// spostiamo nell'instance e qui prendo il primo che arriva
-		merge.setLanguage(chooseReference(merge.getLanguage(), enrich.getLanguage(), trust));
-		// country lasicamo,o cosi' -> parentesi sul datainfo
-		merge.setCountry(unionDistinctLists(merge.getCountry(), enrich.getCountry(), trust));
-		// ok
-		merge.setSubject(unionDistinctLists(merge.getSubject(), enrich.getSubject(), trust));
-		// union per priority quindi vanno in append
-		merge.setTitle(unionTitle(merge.getTitle(), enrich.getTitle(), trust));
-		// ok
-		merge.setRelevantdate(unionDistinctLists(merge.getRelevantdate(), enrich.getRelevantdate(), trust));
-		// prima trust e poi longest list
-		merge.setDescription(longestLists(merge.getDescription(), enrich.getDescription()));
-		// trust piu' alto e poi piu' vecchia
-		merge.setDateofacceptance(chooseReference(merge.getDateofacceptance(), enrich.getDateofacceptance(), trust));
-		// ok, ma publisher va messo ripetibile
-		merge.setPublisher(chooseReference(merge.getPublisher(), enrich.getPublisher(), trust));
-		// ok
-		merge.setEmbargoenddate(chooseReference(merge.getEmbargoenddate(), enrich.getEmbargoenddate(), trust));
-		// ok
+		merge.setAuthor(mergeAuthors(merge.getAuthor(), enrich.getAuthor(), trust));
+
+		// keep merge value if present
+		if (merge.getResulttype() == null) {
+			merge.setResulttype(enrich.getResulttype());
+			merge.setMetaResourceType(enrich.getMetaResourceType());
+		}
+
+		// should be an instance attribute, get the first non-null value
+		merge.setLanguage(coalesce(merge.getLanguage(), enrich.getLanguage()));
+
+		// distinct countries, do not manage datainfo
+		merge.setCountry(mergeQualifiers(merge.getCountry(), enrich.getCountry(), trust));
+
+		// distinct subjects
+		merge.setSubject(mergeStructuredProperties(merge.getSubject(), enrich.getSubject(), trust));
+
+		// distinct titles
+		merge.setTitle(mergeStructuredProperties(merge.getTitle(), enrich.getTitle(), trust));
+
+		merge.setRelevantdate(mergeStructuredProperties(merge.getRelevantdate(), enrich.getRelevantdate(), trust));
+
+		if (merge.getDescription() == null || merge.getDescription().isEmpty() || trust == 0) {
+			merge.setDescription(longestLists(merge.getDescription(), enrich.getDescription()));
+		}
+
+		merge
+			.setDateofacceptance(
+				mergeDateOfAcceptance(merge.getDateofacceptance(), enrich.getDateofacceptance(), trust));
+
+		merge.setPublisher(coalesce(merge.getPublisher(), enrich.getPublisher()));
+		merge.setEmbargoenddate(coalesce(merge.getEmbargoenddate(), enrich.getEmbargoenddate()));
 		merge.setSource(unionDistinctLists(merge.getSource(), enrich.getSource(), trust));
-		// ok
 		merge.setFulltext(unionDistinctLists(merge.getFulltext(), enrich.getFulltext(), trust));
-		// ok
 		merge.setFormat(unionDistinctLists(merge.getFormat(), enrich.getFormat(), trust));
-		// ok
 		merge.setContributor(unionDistinctLists(merge.getContributor(), enrich.getContributor(), trust));
 
-		// prima prendo l'higher trust, su questo prendo il valore migliore nelle istanze TODO
-		// trust maggiore ma a parita' di trust il piu' specifico (base del vocabolario)
-		// vedi note
-		// cannot use com.google.common.base.Objects.firstNonNull as it throws NPE when both terms are null
-		merge.setResourcetype(firstNonNull(merge.getResourcetype(), enrich.getResourcetype()));
+		// this field might contain the original type from the raw metadata, no strategy yet to merge it
+		merge.setResourcetype(coalesce(merge.getResourcetype(), enrich.getResourcetype()));
 
-		// ok
 		merge.setCoverage(unionDistinctLists(merge.getCoverage(), enrich.getCoverage(), trust));
 
-		// most open ok
 		if (enrich.getBestaccessright() != null
 			&& new AccessRightComparator<>()
 				.compare(enrich.getBestaccessright(), merge.getBestaccessright()) < 0) {
 			merge.setBestaccessright(enrich.getBestaccessright());
 		}
 
-		// TODO merge of datainfo given same id
-		merge.setContext(unionDistinctLists(merge.getContext(), enrich.getContext(), trust));
+		// merge datainfo for same context id
+		merge.setContext(mergeLists(merge.getContext(), enrich.getContext(), trust, Context::getId, (r, l) -> {
+			r.getDataInfo().addAll(l.getDataInfo());
+			return r;
+		}));
 
 		// ok
 		merge
 			.setExternalReference(
-				unionDistinctLists(merge.getExternalReference(), enrich.getExternalReference(), trust));
+				mergeExternalReference(merge.getExternalReference(), enrich.getExternalReference(), trust));
 
 		// instance enrichment or union
 		// review instance equals => add pid to comparision
-		if (!isAnEnrichment(merge) && !isAnEnrichment(enrich))
-			merge
-				.setInstance(
-					mergeLists(
-						merge.getInstance(), enrich.getInstance(), trust,
-						MergeUtils::instanceKeyExtractor,
-						MergeUtils::instanceMerger));
-		else {
+		if (!isAnEnrichment(merge) && !isAnEnrichment(enrich)) {
+			merge.setInstance(mergeInstances(merge.getInstance(), enrich.getInstance(), trust));
+		} else {
 			final List<Instance> enrichmentInstances = isAnEnrichment(merge) ? merge.getInstance()
 				: enrich.getInstance();
 			final List<Instance> enrichedInstances = isAnEnrichment(merge) ? enrich.getInstance()
@@ -421,16 +450,123 @@ public class MergeUtils {
 			merge.setInstance(enrichInstances(enrichedInstances, enrichmentInstances));
 		}
 
-		merge.setEoscifguidelines(unionDistinctLists(merge.getEoscifguidelines(), enrich.getEoscifguidelines(), trust));
+		merge
+			.setEoscifguidelines(
+				mergeEosciifguidelines(merge.getEoscifguidelines(), enrich.getEoscifguidelines(), trust));
 		merge.setIsGreen(booleanOR(merge.getIsGreen(), enrich.getIsGreen()));
 		// OK but should be list of values
-		merge.setOpenAccessColor(chooseReference(merge.getOpenAccessColor(), enrich.getOpenAccessColor(), trust));
+		merge.setOpenAccessColor(coalesce(merge.getOpenAccessColor(), enrich.getOpenAccessColor()));
 		merge.setIsInDiamondJournal(booleanOR(merge.getIsInDiamondJournal(), enrich.getIsInDiamondJournal()));
 		merge.setPubliclyFunded(booleanOR(merge.getPubliclyFunded(), enrich.getPubliclyFunded()));
 
 		return merge;
 	}
 
+	private static Field<String> mergeDateOfAcceptance(Field<String> merge, Field<String> enrich, int trust) {
+	// higher trust than oldest date
+		if ((merge == null || trust == 0) && enrich != null) {
+			if (merge == null) {
+				return enrich;
+			} else {
+				try {
+					LocalDate merge_date = LocalDate.parse(merge.getValue(), DateTimeFormatter.ISO_DATE);
+					try {
+						LocalDate enrich_date = LocalDate.parse(enrich.getValue(), DateTimeFormatter.ISO_DATE);
+
+						if (enrich_date.getYear() > 1300
+							&& (merge_date.getYear() < 1300 || merge_date.isAfter(enrich_date))) {
+							return enrich;
+						}
+					} catch (NullPointerException | DateTimeParseException e) {
+						return merge;
+					}
+				} catch (NullPointerException | DateTimeParseException e) {
+					return enrich;
+				}
+			}
+		}
+
+		// keep value
+		return merge;
+	}
+
+	private static List<Instance> mergeInstances(List<Instance> v1, List<Instance> v2, int trust) {
+		return mergeLists(
+			v1, v2, trust,
+			MergeUtils::instanceKeyExtractor,
+			MergeUtils::instanceMerger);
+	}
+
+	private static List<EoscIfGuidelines> mergeEosciifguidelines(List<EoscIfGuidelines> v1, List<EoscIfGuidelines> v2,
+		int trust) {
+		return mergeLists(
+			v1, v2, trust, er -> Joiner
+				.on("||")
+				.useForNull("")
+				.join(er.getCode(), er.getLabel(), er.getUrl(), er.getSemanticRelation()),
+			(r, l) -> r);
+
+	}
+
+	private static List<ExternalReference> mergeExternalReference(List<ExternalReference> v1,
+		List<ExternalReference> v2, int trust) {
+		return mergeLists(
+			v1, v2, trust, er -> Joiner
+				.on(',')
+				.useForNull("")
+				.join(
+					er.getSitename(), er.getLabel(),
+					er.getUrl(), toString(er.getQualifier()), er.getRefidentifier(),
+					er.getQuery(), toString(er.getDataInfo())),
+			(r, l) -> r);
+	}
+
+	private static String toString(DataInfo di) {
+		return Joiner
+			.on(',')
+			.useForNull("")
+			.join(
+				di.getInvisible(), di.getInferred(), di.getDeletedbyinference(), di.getTrust(),
+				di.getInferenceprovenance(), toString(di.getProvenanceaction()));
+	}
+
+	private static String toString(Qualifier q) {
+		return Joiner
+			.on(',')
+			.useForNull("")
+			.join(q.getClassid(), q.getClassname(), q.getSchemeid(), q.getSchemename());
+	}
+
+	private static String toString(StructuredProperty sp) {
+		return Joiner
+			.on(',')
+			.useForNull("")
+			.join(toString(sp.getQualifier()), sp.getValue());
+	}
+
+	private static <T extends StructuredProperty> List<T> mergeStructuredProperties(List<T> v1, List<T> v2, int trust) {
+		return mergeLists(v1, v2, trust, MergeUtils::toString, (r, l) -> r);
+	}
+
+	private static <T extends Qualifier> List<T> mergeQualifiers(List<T> v1, List<T> v2, int trust) {
+		return mergeLists(v1, v2, trust, MergeUtils::toString, (r, l) -> r);
+	}
+
+	private static <T> T coalesce(T m, T e) {
+		return m != null ? m : e;
+	}
+
+	private static List<Author> mergeAuthors(List<Author> author, List<Author> author1, int trust) {
+		List<List<Author>> authors = new ArrayList<>();
+		if (author != null) {
+			authors.add(author);
+		}
+		if (author1 != null) {
+			authors.add(author1);
+		}
+		return AuthorMerger.merge(authors);
+	}
+
 	private static String instanceKeyExtractor(Instance i) {
 		return String
 			.join(
@@ -472,9 +608,9 @@ public class MergeUtils {
 					MergeUtils::instanceTypeMappingKeyExtractor, (itm1, itm2) -> itm1));
 		i.setFulltext(selectFulltext(i1.getFulltext(), i2.getFulltext()));
 		i.setDateofacceptance(selectOldestDate(i1.getDateofacceptance(), i2.getDateofacceptance()));
-		i.setLicense(firstNonNull(i1.getLicense(), i2.getLicense()));
-		i.setProcessingchargeamount(firstNonNull(i1.getProcessingchargeamount(), i2.getProcessingchargeamount()));
-		i.setProcessingchargecurrency(firstNonNull(i1.getProcessingchargecurrency(), i2.getProcessingchargecurrency()));
+		i.setLicense(coalesce(i1.getLicense(), i2.getLicense()));
+		i.setProcessingchargeamount(coalesce(i1.getProcessingchargeamount(), i2.getProcessingchargeamount()));
+		i.setProcessingchargecurrency(coalesce(i1.getProcessingchargecurrency(), i2.getProcessingchargecurrency()));
 		i
 			.setMeasures(
 				mergeLists(i1.getMeasures(), i2.getMeasures(), 0, MergeUtils::measureKeyExtractor, (m1, m2) -> m1));
@@ -698,7 +834,7 @@ public class MergeUtils {
 	 * @param b the b
 	 * @return the list
 	 */
-	public static List<Field<String>> longestLists(List<Field<String>> a, List<Field<String>> b) {
+	private static List<Field<String>> longestLists(List<Field<String>> a, List<Field<String>> b) {
 		if (a == null || b == null)
 			return a == null ? b : a;
 
@@ -849,8 +985,8 @@ public class MergeUtils {
 	 * single attribute only if in the current instance is missing
 	 * The only repeatable field enriched is measures
 	 *
-	 * @param merge the current instance
-	 * @param enrichment      the enrichment instance
+	 * @param merge      the current instance
+	 * @param enrichment the enrichment instance
 	 */
 	private static void applyEnrichment(final Instance merge, final Instance enrichment) {
 		if (merge == null || enrichment == null)
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OrganizationPidComparator.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OrganizationPidComparator.java
index 3a6df2924c..7649c3fec1 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OrganizationPidComparator.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OrganizationPidComparator.java
@@ -9,10 +9,18 @@ public class OrganizationPidComparator implements Comparator<StructuredProperty>
 
 	@Override
 	public int compare(StructuredProperty left, StructuredProperty right) {
+		if (left == null) {
+			return right == null ? 0 : -1;
+		} else if (right == null) {
+			return 1;
+		}
 
 		PidType lClass = PidType.tryValueOf(left.getQualifier().getClassid());
 		PidType rClass = PidType.tryValueOf(right.getQualifier().getClassid());
 
+		if (lClass.equals(rClass))
+			return 0;
+
 		if (lClass.equals(PidType.openorgs))
 			return -1;
 		if (rClass.equals(PidType.openorgs))
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/RefereedComparator.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/RefereedComparator.java
index 91c7b68451..8669336c75 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/RefereedComparator.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/RefereedComparator.java
@@ -4,7 +4,6 @@ package eu.dnetlib.dhp.schema.oaf.utils;
 import java.util.Comparator;
 
 import eu.dnetlib.dhp.schema.oaf.Qualifier;
-import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
 
 /**
  * Comparator for sorting the values from the dnet:review_levels vocabulary, implements the following ordering
@@ -15,10 +14,18 @@ public class RefereedComparator implements Comparator<Qualifier> {
 
 	@Override
 	public int compare(Qualifier left, Qualifier right) {
+		if (left == null || left.getClassid() == null) {
+			return (right == null || right.getClassid() == null) ? 0 : -1;
+		} else if (right == null || right.getClassid() == null) {
+			return 1;
+		}
 
 		String lClass = left.getClassid();
 		String rClass = right.getClassid();
 
+		if (lClass.equals(rClass))
+			return 0;
+
 		if ("0001".equals(lClass))
 			return -1;
 		if ("0001".equals(rClass))
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ResultPidComparator.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ResultPidComparator.java
index e51c4801f5..00ad3bdc0c 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ResultPidComparator.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ResultPidComparator.java
@@ -13,6 +13,9 @@ public class ResultPidComparator implements Comparator<StructuredProperty> {
 		PidType lClass = PidType.tryValueOf(left.getQualifier().getClassid());
 		PidType rClass = PidType.tryValueOf(right.getQualifier().getClassid());
 
+		if (lClass.equals(rClass))
+			return 0;
+
 		if (lClass.equals(PidType.doi))
 			return -1;
 		if (rClass.equals(PidType.doi))
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ResultTypeComparator.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ResultTypeComparator.java
index 04d855d1c0..ba55621e55 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ResultTypeComparator.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ResultTypeComparator.java
@@ -14,6 +14,8 @@ import eu.dnetlib.dhp.schema.oaf.Result;
 
 public class ResultTypeComparator implements Comparator<Result> {
 
+	public static final ResultTypeComparator INSTANCE = new ResultTypeComparator();
+
 	@Override
 	public int compare(Result left, Result right) {
 
@@ -37,28 +39,27 @@ public class ResultTypeComparator implements Comparator<Result> {
 		String lClass = left.getResulttype().getClassid();
 		String rClass = right.getResulttype().getClassid();
 
-		if (lClass.equals(rClass))
-			return 0;
+		if (!lClass.equals(rClass)) {
+			if (lClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
+				return -1;
+			if (rClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
+				return 1;
 
-		if (lClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
-			return -1;
-		if (rClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
-			return 1;
+			if (lClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
+				return -1;
+			if (rClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
+				return 1;
 
-		if (lClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
-			return -1;
-		if (rClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
-			return 1;
+			if (lClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
+				return -1;
+			if (rClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
+				return 1;
 
-		if (lClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
-			return -1;
-		if (rClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
-			return 1;
-
-		if (lClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
-			return -1;
-		if (rClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
-			return 1;
+			if (lClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
+				return -1;
+			if (rClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
+				return 1;
+		}
 
 		// Else (but unlikely), lexicographical ordering will do.
 		return lClass.compareTo(rClass);
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsStatsSuffixPrefixChain.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsStatsSuffixPrefixChain.java
index 22351cf8ff..1e897182db 100644
--- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsStatsSuffixPrefixChain.java
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/clustering/WordsStatsSuffixPrefixChain.java
@@ -20,7 +20,7 @@ public class WordsStatsSuffixPrefixChain extends AbstractClusteringFunction {
 		return suffixPrefixChain(s, param("mod"));
 	}
 
-	private Collection<String> suffixPrefixChain(String s, int mod) {
+	static Collection<String> suffixPrefixChain(String s, int mod) {
 
 		// create the list of words from the string (remove short words)
 		List<String> wordsList = Arrays
@@ -38,7 +38,7 @@ public class WordsStatsSuffixPrefixChain extends AbstractClusteringFunction {
 
 	}
 
-	private Collection<String> doSuffixPrefixChain(List<String> wordsList, String prefix) {
+	static private Collection<String> doSuffixPrefixChain(List<String> wordsList, String prefix) {
 
 		Set<String> set = Sets.newLinkedHashSet();
 		switch (wordsList.size()) {
@@ -80,12 +80,16 @@ public class WordsStatsSuffixPrefixChain extends AbstractClusteringFunction {
 
 	}
 
-	private String suffix(String s, int len) {
+	private static String suffix(String s, int len) {
 		return s.substring(s.length() - len);
 	}
 
-	private String prefix(String s, int len) {
+	private static String prefix(String s, int len) {
 		return s.substring(0, len);
 	}
 
+	static public void main(String[] args) {
+		String title = "MY LIFE AS A BOSON: THE STORY OF \"THE HIGGS\"".toLowerCase();
+		System.out.println(suffixPrefixChain(title, 10));
+	}
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
index 574b13b263..cf8c9ac3bd 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
@@ -7,6 +7,7 @@ import java.util.stream.Stream;
 import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.FlatMapGroupsFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.ReduceFunction;
 import org.apache.spark.sql.*;
@@ -94,44 +95,59 @@ public class DedupRecordFactory {
 			.join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
 			.select("dedupId", "id", "kryoObject")
 			.as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder))
-			.map(
-				(MapFunction<Tuple3<String, String, OafEntity>, DedupRecordReduceState>) t -> new DedupRecordReduceState(
-					t._1(), t._2(), t._3()),
-				Encoders.kryo(DedupRecordReduceState.class))
-			.groupByKey(
-				(MapFunction<DedupRecordReduceState, String>) DedupRecordReduceState::getDedupId, Encoders.STRING())
-			.reduceGroups(
-				(ReduceFunction<DedupRecordReduceState>) (t1, t2) -> {
-					if (t1.entity == null) {
-						t2.aliases.addAll(t1.aliases);
-						return t2;
+			.groupByKey((MapFunction<Tuple3<String, String, OafEntity>, String>) Tuple3::_1, Encoders.STRING())
+			.flatMapGroups(
+				(FlatMapGroupsFunction<String, Tuple3<String, String, OafEntity>, OafEntity>) (dedupId, it) -> {
+					if (!it.hasNext())
+						return Collections.emptyIterator();
+
+					final ArrayList<OafEntity> cliques = new ArrayList<>();
+
+					final ArrayList<String> aliases = new ArrayList<>();
+
+					final HashSet<String> acceptanceDate = new HashSet<>();
+
+					while (it.hasNext()) {
+						Tuple3<String, String, OafEntity> t = it.next();
+						OafEntity entity = t._3();
+
+						if (entity == null) {
+							aliases.add(t._2());
+						} else {
+							cliques.add(entity);
+
+							if (acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
+								if (Result.class.isAssignableFrom(entity.getClass())) {
+									Result result = (Result) entity;
+									if (result.getDateofacceptance() != null
+										&& StringUtils.isNotBlank(result.getDateofacceptance().getValue())) {
+										acceptanceDate.add(result.getDateofacceptance().getValue());
+									}
+								}
+							}
+						}
+
 					}
-					if (t1.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
-						t1.acceptanceDate.addAll(t2.acceptanceDate);
+
+					if (acceptanceDate.size() >= MAX_ACCEPTANCE_DATE || cliques.isEmpty()) {
+						return Collections.emptyIterator();
 					}
-					t1.aliases.addAll(t2.aliases);
-					t1.entity = reduceEntity(t1.entity, t2.entity);
 
-					return t1;
-				})
-			.flatMap((FlatMapFunction<Tuple2<String, DedupRecordReduceState>, OafEntity>) t -> {
-				String dedupId = t._1();
-				DedupRecordReduceState agg = t._2();
+					OafEntity mergedEntity = MergeUtils.mergeGroup(dedupId, cliques.iterator());
+					// dedup records do not have date of transformation attribute
+					mergedEntity.setDateoftransformation(null);
 
-				if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) {
-					return Collections.emptyIterator();
-				}
+					return Stream
+						.concat(
+							Stream
+								.of(dedupId)
+								.map(id -> createDedupOafEntity(id, mergedEntity, dataInfo, ts)),
+							aliases
+								.stream()
+								.map(id -> createMergedDedupAliasOafEntity(id, mergedEntity, dataInfo, ts)))
+						.iterator();
 
-				return Stream
-					.concat(
-						Stream
-							.of(agg.getDedupId())
-							.map(id -> createDedupOafEntity(id, agg.entity, dataInfo, ts)),
-						agg.aliases
-							.stream()
-							.map(id -> createMergedDedupAliasOafEntity(id, agg.entity, dataInfo, ts)))
-					.iterator();
-			}, beanEncoder);
+				}, beanEncoder);
 	}
 
 	private static OafEntity createDedupOafEntity(String id, OafEntity base, DataInfo dataInfo, long ts) {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
index df87288511..21d06692fe 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java
@@ -130,10 +130,12 @@ public class GenerateEntitiesApplication extends AbstractMigrationApplication {
 		switch (mode) {
 			case claim:
 				save(
-					inputRdd
-						.mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
-						.reduceByKey(MergeUtils::merge)
-						.map(Tuple2::_2),
+					inputRdd.keyBy(oaf -> ModelSupport.idFn().apply(oaf))
+							.groupByKey()
+							.map(t -> MergeUtils.mergeGroup(t._1, t._2.iterator())),
+						//.mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
+						//.reduceByKey(MergeUtils::merge)
+						//.map(Tuple2::_2),
 					targetPath);
 				break;
 			case graph:
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala
index ecf612cc74..d94a23947a 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala
@@ -8,6 +8,8 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql._
 import org.slf4j.{Logger, LoggerFactory}
 
+import scala.collection.JavaConverters._
+
 object SparkCreateInputGraph {
 
   def main(args: Array[String]): Unit = {
@@ -131,8 +133,9 @@ object SparkCreateInputGraph {
     val ds: Dataset[T] = spark.read.load(sourcePath).as[T]
 
     ds.groupByKey(_.getId)
-      .reduceGroups { (x: T, y: T) => MergeUtils.merge(x, y).asInstanceOf[T] }
-      .map(_._2)
+      .mapGroups { (id, it) => MergeUtils.mergeGroup(id, it.asJava).asInstanceOf[T] }
+//      .reduceGroups { (x: T, y: T) => MergeUtils.merge(x, y).asInstanceOf[T] }
+//      .map(_._2)
       .write
       .mode(SaveMode.Overwrite)
       .save(targetPath)