diff --git a/parsl/config.py b/parsl/config.py
index c3725eccf8..5979cddf29 100644
--- a/parsl/config.py
+++ b/parsl/config.py
@@ -5,6 +5,7 @@
 from typing_extensions import Literal
 
 from parsl.dataflow.dependency_resolvers import DependencyResolver
+from parsl.dataflow.retries import RetryBehaviour
 from parsl.dataflow.taskrecord import TaskRecord
 from parsl.errors import ConfigurationError
 from parsl.executors.base import ParslExecutor
@@ -110,7 +111,7 @@ def __init__(self,
                  garbage_collect: bool = True,
                  internal_tasks_max_threads: int = 10,
                  retries: int = 0,
-                 retry_handler: Optional[Callable[[Exception, TaskRecord], float]] = None,
+                 retry_handler: Optional[Callable[[Exception, TaskRecord], Union[float, RetryBehaviour]]] = None,
                  run_dir: str = 'runinfo',
                  std_autopath: Optional[Callable] = None,
                  strategy: Optional[str] = 'simple',
diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py
index a62a2261d0..b849c6e54e 100644
--- a/parsl/dataflow/dflow.py
+++ b/parsl/dataflow/dflow.py
@@ -33,6 +33,7 @@
 from parsl.dataflow.errors import BadCheckpoint, DependencyError, JoinError
 from parsl.dataflow.futures import AppFuture
 from parsl.dataflow.memoization import Memoizer
+from parsl.dataflow.retries import CompleteWithAlternateValue
 from parsl.dataflow.rundirs import make_rundir
 from parsl.dataflow.states import FINAL_FAILURE_STATES, FINAL_STATES, States
 from parsl.dataflow.taskrecord import TaskRecord
@@ -340,8 +341,11 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
         if not future.done():
             raise InternalConsistencyError("done callback called, despite future not reporting itself as done")
 
+        returned_result = False
+
         try:
             res = self._unwrap_remote_exception_wrapper(future)
+            returned_result = True
 
         except Exception as e:
             logger.info(f"Task {task_id} try {task_record['try_id']} failed with exception of type {type(e).__name__}")
@@ -351,7 +355,7 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
             task_record['fail_count'] += 1
             if self._config.retry_handler:
                 try:
-                    cost = self._config.retry_handler(e, task_record)
+                    retry_behaviour = self._config.retry_handler(e, task_record)
                 except Exception as retry_handler_exception:
                     logger.exception("retry_handler raised an exception - will not retry")
 
@@ -363,7 +367,17 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
                     # rather than the execution level exception
                     e = retry_handler_exception
                 else:
-                    task_record['fail_cost'] += cost
+                    if isinstance(retry_behaviour, float) or isinstance(retry_behaviour, int):
+                        task_record['fail_cost'] += retry_behaviour
+                    elif isinstance(retry_behaviour, CompleteWithAlternateValue):
+                        # retry_behaviour.value contains the value we will complete with, instead
+                        # of trying to either retry or fail.
+                        res = retry_behaviour.value
+                        returned_result = True
+                    else:
+                        # TODO: this error doesn't go anywhere... just makes a hang...
+                        # should, like the block above, turn it into a final exception.
+                        raise InternalConsistencyError(f"Unrecognised retry behaviour {retry_behaviour}")
             else:
                 task_record['fail_cost'] += 1
 
@@ -399,7 +413,9 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
                 with task_record['app_fu']._update_lock:
                     task_record['app_fu'].set_exception(e)
 
-        else:
+        # we might do the result path even if we did exception handling above...
+        # because the exception handling might have decided we should have a result
+        if returned_result:
             if task_record['from_memo']:
                 self._complete_task(task_record, States.memo_done, res)
                 self._send_task_log_info(task_record)
diff --git a/parsl/dataflow/retries.py b/parsl/dataflow/retries.py
new file mode 100644
index 0000000000..e6eab4f67b
--- /dev/null
+++ b/parsl/dataflow/retries.py
@@ -0,0 +1,11 @@
+from abc import ABCMeta
+from dataclasses import dataclass
+
+
+class RetryBehaviour(metaclass=ABCMeta):
+    pass
+
+
+@dataclass
+class CompleteWithAlternateValue(RetryBehaviour):
+    value: object
diff --git a/parsl/tests/test_error_handling/test_retry_handler.py b/parsl/tests/test_error_handling/test_retry_handler.py
index 788c6063e5..88521506f0 100644
--- a/parsl/tests/test_error_handling/test_retry_handler.py
+++ b/parsl/tests/test_error_handling/test_retry_handler.py
@@ -4,6 +4,7 @@
 
 import parsl
 from parsl import bash_app
+from parsl.dataflow.retries import CompleteWithAlternateValue
 from parsl.tests.configs.local_threads import fresh_config
 
 
diff --git a/parsl/tests/test_error_handling/test_retry_handler_result.py b/parsl/tests/test_error_handling/test_retry_handler_result.py
new file mode 100644
index 0000000000..132f438a3c
--- /dev/null
+++ b/parsl/tests/test_error_handling/test_retry_handler_result.py
@@ -0,0 +1,57 @@
+import os
+
+import pytest
+
+import parsl
+from parsl import python_app
+from parsl.dataflow.retries import CompleteWithAlternateValue
+from parsl.tests.configs.local_threads import fresh_config
+
+
+class SpecialException(Exception):
+    pass
+
+
+def error_to_result_handler(exc, task_record):
+    """Given a particular exception, turn it into a specific result"""
+    if isinstance(exc, SpecialException):
+        # substitute the exception with an alternate success
+        return CompleteWithAlternateValue(8)
+    else:
+        return 1  # regular retry cost
+
+
+def local_config():
+    c = fresh_config()
+    c.retries = 2
+    c.retry_handler = error_to_result_handler
+    return c
+
+
+@python_app
+def returns_val():
+    return 7
+
+
+@python_app
+def raises_runtime():
+    raise RuntimeError("from raises_runtime")
+
+
+@python_app
+def raises_special():
+    raise SpecialException(Exception)
+
+
+@pytest.mark.local
+def test_retry():
+
+    # two pre-reqs to validate that results and normal exceptions are handled
+    # correctly with the test retry handler
+    assert returns_val().result() == 7
+    with pytest.raises(RuntimeError):
+        raises_runtime().result()
+
+    # the actual test: check that a special exception has been replaced by a
+    # a real value
+    assert raises_special().result() == 8