Sample concurrency solutions #181

Open · wants to merge 4 commits into main
24 changes: 24 additions & 0 deletions projects/concurrency/atomics/main.go
@@ -0,0 +1,24 @@
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

var x atomic.Int32

func increment(wg *sync.WaitGroup) {
	x.Add(1)
	wg.Done()
}

func main() {
	var w sync.WaitGroup
	for i := 0; i < 1000; i++ {
		w.Add(1)
		go increment(&w)
	}
	w.Wait()
	fmt.Println("final value of x", x.Load())
}
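For comparison (not part of this diff), a minimal sketch of the same counter protected by a sync.Mutex instead of sync/atomic; it prints the same final value, just with coarser locking:

package main

import (
	"fmt"
	"sync"
)

var (
	mu sync.Mutex
	x  int32 // plain int32, guarded by mu instead of being an atomic.Int32
)

func increment(wg *sync.WaitGroup) {
	defer wg.Done()
	mu.Lock()
	x++
	mu.Unlock()
}

func main() {
	var w sync.WaitGroup
	for i := 0; i < 1000; i++ {
		w.Add(1)
		go increment(&w)
	}
	w.Wait()
	fmt.Println("final value of x", x)
}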
93 changes: 93 additions & 0 deletions projects/concurrency/lru_cache_coarse_grained_generations/cache.go
@@ -0,0 +1,93 @@
package cache

import (
	"sync"
	"sync/atomic"
	"time"
)

// Cache implements a roughly-LRU cache. It attempts to keep to a maximum of targetSize, but may contain more entries at points in time.
// When under size pressure, it garbage collects entries which haven't been read or written, with no strict eviction ordering guarantees.
type Cache[K comparable, V any] struct {
	targetSize uint64

	mu sync.RWMutex
	// Every time we Get/Put a value, we store the generation in which it was last accessed.
	// We have a garbage collection goroutine which will delete entries that haven't been recently accessed, if the cache is full.
	currentGeneration atomic.Uint64
	values            map[K]*valueAndGeneration[V]
}

// NewCache constructs a new Cache which aims to contain up to targetSize elements (but may occasionally exceed this limit),
// and which performs garbage collection to maintain this target size each time gcTicker receives a value.
// The value received by gcTicker is ignored, but is present to make it easy to use the standard library's `time.Ticker` type.
func NewCache[K comparable, V any](targetSize uint64, gcTicker <-chan time.Time) *Cache[K, V] {
	cache := &Cache[K, V]{
		targetSize: targetSize,
		values:     make(map[K]*valueAndGeneration[V], targetSize),
	}

	go func() {
		for range gcTicker {
			currentGeneration := cache.currentGeneration.Load()
			cache.currentGeneration.Add(1)

			// Accumulate a keysToDelete slice so that we can collect the keys to delete under a read lock rather than holding a write lock for the entire GC cycle.
			// This will use extra memory, and has a disadvantage that we may bump a generation from a Get but then still evict that value because we already decided to GC it.
			var keysToDelete []K
			cache.mu.RLock()
			// If we have free space, don't garbage collect at all. This will probably lead to very spiky evictions.
			if uint64(len(cache.values)) <= targetSize {
				cache.mu.RUnlock()
				continue
			}
			for k, v := range cache.values {
				// This is a _very_ coarse-grained eviction policy. As soon as our cache becomes full, we may evict lots of entries.
				// It may be more useful to treat different values of generation differently, e.g. always evict if v.generation < currentGeneration - 5, and only evict more recent entries if that didn't free up any space.
				if v.generation.Load() != currentGeneration {
					keysToDelete = append(keysToDelete, k)
				}
			}
			cache.mu.RUnlock()
			if len(keysToDelete) > 0 {
				cache.mu.Lock()
				for _, keyToDelete := range keysToDelete {
					if len(cache.values) <= int(targetSize) {
						break
					}
					delete(cache.values, keyToDelete)
				}
				cache.mu.Unlock()
			}
		}
	}()

	return cache
}

type valueAndGeneration[V any] struct {
	value      V
	generation atomic.Uint64
}

func (c *Cache[K, V]) Put(key K, value V) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	valueWrapper := &valueAndGeneration[V]{
		value: value,
	}
	valueWrapper.generation.Store(c.currentGeneration.Load())
	c.values[key] = valueWrapper
	return false
}

func (c *Cache[K, V]) Get(key K) (*V, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	valueWrapper, ok := c.values[key]
	if !ok {
		return nil, false
	}
	valueWrapper.generation.Store(c.currentGeneration.Load())
	return &valueWrapper.value, true
}
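For illustration (not part of this diff), a minimal usage sketch of this cache, assuming it is driven by a real time.Ticker and imported via the module path declared in the go.mod below:

package main

import (
	"fmt"
	"time"

	cache "github.com/CodeYourFuture/immersive-go-course/projects/concurrency/lru_cache_coarse_grained_generations"
)

func main() {
	// Each tick triggers one garbage-collection pass in the cache's GC goroutine.
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	c := cache.NewCache[string, int](100, ticker.C)
	c.Put("answer", 42)
	if v, ok := c.Get("answer"); ok {
		fmt.Println("answer =", *v)
	}
}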
@@ -0,0 +1,121 @@
package cache

import (
	"fmt"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func TestPutThenGet(t *testing.T) {
	gcTicker := make(chan time.Time)
	cache := NewCache[string, string](10, gcTicker)
	previouslyExisted := cache.Put("greeting", "hello")
	require.False(t, previouslyExisted)

	// Write to the channel twice, because once the second write has been sent, the first one must be done processing.
	gcTicker <- time.Now()
	gcTicker <- time.Now()

	value, present := cache.Get("greeting")
	require.True(t, present)
	require.Equal(t, "hello", *value)
}

func TestGetMissing(t *testing.T) {
	gcTicker := make(chan time.Time)
	cache := NewCache[string, string](1, gcTicker)
	value, present := cache.Get("greeting")
	require.False(t, present)
	require.Nil(t, value)
}

func TestEviction_JustWrites(t *testing.T) {
	gcTicker := make(chan time.Time)
	cache := NewCache[string, string](10, gcTicker)

	for i := 0; i < 10; i++ {
		cache.Put(fmt.Sprintf("entry-%d", i), "hello")
	}

	gcTicker <- time.Now()
	gcTicker <- time.Now()

	_, present0 := cache.Get("entry-0")
	require.True(t, present0)

	_, present9 := cache.Get("entry-9")
	require.True(t, present9)

	cache.Put("entry-10", "hello")

	gcTicker <- time.Now()
	gcTicker <- time.Now()

	presentCount := 0
	for key := 0; key < 11; key++ {
		got, present := cache.Get(fmt.Sprintf("entry-%d", key))
		if present {
			presentCount++
			require.Equal(t, "hello", *got)
		}
	}
	require.Equal(t, 10, presentCount)

	// Entries 0, 9, and 10 were accessed a generation after the others, so should still be present.
	_, present0AfterGC := cache.Get("entry-0")
	require.True(t, present0AfterGC)

	_, present9AfterGC := cache.Get("entry-9")
	require.True(t, present9AfterGC)

	_, present10AfterGC := cache.Get("entry-10")
	require.True(t, present10AfterGC)
}

func TestConcurrentWrites(t *testing.T) {
	gcTicker := make(chan time.Time)
	cache := NewCache[int, string](1, gcTicker)

	var wg sync.WaitGroup

	for iteration := 0; iteration < 100000; iteration++ {
		wg.Add(1)
		go func() {
			for key := 0; key < 3; key++ {
				cache.Put(key, fmt.Sprintf("entry-%d", key))
			}
			wg.Done()
		}()
	}

	wg.Wait()

	// No gc tick has happened, so all three keys should be present.
	got0, present0 := cache.Get(0)
	require.True(t, present0)
	require.Equal(t, "entry-0", *got0)

	got1, present1 := cache.Get(1)
	require.True(t, present1)
	require.Equal(t, "entry-1", *got1)

	got2, present2 := cache.Get(2)
	require.True(t, present2)
	require.Equal(t, "entry-2", *got2)

	gcTicker <- time.Now()
	gcTicker <- time.Now()

	presentCount := 0
	for key := 0; key < 3; key++ {
		got, present := cache.Get(key)
		if present {
			presentCount++
			require.Equal(t, fmt.Sprintf("entry-%d", key), *got)
		}
	}
	require.Equal(t, 1, presentCount)
}
11 changes: 11 additions & 0 deletions projects/concurrency/lru_cache_coarse_grained_generations/go.mod
@@ -0,0 +1,11 @@
module github.com/CodeYourFuture/immersive-go-course/projects/concurrency/lru_cache_coarse_grained_generations

go 1.21.5

require github.com/stretchr/testify v1.8.4

require (
	github.com/davecgh/go-spew v1.1.1 // indirect
	github.com/pmezard/go-difflib v1.0.0 // indirect
	gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 10 additions & 0 deletions projects/concurrency/lru_cache_coarse_grained_generations/go.sum
@@ -0,0 +1,10 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
131 changes: 131 additions & 0 deletions projects/concurrency/lru_cache_computing/cache.go
@@ -0,0 +1,131 @@
package cache

import (
	"container/list"
	"sync"
)

type Cache[K comparable, V any] struct {
	entryLimit uint64

	computeChannel chan K

	mu              sync.Mutex
	computedEntries map[K]cacheEntry[K, V]
	pendingEntries  map[K]*channelList[K, V]
	// Front is most recently used, back is least recently used.
	evictionList *list.List
}

// entryLimit and concurrentComputeLimit must both be non-zero.
// computer must never panic.
func NewCache[K comparable, V any](entryLimit uint64, concurrentComputeLimit uint64, computer func(K) V) *Cache[K, V] {
	computeChannel := make(chan K, concurrentComputeLimit)

	resultChannel := make(chan keyValuePair[K, V], concurrentComputeLimit)

	for i := 0; i < int(concurrentComputeLimit); i++ {
		go func() {
			for key := range computeChannel {
				value := computer(key)
				resultChannel <- keyValuePair[K, V]{
					key:   key,
					value: &value,
				}
			}
		}()
	}

	cache := &Cache[K, V]{
		entryLimit:     entryLimit,
		computeChannel: computeChannel,

		computedEntries: make(map[K]cacheEntry[K, V], entryLimit),
		pendingEntries:  make(map[K]*channelList[K, V]),
		evictionList:    list.New(),
	}

	go func() {
		for result := range resultChannel {
			cache.mu.Lock()
			pendingEntry := cache.pendingEntries[result.key]
			delete(cache.pendingEntries, result.key)

			if len(cache.computedEntries) == int(cache.entryLimit) {
				keyToEvict := cache.evictionList.Remove(cache.evictionList.Back()).(K)
				delete(cache.computedEntries, keyToEvict)
			}

			evictionListPointer := cache.evictionList.PushFront(result.key)

			cache.computedEntries[result.key] = cacheEntry[K, V]{
				evictionListPointer: evictionListPointer,
				value:               *result.value,
			}
			pendingEntry.mu.Lock()
			pendingEntry.value = result.value
			cache.mu.Unlock()
			for _, ch := range pendingEntry.channels {
				ch <- result
			}
			pendingEntry.mu.Unlock()
		}
	}()

	return cache
}

type cacheEntry[K any, V any] struct {
	evictionListPointer *list.Element
	value               V
}

type keyValuePair[K any, V any] struct {
	key   K
	value *V
}

type channelList[K any, V any] struct {
	mu       sync.Mutex
	channels []chan keyValuePair[K, V]
	value    *V
}

func (c *Cache[K, V]) Get(key K) (V, bool) {
	c.mu.Lock()
	computedEntry, isComputed := c.computedEntries[key]
	pendingEntry, isPending := c.pendingEntries[key]
	if isComputed {
		c.evictionList.MoveToFront(computedEntry.evictionListPointer)
		c.mu.Unlock()
		return computedEntry.value, true
	}
	if !isPending {
		pendingEntry = &channelList[K, V]{}
		c.pendingEntries[key] = pendingEntry
	}
	c.mu.Unlock()
	if !isPending {
		c.computeChannel <- key
	}

	pendingEntry.mu.Lock()
	// Maybe the value was computed but hasn't been transferred from pending to computed yet.
	if pendingEntry.value != nil {
		pendingEntry.mu.Unlock()
		return *pendingEntry.value, isPending
	}
	channel := make(chan keyValuePair[K, V], 1)
	pendingEntry.channels = append(pendingEntry.channels, channel)
	pendingEntry.mu.Unlock()
	value := <-channel
	return *value.value, isPending
}

// Only exists for testing. Doesn't count as a usage for LRU purposes.
func (c *Cache[K, V]) has(key K) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	_, ok := c.computedEntries[key]
	return ok
}
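For illustration (not part of this diff), a minimal usage sketch of the computing cache; the import path and the slowSquare computer function are assumptions made for this example:

package main

import (
	"fmt"
	"time"

	cache "github.com/CodeYourFuture/immersive-go-course/projects/concurrency/lru_cache_computing" // assumed module path
)

// slowSquare is a stand-in for any expensive computation.
func slowSquare(n int) int {
	time.Sleep(100 * time.Millisecond)
	return n * n
}

func main() {
	// Up to 10 cached entries, with at most 4 computations running concurrently.
	c := cache.NewCache[int, int](10, 4, slowSquare)

	// The first Get for a key blocks while the value is computed; the bool reports whether the key was already cached or pending.
	v, alreadyPresent := c.Get(7)
	fmt.Println(v, alreadyPresent) // 49 false

	v, alreadyPresent = c.Get(7)
	fmt.Println(v, alreadyPresent) // 49 true
}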