diff --git a/cshared.go b/cshared.go index fa8353b..ec55e07 100644 --- a/cshared.go +++ b/cshared.go @@ -168,6 +168,12 @@ func FLBPluginInit(ptr unsafe.Pointer) int { func flbPluginReset() { theInputLock.Lock() defer theInputLock.Unlock() + defer func() { + if ret := recover(); ret != nil { + fmt.Fprintf(os.Stderr, "Channel is already closed") + return + } + }() once = sync.Once{} close(theChannel) diff --git a/cshared_test.go b/cshared_test.go index e501b2d..86d294f 100644 --- a/cshared_test.go +++ b/cshared_test.go @@ -26,9 +26,12 @@ func init() { registerWG.Done() } -func TestInputCallbackCtrlC(t *testing.T) { +func TestMain(m *testing.M) { defer flbPluginReset() + m.Run() +} +func TestInputCallbackCtrlC(t *testing.T) { theInputLock.Lock() theInput = testPluginInputCallbackCtrlC{} theInputLock.Unlock() @@ -83,8 +86,6 @@ func (t testPluginInputCallbackDangle) Collect(ctx context.Context, ch chan<- Me // Collect multiple times. This is inline with backward-compatible // behavior. func TestInputCallbackDangle(t *testing.T) { - defer flbPluginReset() - theInputLock.Lock() theInput = testPluginInputCallbackDangle{} theInputLock.Unlock() @@ -158,8 +159,6 @@ func (t testPluginInputCallbackInfinite) Collect(ctx context.Context, ch chan<- // TestInputCallbackInfinite is a test for the main method most plugins // use where they do not return from the first invocation of collect. func TestInputCallbackInfinite(t *testing.T) { - defer flbPluginReset() - theInputLock.Lock() theInput = testPluginInputCallbackInfinite{} theInputLock.Unlock() @@ -246,8 +245,6 @@ func (t testPluginInputCallbackLatency) Collect(ctx context.Context, ch chan<- M // TestInputCallbackInfiniteLatency is a test of the latency between // messages. func TestInputCallbackLatency(t *testing.T) { - defer flbPluginReset() - theInputLock.Lock() theInput = testPluginInputCallbackLatency{} theInputLock.Unlock() @@ -377,8 +374,6 @@ func (t testInputCallbackInfiniteConcurrent) Collect(ctx context.Context, ch cha // TestInputCallbackInfiniteConcurrent is meant to make sure we do not // break anythin with respect to concurrent ingest. func TestInputCallbackInfiniteConcurrent(t *testing.T) { - defer flbPluginReset() - theInputLock.Lock() theInput = testInputCallbackInfiniteConcurrent{} theInputLock.Unlock()