diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..485dee6 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..3662079 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,27 @@ +# This file contains all available configuration options +# with their default values. + +# options for analysis running +run: + + # timeout for analysis, e.g. 30s, 5m, default is 1m + deadline: 1m + + # exit code when at least one issue was found, default is 1 + issues-exit-code: 1 + + skip-dirs: + - snap + - parts + - stage + - tmp + - prime + - vendor + + + +linters: + disable: + - megacheck + + diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..08db94b --- /dev/null +++ b/.travis.yml @@ -0,0 +1,28 @@ +dist: xenial +env: + global: + - COMMIT=${TRAVIS_COMMIT::8} + - GO111MODULE=on + +language: go + +go: +- 1.11.x + +git: + depth: 1 + +before_install: +- go get github.com/mattn/goveralls + +script: +- curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | bash -s -- -b $GOPATH/bin v1.15.0 +- golangci-lint run +- go test -v -race ./... -coverprofile=fifo.coverprofile +- goveralls -coverprofile=fifo.coverprofile -service travis-ci + +after_success: + - mv fifo.coverprofile coverage.txt + - bash <(curl -s https://codecov.io/bash) + + diff --git a/fifo.go b/fifo.go index 3ffc145..49c9f73 100644 --- a/fifo.go +++ b/fifo.go @@ -3,14 +3,9 @@ // TODO: // - travis CI -// - maybe add method (*Queue).Peek() package fifo -import ( - "sync" -) - const chunkSize = 64 // chunks are used to make a queue auto resizeable. @@ -21,16 +16,15 @@ type chunk struct { } // fifo queue -type Queue struct { - head, tail *chunk // chunk head and tail - count int // total amount of items in the queue - lock sync.Mutex // synchronisation lock +type UnsafeQueue struct { + head, tail *chunk // chunk head and tail + count int // total amount of items in the queue } -// NewQueue creates a new and empty *fifo.Queue -func NewQueue() (q *Queue) { +// NewUnsafeQueue creates a new and empty *fifo.UnsafeQueue +func NewUnsafeQueue() (q *UnsafeQueue) { initChunk := new(chunk) - q = &Queue{ + q = &UnsafeQueue{ head: initChunk, tail: initChunk, } @@ -38,22 +32,14 @@ func NewQueue() (q *Queue) { } // Return the number of items in the queue -func (q *Queue) Len() (length int) { - // locking to make Queue thread-safe - q.lock.Lock() - defer q.lock.Unlock() - +func (q *UnsafeQueue) Len() (length int) { // copy q.count and return length length = q.count return length } // Add an item to the end of the queue -func (q *Queue) Add(item interface{}) { - // locking to make Queue thread-safe - q.lock.Lock() - defer q.lock.Unlock() - +func (q *UnsafeQueue) Add(item interface{}) { // check if item is valid if item == nil { panic("can not add nil item to fifo queue") @@ -71,12 +57,100 @@ func (q *Queue) Add(item interface{}) { q.count++ } +// Adds an list of items to the queue +func (q *UnsafeQueue) AddList(items []interface{}) { + // check if item is valid + if len(items) == 0 { + // len(nil) == 0 as well + return + } + // + if len(items) > chunkSize { // Add each piece separated + chunks := len(items) / chunkSize + if chunks*chunkSize != len(items) { // Rouding up + chunks++ + } + + for i := 0; i < chunks; i++ { + s := i * chunkSize + e := (i + 1) * chunkSize + + if e > len(items) { + e = len(items) + } + + q.AddList(items[s:e]) + } + return + } + + // if the tail chunk is full, create a new one and add it to the queue. + if q.tail.last >= chunkSize { + q.tail.next = new(chunk) + q.tail = q.tail.next + } + + s := q.tail.last + e := len(items) - s + n := copy(q.tail.items[s:e], items) + q.tail.last += n + q.count += n + items = items[e:] + + if len(items) > 0 { + q.AddList(items) // Add Remaining Items + } +} + +// Returns the next N elements from the queue +// In case of not enough elements, returns the elements that are available +func (q *UnsafeQueue) NextN(n int) []interface{} { + if n > chunkSize { + // Recursive call to append + chunks := n / chunkSize + if chunks*chunkSize < n { + chunks++ + } + + out := make([]interface{}, 0) + read := 0 + for i := 0; i < chunks; i++ { + e := chunkSize + if read+e > n { + e = n - read + } + + out = append(out, q.NextN(e)...) + } + return out + } + + if q.count < n { + n = q.count // Not enough elements + } + + if q.count == 0 || q.head.first >= q.head.last { + return make([]interface{}, 0) + } + + // TODO: Slice it + out := make([]interface{}, n) + + read := 0 + for i := 0; i < n; i++ { + if q.count == 0 { + break + } + read++ + out[i] = q.Next() + } + + return out[:read] +} + // Remove the item at the head of the queue and return it. // Returns nil when there are no items left in queue. -func (q *Queue) Next() (item interface{}) { - // locking to make Queue thread-safe - q.lock.Lock() - defer q.lock.Unlock() +func (q *UnsafeQueue) Next() (item interface{}) { // Return nil if there are no items to return if q.count == 0 { @@ -111,3 +185,19 @@ func (q *Queue) Next() (item interface{}) { // return the retrieved item return item } + +// Reads the item at the head of the queue without removing it +// Returns nil when there are no items left in queue +func (q *UnsafeQueue) Peek() (item interface{}) { + // Return nil if there are no items to return + if q.count == 0 { + return nil + } + // FIXME: why would this check be required? + if q.head.first >= q.head.last { + return nil + } + + // Get item from queue + return q.head.items[q.head.first] +} diff --git a/fifo_test.go b/fifo_test.go index ac1c539..03cff01 100644 --- a/fifo_test.go +++ b/fifo_test.go @@ -20,15 +20,22 @@ func testAssert(t *testing.T, b bool, objs ...interface{}) { func TestBasic(t *testing.T) { q := NewQueue() - testAssert(t, q.Len() == 0, "Could not assert that new Queue has length zero (0).") + testAssert(t, q.Len() == 0, "Could not assert that new UnsafeQueue has length zero (0).") + q.Add(10) - testAssert(t, q.Len() == 1, "Could not assert that Queue has lenght 1 after adding one item.") - testAssert(t, q.Next().(int) == 10, "Could not retrieve item from Queue correctly.") - testAssert(t, q.Len() == 0, "Could not assert that Queue has length 0 after retrieving item.") + testAssert(t, q.Len() == 1, "Could not assert that UnsafeQueue has lenght 1 after adding one item.") + testAssert(t, q.Next().(int) == 10, "Could not retrieve item from UnsafeQueue correctly.") + testAssert(t, q.Len() == 0, "Could not assert that UnsafeQueue has length 0 after retrieving item.") + + q.Add(11) + testAssert(t, q.Len() == 1, "Could not assert that UnsafeQueue has length 1 after adding one item the second time.") + testAssert(t, q.Next().(int) == 11, "Could not retrieve item from UnsafeQueue correctly the second time.") + testAssert(t, q.Len() == 0, "Could not assert that UnsafeQueue has length 0 after retrieving item the second time.") + q.Add(11) - testAssert(t, q.Len() == 1, "Could not assert that Queue has length 1 after adding one item the second time.") - testAssert(t, q.Next().(int) == 11, "Could not retrieve item from Queue correctly the second time.") - testAssert(t, q.Len() == 0, "Could not assert that Queue has length 0 after retrieving item the second time.") + v := q.Peek() + testAssert(t, q.Len() == 1, "Could not assert that UnsafeQueue has length 1 after adding one item the second time.") + testAssert(t, v.(int) == 11, "Could not retrieve item from UnsafeQueue correctly the second time.") } func TestRandomized(t *testing.T) { @@ -54,3 +61,73 @@ func TestRandomized(t *testing.T) { } } } + +func TestRandomizedLockUnsafeQueue(t *testing.T) { + var first, last int + q := NewQueue() + for i := 0; i < 10000; i++ { + if rand.Intn(2) == 0 { + count := rand.Intn(128) + for j := 0; j < count; j++ { + q.Add(last) + last++ + } + } else { + count := rand.Intn(128) + if count > (last - first) { + count = last - first + } + for i := 0; i < count; i++ { + testAssert(t, q.Len() > 0, "len==0", q.Len()) + testAssert(t, q.Next().(int) == first) + first++ + } + } + } +} + +func TestAddList(t *testing.T) { + q := NewUnsafeQueue() + for i := 0; i < 1000; i++ { + elements := rand.Intn(128) + o := make([]interface{}, elements) + for j := 0; j < elements; j++ { + o[j] = rand.Intn(1024) + } + + q.AddList(o) + + testAssert(t, q.count == elements, "Num Elements: ", q.count, elements) + + for j := 0; j < elements; j++ { + v := q.Next().(int) + testAssert(t, v == o[j].(int), "Element at", j, v, o[j].(int)) + } + + testAssert(t, q.count == 0, "UnsafeQueue should be empty", q.count) + } +} + +func TestReadList(t *testing.T) { + q := NewUnsafeQueue() + for i := 0; i < 1000; i++ { + elements := rand.Intn(128) + o := make([]interface{}, elements) + for j := 0; j < elements; j++ { + o[j] = rand.Intn(1024) + } + + q.AddList(o) + + testAssert(t, q.count == elements, "Num Elements: ", q.count, elements) + + newV := q.NextN(elements) + + for j := 0; j < elements; j++ { + v := newV[j].(int) + testAssert(t, v == o[j].(int), "Element at", j, v, o[j].(int)) + } + + testAssert(t, q.count == 0, "UnsafeQueue should be empty", q.count) + } +} diff --git a/readme.md b/readme.md index 2251511..3a4d413 100644 --- a/readme.md +++ b/readme.md @@ -14,8 +14,8 @@ The queue itself is implemented as a single-linked list of chunks containing max package main import ( - "github.com/foize/go.fifo" "fmt" + "github.com/foize/go.fifo" ) func main() { @@ -38,8 +38,8 @@ func main() { package main import ( - "github.com/foize/go.fifo" "fmt" + "github.com/foize/go.fifo" ) type thing struct { @@ -64,7 +64,7 @@ func main() { // retrieve items from the queue for { // get a new item from the things queue - item := things.Next(); + item := things.Next() // check if there was an item if item == nil { @@ -104,4 +104,7 @@ There are several differences: - Add() does not accept nil interface{} and will panic when trying to add nil interface{}. - Made fifo.Queue thread/goroutine-safe (sync.Mutex) - Added a lot of comments -- renamed internal variable/field names \ No newline at end of file +- renamed internal variable/field names +- added AddList([]interface{}) to add several items in one call +- added NextN(int) []interface to receive several items in one call +- added UnsafeQueue for non locking operations \ No newline at end of file diff --git a/safefifo.go b/safefifo.go new file mode 100644 index 0000000..9cf5ee5 --- /dev/null +++ b/safefifo.go @@ -0,0 +1,75 @@ +package fifo + +import ( + "sync" +) + +// fifo queue +type Queue struct { + l sync.Mutex + queue *UnsafeQueue +} + +// NewUnsafeQueue creates a new and empty *fifo.UnsafeQueue +func NewQueue() (q *Queue) { + q = &Queue{ + l: sync.Mutex{}, + queue: NewUnsafeQueue(), + } + return q +} + +// Return the number of items in the queue +func (q *Queue) Len() (length int) { + // locking to make UnsafeQueue thread-safe + q.l.Lock() + c := q.queue.Len() + q.l.Unlock() + + return c +} + +// Add an item to the end of the queue +func (q *Queue) Add(item interface{}) { + // locking to make UnsafeQueue thread-safe + q.l.Lock() + q.queue.Add(item) + q.l.Unlock() +} + +func (q *Queue) AddList(items []interface{}) { + q.l.Lock() + q.queue.AddList(items) + q.l.Unlock() +} + +// Remove the item at the head of the queue and return it. +// Returns nil when there are no items left in queue. +func (q *Queue) Next() (item interface{}) { + // locking to make UnsafeQueue thread-safe + q.l.Lock() + i := q.queue.Next() + q.l.Unlock() + + return i +} + +// Returns the next N elements from the queue +// In case of not enough elements, returns the elements that are available +func (q *Queue) NextN(n int) []interface{} { + q.l.Lock() + i := q.queue.NextN(n) + q.l.Unlock() + + return i +} + +// Reads the item at the head of the queue without removing it +// Returns nil when there are no items left in queue +func (q *Queue) Peek() (item interface{}) { + q.l.Lock() + i := q.queue.Peek() + q.l.Unlock() + + return i +}