wkafka is a wrapper for kafka library to initialize and use for microservices.
go get github.com/worldline-go/wkafka
This library is using franz-go.
First set the connection config to create a new kafka client.
Main config struct that contains brokers, security settings and consumer validation.
brokers: # list of brokers, default is empty
- localhost:9092
security:
tls:
enabled: false
cert_file: ""
key_file: ""
ca_file: ""
sasl: # SASL/SCRAM authentication could be multiple and will be used in order
- plain:
enabled: false
user: ""
pass: ""
scram:
enabled: false
algorithm: "" # "SCRAM-SHA-256" or "SCRAM-SHA-512"
user: ""
pass: ""
consumer: # consumer validation and default values
prefix_group_id: "" # add always a prefix to group id
format_dlq_topic: "" # format dead letter topic name, ex: "finops_{{.AppName}}_dlq"
validation:
group_id: # validate group id
enabled: false
rgx_group_id: "" # regex to validate group id ex: "^finops_.*$"
For creating a consumer we need to give additional consumer config when initializing the client.
topics: [] # list of topics to subscribe
group_id: "" # group id to subscribe, make is as unique as possible per service
# start offset to consume, 0 is the earliest offset, -1 is the latest offset and more than 0 is the offset number
# group_id has already committed offset then this will be ignored
start_offset: 0 # -1 to start end of the offsets
skip: # this is programatically skip, kafka will still consume the message
# example skip topic and offset
mytopic: # topic name to skip
0: # partition number
offsets: # list of offsets to skip
- 31
- 90
before: 20 # skip all offsets before or equal to this offset
# max records to consume per poll, 0 is default value from kafka usually 500
# no need to touch most of the time, but batch consume's count max is max_poll_records
max_poll_records: 0
# max records to consume per batch to give callback function, default is 100
# if this value is more than max_poll_records then max_poll_records will be used
batch_count: 100
dlq:
disabled: false # disable dead letter queue
topic: "" # dead letter topic name, it can be assigned in the kafka config's format_dlq_topic
retry_interval: "10s" # retry time interval of the message if can't be processed, default is 10s
retry_max_interval: "15m" # max interval for exponential time duration limit, default is 15m
start_offset: 0 # -1 to start end of the offsets
skip: # same as skip but just for dead letter topic and not need to specify topic name
# example skip offset
0:
offsets:
- 31
before: 20
Always give the client information so we can view in publish message's headers and kafka UI.
client, err := wkafka.New(
ctx, kafkaConfig,
wkafka.WithConsumer(consumeConfig),
wkafka.WithClientInfo("testapp", "v0.1.0"),
)
if err != nil {
return err
}
defer client.Close()
Now you need to run consumer with a callback function.
There is 2 options to run consumer, batch or single (WithCallbackBatch or WithCallback).
Default decoder is json, but you can change it with WithDecode option.
If you use []byte
as data type then raw data will be passed to the callback function, batch consumer like [][]byte
type.
// example single consumer
if err := client.Consume(ctx, wkafka.WithCallback(ProcessSingle)); err != nil {
return fmt.Errorf("consume: %w", err)
}
Send record to dead letter queue, use WrapErrDLQ function with to wrap the error and it will be send to dead letter queue.
Check the aditional options for custom decode and precheck.
Editing the skip map and use our handler to initialize server mux.
// import github.com/worldline-go/wkafka/handler
mux := http.NewServeMux()
mux.Handle(handler.New(client))
Handler Example
make env
# run the example
EXAMPLE=consumer_single_handler make example
Add messages in here to skip the message http://localhost:7071
Use consumer client or create without consumer settings, New
also try to connect to brokers.
client, err := wkafka.New(kafkaConfig)
if err != nil {
return err
}
defer client.Close()
Create a producer based of client and specific data type.
WithHook, WithEncoder, WithHeaders options are optional.
Use WithHook to get metadata of the record and modify to produce record.
producer, err := wkafka.NewProducer[*Data](client, "test", wkafka.WithHook(ProduceHook))
if err != nil {
return err
}
return producer.Produce(ctx, data)
go get github.com/twmb/franz-go/plugin/kotel
Use that with initializing the kafka client.
kafkaTracer := kotel.NewTracer()
kafkaClient, err = wkafka.New(ctx,
config.Application.KafkaConfig,
wkafka.WithConsumer(config.Application.KafkaConsumer),
wkafka.WithClientInfo(config.ServiceName, config.ServiceVersion),
wkafka.WithKGOOptions(kgo.WithHooks(kotel.NewKotel(kotel.WithTracer(kafkaTracer)).Hooks()...)),
)
Important to have span kind as producer.
ctx, spanKafka := otel.Tracer("").Start(ctx, "produce_message", trace.WithSpanKind(trace.SpanKindProducer))
defer spanKafka.End()
if err := h.KafkaProducer.Produce(ctx, product); err != nil {
spanKafka.SetStatus(codes.Error, err.Error())
return c.JSON(http.StatusBadRequest, model.Message{
Message: err.Error(),
})
}
k.Tracer
is we initialized on kafka client (kotel.NewTracer()
).
func (k *Kafka) Consume(ctx context.Context, product model.Product) error {
// use tracer's returned ctx for next spans
ctx, span := k.Tracer.WithProcessSpan(wkafka.CtxRecord(ctx))
defer span.End()
span.SetAttributes(attribute.String("product.name", product.Name))
log.Info().Str("product", product.Name).Str("description", product.Description).Msg("consume message")
return nil
}
Initialize kafka and redpanda console with docker-compose.
# using "docker compose" command, if you use podman then add compose extension and link docker with podman binary
make env
Service | Description |
---|---|
localhost:9092 | Kafka broker |
localhost:7071 | Redpanda console |
Use examples with EXAMPLE
env variable:
EXAMPLE=... make example