Skip to content

go-xlan/go-rocketmq

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

5 Commits
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

GitHub Workflow Status (branch) GoDoc Coverage Status Supported Go Versions GitHub Release Go Report Card

go-rocketmq

Apache RocketMQ client wrapper with simplified producer and consumer interfaces for Go.


CHINESE README

δΈ­ζ–‡θ―΄ζ˜Ž

Main Features

πŸš€ Simplified API: Clean producer and consumer API for RocketMQ message queue operations πŸ”Œ Auto Resolution: Automatic hostname resolution with IPv4 address support πŸ›‘οΈ Seamless Errors: Elegant error handling with mate ecosystem integration πŸ“ Go Practices: Interface design following Go best practices πŸ”„ Auto Pulling: Automatic message pulling with callback-based message handling βš™οΈ Configurable: Timeout and retry settings for message sending operations

Installation

go get github.com/go-xlan/go-rocketmq

Prerequisites

  • Go 1.23.0 or higher
  • Apache RocketMQ server instance (NameServer accessible)

Usage

Complete Producer and Consumer Example

This example shows how to create both a producer and consumer in the same program.

package main

import (
	"context"
	"time"

	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/go-xlan/go-rocketmq/rocketmq"
	"github.com/yyle88/must"
	"github.com/yyle88/neatjson/neatjsons"
	"github.com/yyle88/rese"
	"github.com/yyle88/zaplog"
)

func main() {
	config := &rocketmq.Config{
		NameServerAddress: "127.0.0.1:9876",
		GroupName:         "TestGroup",
		ProducerOptions: &rocketmq.ProducerOptions{
			SendMessageTimeout: 3 * time.Second,
			RetryAttempts:      3,
		},
	}

	const topic = "TestTopic"

	consumerCli := rese.P1(rocketmq.NewConsumer(config))
	defer rese.F0(consumerCli.Close)
	must.Done(consumerCli.StartSubscribe(topic, func(message *primitive.MessageExt) (consumer.ConsumeResult, error) {
		zaplog.SUG.Debugln("consume message body:", string(message.Body))
		return consumer.ConsumeSuccess, nil
	}))

	producerCli := rese.P1(rocketmq.NewProducer(config))
	defer rese.F0(producerCli.Close)
	for idx := 0; idx < 10000; idx++ {
		type MessagePayload struct {
			Name       string `json:"name"`
			SequenceNo int64  `json:"sequenceNo"`
		}

		payload := neatjsons.S(&MessagePayload{
			Name:       "demo",
			SequenceNo: int64(idx),
		})
		must.Done(producerCli.SendMessage(context.Background(), topic, []byte(payload)))
		time.Sleep(time.Second)
	}

	select {}
}

⬆️ Source: Source

API Reference

Configuration

  • Config - Client configuration for both producer and consumer
  • ProducerOptions - Producer specific configuration options

Producer

  • NewProducer(config *Config) (*Producer, error) - Creates and starts a new producer instance
  • SendMessage(ctx context.Context, topic string, payload []byte) error - Sends message to specified topic
  • Close() error - Shuts down producer and releases resources

Consumer

  • NewConsumer(config *Config) (*Consumer, error) - Creates a new consumer instance
  • StartSubscribe(topic string, handler func(*primitive.MessageExt) (consumer.ConsumeResult, error)) error - Starts consumer and subscribes to topic
  • Close() error - Shuts down consumer and releases resources

Utilities

  • ResolveNameServer(nameServer string) ([]string, error) - Resolves NameServer hostname to IPv4 addresses

Examples

Creating Producer

Basic producer setup:

config := &rocketmq.Config{
    NameServerAddress: "127.0.0.1:9876",
    GroupName:         "ProducerGroup",
    ProducerOptions: &rocketmq.ProducerOptions{
        SendMessageTimeout: 3 * time.Second,
        RetryAttempts:      3,
    },
}
producerCli := rese.P1(rocketmq.NewProducer(config))
defer rese.F0(producerCli.Close)

Sending messages:

payload := []byte(`{"name":"demo","value":123}`)
must.Done(producerCli.SendMessage(context.Background(), "MyTopic", payload))

Creating Consumer

Basic consumer setup:

config := &rocketmq.Config{
    NameServerAddress: "127.0.0.1:9876",
    GroupName:         "ConsumerGroup",
}
consumerCli := rese.P1(rocketmq.NewConsumer(config))
defer rese.F0(consumerCli.Close)

Subscribing to topics:

must.Done(consumerCli.StartSubscribe("MyTopic", func(message *primitive.MessageExt) (consumer.ConsumeResult, error) {
    fmt.Printf("Message: %s\n", string(message.Body))
    return consumer.ConsumeSuccess, nil
}))

Hostname Resolution

Resolve NameServer hostname to IPv4 addresses:

addresses := rese.V1(rocketmq.ResolveNameServer("mq.example.com:9876"))
fmt.Printf("Resolved addresses: %v\n", addresses)

Dependencies

This project uses the mate ecosystem for elegant error handling and resource management:

  • github.com/apache/rocketmq-client-go/v2 - Official Apache RocketMQ Go client
  • github.com/yyle88/erero - Error handling
  • github.com/yyle88/must - Panic-based assertion for errors
  • github.com/yyle88/rese - Resource management with panic handling
  • github.com/yyle88/zaplog - Structured logging
  • go.uber.org/zap - Logging foundation

πŸ“„ License

MIT License. See LICENSE.


🀝 Contributing

Contributions are welcome! Report bugs, suggest features, and contribute code:

  • πŸ› Found a mistake? Open an issue on GitHub with reproduction steps
  • πŸ’‘ Have a feature idea? Create an issue to discuss the suggestion
  • πŸ“– Documentation confusing? Report it so we can improve
  • πŸš€ Need new features? Share the use cases to help us understand requirements
  • ⚑ Performance issue? Help us optimize through reporting slow operations
  • πŸ”§ Configuration problem? Ask questions about complex setups
  • πŸ“’ Follow project progress? Watch the repo to get new releases and features
  • 🌟 Success stories? Share how this package improved the workflow
  • πŸ’¬ Feedback? We welcome suggestions and comments

πŸ”§ Development

New code contributions, follow this process:

  1. Fork: Fork the repo on GitHub (using the webpage UI).
  2. Clone: Clone the forked project (git clone https://github.com/yourname/repo-name.git).
  3. Navigate: Navigate to the cloned project (cd repo-name)
  4. Branch: Create a feature branch (git checkout -b feature/xxx).
  5. Code: Implement the changes with comprehensive tests
  6. Testing: (Golang project) Ensure tests pass (go test ./...) and follow Go code style conventions
  7. Documentation: Update documentation to support client-facing changes and use significant commit messages
  8. Stage: Stage changes (git add .)
  9. Commit: Commit changes (git commit -m "Add feature xxx") ensuring backward compatible code
  10. Push: Push to the branch (git push origin feature/xxx).
  11. PR: Open a merge request on GitHub (on the GitHub webpage) with detailed description.

Please ensure tests pass and include relevant documentation updates.


🌟 Support

Welcome to contribute to this project via submitting merge requests and reporting issues.

Project Support:

  • ⭐ Give GitHub stars if this project helps you
  • 🀝 Share with teammates and (golang) programming friends
  • πŸ“ Write tech blogs about development tools and workflows - we provide content writing support
  • 🌟 Join the ecosystem - committed to supporting open source and the (golang) development scene

Have Fun Coding with this package! πŸŽ‰πŸŽ‰πŸŽ‰


GitHub Stars

Stargazers

About

use rocket MQ in Golang

Resources

License

Stars

Watchers

Forks

Packages

No packages published