@@ -43,7 +43,7 @@ type Channel struct {
4343
4444 topicName string
4545 name string
46- ctx * context
46+ nsqd * NSQD
4747
4848 backend BackendQueue
4949
@@ -71,7 +71,7 @@ type Channel struct {
7171}
7272
7373// NewChannel creates a new instance of the Channel type and returns a pointer
74- func NewChannel (topicName string , channelName string , ctx * context ,
74+ func NewChannel (topicName string , channelName string , nsqd * NSQD ,
7575 deleteCallback func (* Channel )) * Channel {
7676
7777 c := & Channel {
@@ -80,16 +80,16 @@ func NewChannel(topicName string, channelName string, ctx *context,
8080 memoryMsgChan : nil ,
8181 clients : make (map [int64 ]Consumer ),
8282 deleteCallback : deleteCallback ,
83- ctx : ctx ,
83+ nsqd : nsqd ,
8484 }
8585 // create mem-queue only if size > 0 (do not use unbuffered chan)
86- if ctx . nsqd .getOpts ().MemQueueSize > 0 {
87- c .memoryMsgChan = make (chan * Message , ctx . nsqd .getOpts ().MemQueueSize )
86+ if nsqd .getOpts ().MemQueueSize > 0 {
87+ c .memoryMsgChan = make (chan * Message , nsqd .getOpts ().MemQueueSize )
8888 }
89- if len (ctx . nsqd .getOpts ().E2EProcessingLatencyPercentiles ) > 0 {
89+ if len (nsqd .getOpts ().E2EProcessingLatencyPercentiles ) > 0 {
9090 c .e2eProcessingLatencyStream = quantile .New (
91- ctx . nsqd .getOpts ().E2EProcessingLatencyWindowTime ,
92- ctx . nsqd .getOpts ().E2EProcessingLatencyPercentiles ,
91+ nsqd .getOpts ().E2EProcessingLatencyWindowTime ,
92+ nsqd .getOpts ().E2EProcessingLatencyPercentiles ,
9393 )
9494 }
9595
@@ -100,30 +100,30 @@ func NewChannel(topicName string, channelName string, ctx *context,
100100 c .backend = newDummyBackendQueue ()
101101 } else {
102102 dqLogf := func (level diskqueue.LogLevel , f string , args ... interface {}) {
103- opts := ctx . nsqd .getOpts ()
103+ opts := nsqd .getOpts ()
104104 lg .Logf (opts .Logger , opts .LogLevel , lg .LogLevel (level ), f , args ... )
105105 }
106106 // backend names, for uniqueness, automatically include the topic...
107107 backendName := getBackendName (topicName , channelName )
108108 c .backend = diskqueue .New (
109109 backendName ,
110- ctx . nsqd .getOpts ().DataPath ,
111- ctx . nsqd .getOpts ().MaxBytesPerFile ,
110+ nsqd .getOpts ().DataPath ,
111+ nsqd .getOpts ().MaxBytesPerFile ,
112112 int32 (minValidMsgLength ),
113- int32 (ctx . nsqd .getOpts ().MaxMsgSize )+ minValidMsgLength ,
114- ctx . nsqd .getOpts ().SyncEvery ,
115- ctx . nsqd .getOpts ().SyncTimeout ,
113+ int32 (nsqd .getOpts ().MaxMsgSize )+ minValidMsgLength ,
114+ nsqd .getOpts ().SyncEvery ,
115+ nsqd .getOpts ().SyncTimeout ,
116116 dqLogf ,
117117 )
118118 }
119119
120- c .ctx . nsqd .Notify (c )
120+ c .nsqd .Notify (c )
121121
122122 return c
123123}
124124
125125func (c * Channel ) initPQ () {
126- pqSize := int (math .Max (1 , float64 (c .ctx . nsqd .getOpts ().MemQueueSize )/ 10 ))
126+ pqSize := int (math .Max (1 , float64 (c .nsqd .getOpts ().MemQueueSize )/ 10 ))
127127
128128 c .inFlightMutex .Lock ()
129129 c .inFlightMessages = make (map [MessageID ]* Message )
@@ -160,13 +160,13 @@ func (c *Channel) exit(deleted bool) error {
160160 }
161161
162162 if deleted {
163- c .ctx . nsqd .logf (LOG_INFO , "CHANNEL(%s): deleting" , c .name )
163+ c .nsqd .logf (LOG_INFO , "CHANNEL(%s): deleting" , c .name )
164164
165165 // since we are explicitly deleting a channel (not just at system exit time)
166166 // de-register this from the lookupd
167- c .ctx . nsqd .Notify (c )
167+ c .nsqd .Notify (c )
168168 } else {
169- c .ctx . nsqd .logf (LOG_INFO , "CHANNEL(%s): closing" , c .name )
169+ c .nsqd .logf (LOG_INFO , "CHANNEL(%s): closing" , c .name )
170170 }
171171
172172 // this forceably closes client connections
@@ -212,7 +212,7 @@ finish:
212212// it does not drain inflight/deferred because it is only called in Close()
213213func (c * Channel ) flush () error {
214214 if len (c .memoryMsgChan ) > 0 || len (c .inFlightMessages ) > 0 || len (c .deferredMessages ) > 0 {
215- c .ctx . nsqd .logf (LOG_INFO , "CHANNEL(%s): flushing %d memory %d in-flight %d deferred messages to backend" ,
215+ c .nsqd .logf (LOG_INFO , "CHANNEL(%s): flushing %d memory %d in-flight %d deferred messages to backend" ,
216216 c .name , len (c .memoryMsgChan ), len (c .inFlightMessages ), len (c .deferredMessages ))
217217 }
218218
@@ -221,7 +221,7 @@ func (c *Channel) flush() error {
221221 case msg := <- c .memoryMsgChan :
222222 err := writeMessageToBackend (msg , c .backend )
223223 if err != nil {
224- c .ctx . nsqd .logf (LOG_ERROR , "failed to write message to backend - %s" , err )
224+ c .nsqd .logf (LOG_ERROR , "failed to write message to backend - %s" , err )
225225 }
226226 default :
227227 goto finish
@@ -233,7 +233,7 @@ finish:
233233 for _ , msg := range c .inFlightMessages {
234234 err := writeMessageToBackend (msg , c .backend )
235235 if err != nil {
236- c .ctx . nsqd .logf (LOG_ERROR , "failed to write message to backend - %s" , err )
236+ c .nsqd .logf (LOG_ERROR , "failed to write message to backend - %s" , err )
237237 }
238238 }
239239 c .inFlightMutex .Unlock ()
@@ -243,7 +243,7 @@ finish:
243243 msg := item .Value .(* Message )
244244 err := writeMessageToBackend (msg , c .backend )
245245 if err != nil {
246- c .ctx . nsqd .logf (LOG_ERROR , "failed to write message to backend - %s" , err )
246+ c .nsqd .logf (LOG_ERROR , "failed to write message to backend - %s" , err )
247247 }
248248 }
249249 c .deferredMutex .Unlock ()
@@ -306,9 +306,9 @@ func (c *Channel) put(m *Message) error {
306306 case c .memoryMsgChan <- m :
307307 default :
308308 err := writeMessageToBackend (m , c .backend )
309- c .ctx . nsqd .SetHealth (err )
309+ c .nsqd .SetHealth (err )
310310 if err != nil {
311- c .ctx . nsqd .logf (LOG_ERROR , "CHANNEL(%s): failed to write message to backend - %s" ,
311+ c .nsqd .logf (LOG_ERROR , "CHANNEL(%s): failed to write message to backend - %s" ,
312312 c .name , err )
313313 return err
314314 }
@@ -331,9 +331,9 @@ func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout ti
331331
332332 newTimeout := time .Now ().Add (clientMsgTimeout )
333333 if newTimeout .Sub (msg .deliveryTS ) >=
334- c .ctx . nsqd .getOpts ().MaxMsgTimeout {
334+ c .nsqd .getOpts ().MaxMsgTimeout {
335335 // we would have gone over, set to the max
336- newTimeout = msg .deliveryTS .Add (c .ctx . nsqd .getOpts ().MaxMsgTimeout )
336+ newTimeout = msg .deliveryTS .Add (c .nsqd .getOpts ().MaxMsgTimeout )
337337 }
338338
339339 msg .pri = newTimeout .UnixNano ()
@@ -398,7 +398,7 @@ func (c *Channel) AddClient(clientID int64, client Consumer) error {
398398 return nil
399399 }
400400
401- maxChannelConsumers := c .ctx . nsqd .getOpts ().MaxChannelConsumers
401+ maxChannelConsumers := c .nsqd .getOpts ().MaxChannelConsumers
402402 if maxChannelConsumers != 0 && len (c .clients ) >= maxChannelConsumers {
403403 return errors .New ("E_TOO_MANY_CHANNEL_CONSUMERS" )
404404 }
0 commit comments