Skip to content

Commit 2e8cd9f

Browse files
authored
Fix bugs from #541 (#544)
1 parent 89fc454 commit 2e8cd9f

File tree

9 files changed

+124
-33
lines changed

9 files changed

+124
-33
lines changed

eventuals/closure.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,9 @@ struct _Closure final {
8989
using ErrorsFrom = typename E_::template ErrorsFrom<Arg, Errors>;
9090

9191
template <typename Downstream>
92-
static constexpr bool CanCompose = true;
92+
static constexpr bool CanCompose = E_::template CanCompose<Downstream>;
9393

94-
using Expects = StreamOrValue;
94+
using Expects = typename E_::Expects;
9595

9696
template <typename Arg, typename K>
9797
auto k(K k) && {

eventuals/generator.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,11 @@ struct _Generator final {
313313
template <typename Arg, typename Errors>
314314
using ErrorsFrom = tuple_types_union_t<Errors, Errors_>;
315315

316+
template <typename Downstream>
317+
static constexpr bool CanCompose = Downstream::ExpectsStream;
318+
319+
using Expects = SingleValue;
320+
316321
template <typename T>
317322
using From = std::enable_if_t<
318323
IsUndefined<From_>::value,
@@ -472,11 +477,6 @@ struct _Generator final {
472477
std::move(dispatch_));
473478
}
474479

475-
template <typename Downstream>
476-
static constexpr bool CanCompose = true;
477-
478-
using Expects = SingleValue;
479-
480480
std::conditional_t<
481481
std::disjunction_v<IsUndefined<From_>, IsUndefined<To_>>,
482482
Undefined,

eventuals/lock.h

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,11 @@ struct _Acquire final {
477477
template <typename Arg, typename Errors>
478478
using ErrorsFrom = Errors;
479479

480+
// TODO(benh): nothing can actually compose with anything else
481+
// because it depends one what gets composed with this. Alas, we
482+
// need to change the way 'Reschedule()' works here to take the
483+
// eventual that it wants to run so that we can properly check
484+
// that things compose correctly.
480485
template <typename Downstream>
481486
static constexpr bool CanCompose = true;
482487

@@ -559,6 +564,11 @@ struct _Release final {
559564
template <typename Arg, typename Errors>
560565
using ErrorsFrom = Errors;
561566

567+
// TODO(benh): nothing can actually compose with anything else
568+
// because it depends one what gets composed with this. Alas, we
569+
// need to change the way 'Reschedule()' works here to take the
570+
// eventual that it wants to run so that we can properly check
571+
// that things compose correctly.
562572
template <typename Downstream>
563573
static constexpr bool CanCompose = true;
564574

@@ -792,16 +802,21 @@ struct _Wait final {
792802
template <typename Arg, typename Errors>
793803
using ErrorsFrom = Errors;
794804

795-
template <typename Arg, typename K>
796-
auto k(K k) && {
797-
return Continuation<K, F_, Arg>(std::move(k), lock_, std::move(f_));
798-
}
799-
805+
// TODO(benh): nothing can actually compose with anything else
806+
// because it depends one what gets composed with this. Alas, we
807+
// need to change the way 'Reschedule()' works here to take the
808+
// eventual that it wants to run so that we can properly check
809+
// that things compose correctly.
800810
template <typename Downstream>
801811
static constexpr bool CanCompose = true;
802812

803813
using Expects = StreamOrValue;
804814

815+
template <typename Arg, typename K>
816+
auto k(K k) && {
817+
return Continuation<K, F_, Arg>(std::move(k), lock_, std::move(f_));
818+
}
819+
805820
Lock* lock_;
806821
F_ f_;
807822
};
@@ -828,15 +843,45 @@ template <typename F>
828843

829844
////////////////////////////////////////////////////////////////////////
830845

846+
struct _Synchronized final {
847+
template <typename E_>
848+
struct Composable final {
849+
// NOTE: declaring these here to use them to make type aliases.
850+
Lock* lock_;
851+
E_ e_;
852+
853+
using Composed_ =
854+
decltype(Acquire(lock_) >> std::move(e_) >> Release(lock_));
855+
856+
template <typename Arg>
857+
using ValueFrom = typename Composed_::template ValueFrom<Arg>;
858+
859+
template <typename Arg, typename Errors>
860+
using ErrorsFrom = typename Composed_::template ErrorsFrom<Arg, Errors>;
861+
862+
template <typename Downstream>
863+
static constexpr bool CanCompose = E_::template CanCompose<Downstream>;
864+
865+
using Expects = typename E_::Expects;
866+
867+
template <typename Arg, typename K>
868+
auto k(K k) && {
869+
return Build<Arg>(
870+
Acquire(lock_) >> std::move(e_) >> Release(lock_),
871+
std::move(k));
872+
}
873+
};
874+
};
875+
876+
////////////////////////////////////////////////////////////////////////
877+
831878
class Synchronizable {
832879
public:
833880
virtual ~Synchronizable() = default;
834881

835882
template <typename E>
836883
[[nodiscard]] auto Synchronized(E e) {
837-
return Acquire(&lock_)
838-
>> std::move(e)
839-
>> Release(&lock_);
884+
return _Synchronized::Composable<E>{&lock_, std::move(e)};
840885
}
841886

842887
template <typename F>

eventuals/scheduler.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,11 @@ struct _Reschedule final {
316316
template <typename Arg, typename Errors>
317317
using ErrorsFrom = Errors;
318318

319+
// TODO(benh): nothing can actually compose with anything else
320+
// because it depends one what gets composed with this. Alas, we
321+
// need to change the way 'Reschedule()' works here to take the
322+
// eventual that it wants to run so that we can properly check
323+
// that things compose correctly.
319324
template <typename Downstream>
320325
static constexpr bool CanCompose = true;
321326

eventuals/static-thread-pool.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,7 @@ struct _StaticThreadPoolSchedule final {
535535
}
536536

537537
template <typename Downstream>
538-
static constexpr bool CanCompose = true;
538+
static constexpr bool CanCompose = E_::template CanCompose<Downstream>;
539539

540540
using Expects = StreamOrValue;
541541

eventuals/task.h

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,11 @@ struct _TaskFromToWith final {
318318
template <typename Arg, typename Errors>
319319
using ErrorsFrom = tuple_types_union_t<Errors, Errors_>;
320320

321+
template <typename Downstream>
322+
static constexpr bool CanCompose = Downstream::ExpectsValue;
323+
324+
using Expects = SingleValue;
325+
321326
Composable(MonostateIfVoidOrReferenceWrapperOr<To_> value)
322327
: value_or_dispatch_(std::move(value)) {}
323328

@@ -449,11 +454,6 @@ struct _TaskFromToWith final {
449454
std::move(value_or_dispatch_.value()));
450455
}
451456

452-
template <typename Downstream>
453-
static constexpr bool CanCompose = Downstream::ExpectsValue;
454-
455-
using Expects = SingleValue;
456-
457457
// See comment in `Continuation` for explanation of `dispatch_` member.
458458
// Using 'std::optional' because of implicitly deleted 'std::variant'
459459
// constructor.
@@ -486,6 +486,11 @@ class _Task final {
486486
template <typename Arg, typename Errors>
487487
using ErrorsFrom = tuple_types_union_t<Errors, Errors_>;
488488

489+
template <typename Downstream>
490+
static constexpr bool CanCompose = Downstream::ExpectsValue;
491+
492+
using Expects = SingleValue;
493+
489494
template <typename T>
490495
using From = std::enable_if_t<
491496
IsUndefined<From_>::value,
@@ -526,11 +531,6 @@ class _Task final {
526531
CHECK(!that.k_.has_value()) << "moving after starting";
527532
}
528533

529-
template <typename Downstream>
530-
static constexpr bool CanCompose = Downstream::ExpectsValue;
531-
532-
using Expects = SingleValue;
533-
534534
_Task(_Task&& that) noexcept
535535
: e_(std::move(that.e_)) {
536536
CHECK(!that.k_.has_value()) << "moving after starting";

eventuals/transformer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ struct _Transformer final {
243243
template <typename Downstream>
244244
static constexpr bool CanCompose = Downstream::ExpectsStream;
245245

246-
using Expects = StreamOrValue;
246+
using Expects = StreamOfValues;
247247

248248
template <typename T>
249249
using From = std::enable_if_t<

eventuals/type-check.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ struct _TypeCheck {
1919
template <typename Arg, typename Errors>
2020
using ErrorsFrom = typename E_::template ErrorsFrom<Arg, Errors>;
2121

22+
template <typename Downstream>
23+
static constexpr bool CanCompose = E_::template CanCompose<Downstream>;
24+
25+
using Expects = typename E_::Expects;
26+
2227
template <typename Arg, typename K>
2328
auto k(K k) && {
2429
static_assert(
@@ -28,11 +33,6 @@ struct _TypeCheck {
2833
return std::move(e_).template k<Arg>(std::move(k));
2934
}
3035

31-
template <typename Downstream>
32-
static constexpr bool CanCompose = true;
33-
34-
using Expects = StreamOrValue;
35-
3636
E_ e_;
3737
};
3838

test/compose.cc

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "eventuals/raise.h"
2525
#include "eventuals/range.h"
2626
#include "eventuals/reduce.h"
27+
#include "eventuals/static-thread-pool.h"
2728
#include "eventuals/stream.h"
2829
#include "eventuals/take.h"
2930
#include "eventuals/until.h"
@@ -271,7 +272,10 @@ TEST(CanCompose, Generator) {
271272
CanCompose<decltype(gen()), decltype(collect)>);
272273

273274
static_assert(
274-
CanCompose<decltype(gen()), decltype(then)>);
275+
!CanCompose<decltype(gen()), decltype(then)>);
276+
277+
static_assert(
278+
CanCompose<decltype(collect), decltype(then)>);
275279
}
276280

277281
TEST(CanCompose, Head) {
@@ -398,6 +402,43 @@ TEST(CanCompose, Take) {
398402
!CanCompose<decltype(take2), decltype(then)>);
399403
}
400404

405+
TEST(CanCompose, Schedule) {
406+
class Actor : public StaticThreadPool::Schedulable {
407+
public:
408+
Actor()
409+
: StaticThreadPool::Schedulable(Pinned::Any()) {}
410+
411+
auto Function() {
412+
return Repeat()
413+
>> Schedule(Map([]() {}));
414+
}
415+
};
416+
417+
Actor actor;
418+
419+
auto then = Then([]() {});
420+
421+
static_assert(
422+
!CanCompose<decltype(actor.Function()), decltype(then)>);
423+
}
424+
425+
TEST(CanCompose, Synchronized) {
426+
class Object : public Synchronizable {
427+
public:
428+
auto Function() {
429+
return Repeat()
430+
>> Synchronized(Map([]() {}));
431+
}
432+
};
433+
434+
Object object;
435+
436+
auto then = Then([]() {});
437+
438+
static_assert(
439+
!CanCompose<decltype(object.Function()), decltype(then)>);
440+
}
441+
401442
////////////////////////////////////////////////////////////////////////
402443

403444
} // namespace

0 commit comments

Comments
 (0)