██████╗░██╗░░░██╗░██████╗
██╔══██╗██║░░░██║██╔════╝
██████╦╝██║░░░██║╚█████╗░
██╔══██╗██║░░░██║░╚═══██╗
██████╦╝╚██████╔╝██████╔╝
╚═════╝░░╚═════╝░╚═════╝░
bus
is a robust, persistent message bus designed to streamline event handling with simplicity and flexibility. Based on task, a task runner, solid, a signal/broadcast library, and immuta, an append-only log, bus
delivers high performance, intuitive APIs, and resilient message persistence.
- ✅ Persistent Event Storage - Ensures message durability and reliability with a persistent log.
- ✅ Pattern Matching on Subjects - Supports wildcard patterns like
a.*.b
ora.>
to route events dynamically. - ✅ Request/Reply Pattern - Easily implement request-response communication with in-built support.
- ✅ HTTP and Server-Sent Events (SSE) - Uses both standard HTTP and SSE for flexible, web-friendly transport.
- ✅ Multi-Consumer Confirmation - Allows publishers to confirm when an event is acknowledged by a specified number of consumers.
- ✅ Ergonomic, Idiomatic API - Designed with simplicity, adhering closely to Golang conventions for ease of use.
- ✅ High Performance - Optimized for rapid event persistence and delivery.
- ✅ Redelivery and Acknowledgement - Provides automatic message redelivery and various acknowledgement strategies for reliability.
- ✅ CLI for Debugging - Comes with a command-line interface to publish, consume, and debug events easily.
To install bus
, use:
go get ella.to/bus@v0.3.3
to install a cli, run the following
go install ella.to/bus/cmd/bus@v0.3.3
and to run the server using docker, simply use the provided docker-compose and run it
docker-compose up
Namespaces have been introduced to efficiently organize events by ensuring that not all events are saved in a single file. Each namespace has its own dedicated file. All namespaces must be defined when starting the Bus server by using the --namespaces
flag.
Namespaces are essentially the first segment of a topic. For example, in the topic a.b.c, the namespace is a.
The Bus server also includes a special namespace called _bus_
, reserved for internal bus operations. It is strongly recommended not to consume events from the _bus_
namespace.
When defining namespaces, consider your business logic and choose meaningful names that clearly represent their purpose. For instance:
- If the Bus is used to handle RPC calls, a good namespace might be
rpc
. - For user-related operations, you might use user.
Event Sequencing Within Namespaces: The Bus guarantees the sequence of events stored within a single namespace. No Cross-Namespace Sequencing Guarantee: The Bus does not guarantee the sequence of messages stored across different namespaces. By following these guidelines, you can keep your Bus server organized and aligned with your application's goals.
At its core, bus is a pub/sub library, enabling asynchronous communication between publishers and subscribers. Here’s how to publish an event after creating a client
package main
import (
"context"
"ella.to/bus"
)
func main() {
client := bus.NewClient("http://localhost:2021")
ctx := context.Background()
// publish an event to subject "a.b.c" with data "hello world"
err := client.Put(
ctx,
bus.WithSubject("a.b.c"),
bus.WithData("hello world"),
).Error()
if err != nil {
panic(err)
}
// subscribe to subject "a.b.c" and since subscription is blocking
// we can use range to iterate over the events. For every event we
// need to ack it. If ack is not called, the event will be redelivered.
// Since an event is already published, we start from the oldest event by passing bus.WithStartFrom(bus.StartOldest) options.
for event, err := range client.Get(
ctx,
bus.WithSubject("a.b.c"),
bus.WithStartFrom(bus.StartOldest),
) {
if err != nil {
panic(err)
}
// do something with the event
// e.g. print the data
println(string(event.Payload))
// ack the event
if err := event.Ack(ctx); err != nil {
panic(err)
}
// since there is only one event, we can break the loop
break
}
}
for more examples, checkout examples folder
logo was created using https://fsymbols.com/generators/carty