Skip to content

Added Unsafe Methods for Group Processing #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.idea
27 changes: 27 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# This file contains all available configuration options
# with their default values.

# options for analysis running
run:

# timeout for analysis, e.g. 30s, 5m, default is 1m
deadline: 1m

# exit code when at least one issue was found, default is 1
issues-exit-code: 1

skip-dirs:
- snap
- parts
- stage
- tmp
- prime
- vendor



linters:
disable:
- megacheck


28 changes: 28 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
dist: xenial
env:
global:
- COMMIT=${TRAVIS_COMMIT::8}
- GO111MODULE=on

language: go

go:
- 1.11.x

git:
depth: 1

before_install:
- go get github.com/mattn/goveralls

script:
- curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | bash -s -- -b $GOPATH/bin v1.15.0
- golangci-lint run
- go test -v -race ./... -coverprofile=fifo.coverprofile
- goveralls -coverprofile=fifo.coverprofile -service travis-ci

after_success:
- mv fifo.coverprofile coverage.txt
- bash <(curl -s https://codecov.io/bash)


142 changes: 116 additions & 26 deletions fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,9 @@

// TODO:
// - travis CI
// - maybe add method (*Queue).Peek()

package fifo

import (
"sync"
)

const chunkSize = 64

// chunks are used to make a queue auto resizeable.
Expand All @@ -21,39 +16,30 @@ type chunk struct {
}

// fifo queue
type Queue struct {
head, tail *chunk // chunk head and tail
count int // total amount of items in the queue
lock sync.Mutex // synchronisation lock
type UnsafeQueue struct {
head, tail *chunk // chunk head and tail
count int // total amount of items in the queue
}

// NewQueue creates a new and empty *fifo.Queue
func NewQueue() (q *Queue) {
// NewUnsafeQueue creates a new and empty *fifo.UnsafeQueue
func NewUnsafeQueue() (q *UnsafeQueue) {
initChunk := new(chunk)
q = &Queue{
q = &UnsafeQueue{
head: initChunk,
tail: initChunk,
}
return q
}

// Return the number of items in the queue
func (q *Queue) Len() (length int) {
// locking to make Queue thread-safe
q.lock.Lock()
defer q.lock.Unlock()

func (q *UnsafeQueue) Len() (length int) {
// copy q.count and return length
length = q.count
return length
}

// Add an item to the end of the queue
func (q *Queue) Add(item interface{}) {
// locking to make Queue thread-safe
q.lock.Lock()
defer q.lock.Unlock()

func (q *UnsafeQueue) Add(item interface{}) {
// check if item is valid
if item == nil {
panic("can not add nil item to fifo queue")
Expand All @@ -71,12 +57,100 @@ func (q *Queue) Add(item interface{}) {
q.count++
}

// Adds an list of items to the queue
func (q *UnsafeQueue) AddList(items []interface{}) {
// check if item is valid
if len(items) == 0 {
// len(nil) == 0 as well
return
}
//
if len(items) > chunkSize { // Add each piece separated
chunks := len(items) / chunkSize
if chunks*chunkSize != len(items) { // Rouding up
chunks++
}

for i := 0; i < chunks; i++ {
s := i * chunkSize
e := (i + 1) * chunkSize

if e > len(items) {
e = len(items)
}

q.AddList(items[s:e])
}
return
}

// if the tail chunk is full, create a new one and add it to the queue.
if q.tail.last >= chunkSize {
q.tail.next = new(chunk)
q.tail = q.tail.next
}

s := q.tail.last
e := len(items) - s
n := copy(q.tail.items[s:e], items)
q.tail.last += n
q.count += n
items = items[e:]

if len(items) > 0 {
q.AddList(items) // Add Remaining Items
}
}

// Returns the next N elements from the queue
// In case of not enough elements, returns the elements that are available
func (q *UnsafeQueue) NextN(n int) []interface{} {
if n > chunkSize {
// Recursive call to append
chunks := n / chunkSize
if chunks*chunkSize < n {
chunks++
}

out := make([]interface{}, 0)
read := 0
for i := 0; i < chunks; i++ {
e := chunkSize
if read+e > n {
e = n - read
}

out = append(out, q.NextN(e)...)
}
return out
}

if q.count < n {
n = q.count // Not enough elements
}

if q.count == 0 || q.head.first >= q.head.last {
return make([]interface{}, 0)
}

// TODO: Slice it
out := make([]interface{}, n)

read := 0
for i := 0; i < n; i++ {
if q.count == 0 {
break
}
read++
out[i] = q.Next()
}

return out[:read]
}

// Remove the item at the head of the queue and return it.
// Returns nil when there are no items left in queue.
func (q *Queue) Next() (item interface{}) {
// locking to make Queue thread-safe
q.lock.Lock()
defer q.lock.Unlock()
func (q *UnsafeQueue) Next() (item interface{}) {

// Return nil if there are no items to return
if q.count == 0 {
Expand Down Expand Up @@ -111,3 +185,19 @@ func (q *Queue) Next() (item interface{}) {
// return the retrieved item
return item
}

// Reads the item at the head of the queue without removing it
// Returns nil when there are no items left in queue
func (q *UnsafeQueue) Peek() (item interface{}) {
// Return nil if there are no items to return
if q.count == 0 {
return nil
}
// FIXME: why would this check be required?
if q.head.first >= q.head.last {
return nil
}

// Get item from queue
return q.head.items[q.head.first]
}
91 changes: 84 additions & 7 deletions fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,22 @@ func testAssert(t *testing.T, b bool, objs ...interface{}) {

func TestBasic(t *testing.T) {
q := NewQueue()
testAssert(t, q.Len() == 0, "Could not assert that new Queue has length zero (0).")
testAssert(t, q.Len() == 0, "Could not assert that new UnsafeQueue has length zero (0).")

q.Add(10)
testAssert(t, q.Len() == 1, "Could not assert that Queue has lenght 1 after adding one item.")
testAssert(t, q.Next().(int) == 10, "Could not retrieve item from Queue correctly.")
testAssert(t, q.Len() == 0, "Could not assert that Queue has length 0 after retrieving item.")
testAssert(t, q.Len() == 1, "Could not assert that UnsafeQueue has lenght 1 after adding one item.")
testAssert(t, q.Next().(int) == 10, "Could not retrieve item from UnsafeQueue correctly.")
testAssert(t, q.Len() == 0, "Could not assert that UnsafeQueue has length 0 after retrieving item.")

q.Add(11)
testAssert(t, q.Len() == 1, "Could not assert that UnsafeQueue has length 1 after adding one item the second time.")
testAssert(t, q.Next().(int) == 11, "Could not retrieve item from UnsafeQueue correctly the second time.")
testAssert(t, q.Len() == 0, "Could not assert that UnsafeQueue has length 0 after retrieving item the second time.")

q.Add(11)
testAssert(t, q.Len() == 1, "Could not assert that Queue has length 1 after adding one item the second time.")
testAssert(t, q.Next().(int) == 11, "Could not retrieve item from Queue correctly the second time.")
testAssert(t, q.Len() == 0, "Could not assert that Queue has length 0 after retrieving item the second time.")
v := q.Peek()
testAssert(t, q.Len() == 1, "Could not assert that UnsafeQueue has length 1 after adding one item the second time.")
testAssert(t, v.(int) == 11, "Could not retrieve item from UnsafeQueue correctly the second time.")
}

func TestRandomized(t *testing.T) {
Expand All @@ -54,3 +61,73 @@ func TestRandomized(t *testing.T) {
}
}
}

func TestRandomizedLockUnsafeQueue(t *testing.T) {
var first, last int
q := NewQueue()
for i := 0; i < 10000; i++ {
if rand.Intn(2) == 0 {
count := rand.Intn(128)
for j := 0; j < count; j++ {
q.Add(last)
last++
}
} else {
count := rand.Intn(128)
if count > (last - first) {
count = last - first
}
for i := 0; i < count; i++ {
testAssert(t, q.Len() > 0, "len==0", q.Len())
testAssert(t, q.Next().(int) == first)
first++
}
}
}
}

func TestAddList(t *testing.T) {
q := NewUnsafeQueue()
for i := 0; i < 1000; i++ {
elements := rand.Intn(128)
o := make([]interface{}, elements)
for j := 0; j < elements; j++ {
o[j] = rand.Intn(1024)
}

q.AddList(o)

testAssert(t, q.count == elements, "Num Elements: ", q.count, elements)

for j := 0; j < elements; j++ {
v := q.Next().(int)
testAssert(t, v == o[j].(int), "Element at", j, v, o[j].(int))
}

testAssert(t, q.count == 0, "UnsafeQueue should be empty", q.count)
}
}

func TestReadList(t *testing.T) {
q := NewUnsafeQueue()
for i := 0; i < 1000; i++ {
elements := rand.Intn(128)
o := make([]interface{}, elements)
for j := 0; j < elements; j++ {
o[j] = rand.Intn(1024)
}

q.AddList(o)

testAssert(t, q.count == elements, "Num Elements: ", q.count, elements)

newV := q.NextN(elements)

for j := 0; j < elements; j++ {
v := newV[j].(int)
testAssert(t, v == o[j].(int), "Element at", j, v, o[j].(int))
}

testAssert(t, q.count == 0, "UnsafeQueue should be empty", q.count)
}
}
11 changes: 7 additions & 4 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ The queue itself is implemented as a single-linked list of chunks containing max
package main

import (
"github.com/foize/go.fifo"
"fmt"
"github.com/foize/go.fifo"
)

func main() {
Expand All @@ -38,8 +38,8 @@ func main() {
package main

import (
"github.com/foize/go.fifo"
"fmt"
"github.com/foize/go.fifo"
)

type thing struct {
Expand All @@ -64,7 +64,7 @@ func main() {
// retrieve items from the queue
for {
// get a new item from the things queue
item := things.Next();
item := things.Next()

// check if there was an item
if item == nil {
Expand Down Expand Up @@ -104,4 +104,7 @@ There are several differences:
- Add() does not accept nil interface{} and will panic when trying to add nil interface{}.
- Made fifo.Queue thread/goroutine-safe (sync.Mutex)
- Added a lot of comments
- renamed internal variable/field names
- renamed internal variable/field names
- added AddList([]interface{}) to add several items in one call
- added NextN(int) []interface to receive several items in one call
- added UnsafeQueue for non locking operations
Loading